In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Calculate Distances") \
    .getOrCreate()

In [2]:
import string
PATH_RAWDATA = '../rawdata/'
PATH_PROCESSEDDATA = '../processeddata/'

In [3]:
import numpy as np
METRO_FN = 'Gaz_ua_national.txt'
def deg2rad(deg):
    return deg/360*(2*np.pi)
MIN_POP = 50000

dfMetroAreasRaw = spark.read.load(PATH_RAWDATA+METRO_FN, format="csv", delimiter="\t", header=True, inferSchema=True)
dfMetroAreasRaw = dfMetroAreasRaw.withColumnRenamed(dfMetroAreasRaw.columns[-1],dfMetroAreasRaw.columns[-1].strip(string.whitespace))
dfMetroAreasRaw.printSchema()
dfMetroAreas = dfMetroAreasRaw.select('GEOID','NAME','UATYPE','POP10','HU10','ALAND_SQMI',\
                                      'AWATER_SQMI','INTPTLAT','INTPTLONG')\
                            .filter(dfMetroAreasRaw.POP10>MIN_POP)\
                            .withColumnRenamed('GEOID','m_id')\
                            .withColumnRenamed('NAME', 'name')\
                            .withColumnRenamed('POP10','m_pop')\
                            .withColumnRenamed('HU10','m_house_unit')\
                            .withColumnRenamed('ALAND_SQMI','m_land')\
                            .withColumnRenamed('AWATER_SQMI','m_water')\
                            .withColumnRenamed('INTPTLAT', 'm_lat_d')\
                            .withColumnRenamed('INTPTLONG', 'm_long_d')

dfMetroAreas = dfMetroAreas.withColumn('m_lat_r', deg2rad(dfMetroAreas.m_lat_d)).withColumn('m_long_r', deg2rad(dfMetroAreas.m_long_d))
print('Number of Metro Areas (before filtering):', dfMetroAreasRaw.count())
print('Number of Metro Areas (after filtering):', dfMetroAreas.count())
dfMetroAreas.show(5)


root
 |-- GEOID: integer (nullable = true)
 |-- NAME: string (nullable = true)
 |-- UATYPE: string (nullable = true)
 |-- POP10: integer (nullable = true)
 |-- HU10: integer (nullable = true)
 |-- ALAND: long (nullable = true)
 |-- AWATER: integer (nullable = true)
 |-- ALAND_SQMI: double (nullable = true)
 |-- AWATER_SQMI: double (nullable = true)
 |-- INTPTLAT: double (nullable = true)
 |-- INTPTLONG: double (nullable = true)

Number of Metro Areas (before filtering): 3592
Number of Metro Areas (after filtering): 497
+----+--------------------+------+------+------------+-------+-------+---------+----------+-------------------+-------------------+
|m_id|                name|UATYPE| m_pop|m_house_unit| m_land|m_water|  m_lat_d|  m_long_d|            m_lat_r|           m_long_r|
+----+--------------------+------+------+------------+-------+-------+---------+----------+-------------------+-------------------+
| 199|Aberdeen--Bel Air...|     U|213751|       83721|131.131|  3.794|39.508977| -76.30343| 0.6895617327447118|-1.3317460840650168|
| 280|Abilene, TX Urban...|     U|110421|       46732| 54.732|  0.382|32.428466|-99.747188| 0.5659835030710355|-1.7409168502057775|
| 631|Aguadilla--Isabel...|     U|306196|      138431|239.274|  0.348|18.369286|-67.040259|0.32060452194049904|-1.1700732509397613|
| 766|Akron, OH Urbaniz...|     U|569499|      257659|325.377|  6.731|41.066225|-81.491897| 0.7167408376148084|-1.4223019163460895|
| 901|Albany, GA Urbani...|     U| 95779|       40430| 70.845|  1.077|31.591097|-84.168827| 0.5513686569669031|-1.4690231586915012|
+----+--------------------+------+------+------------+-------+-------+---------+----------+-------------------+-------------------+
only showing top 5 rows


In [4]:
ZCTA_FN = 'Gaz_zcta_national.txt'
dfZCTAsRaw = spark.read.load(PATH_RAWDATA+ZCTA_FN, format="csv", delimiter="\t", header=True, inferSchema=True)
dfZCTAsRaw = dfZCTAsRaw.withColumnRenamed(dfZCTAsRaw.columns[-1],dfZCTAsRaw.columns[-1].strip(string.whitespace))
dfZCTAsRaw.printSchema()

dfZCTAs = dfZCTAsRaw.select('GEOID','POP10','HU10','ALAND_SQMI','AWATER_SQMI','INTPTLAT','INTPTLONG')\
                .withColumnRenamed('GEOID','z_id')\
                .withColumnRenamed('POP10','z_pop')\
                .withColumnRenamed('HU10','z_house_unit')\
                .withColumnRenamed('ALAND_SQMI','z_land')\
                .withColumnRenamed('AWATER_SQMI','z_water')\
                .withColumnRenamed('INTPTLAT', 'z_lat_d')\
                .withColumnRenamed('INTPTLONG', 'z_long_d')

dfZCTAs = dfZCTAs.withColumn('z_lat_r', deg2rad(dfZCTAs.z_lat_d)).withColumn('z_long_r', deg2rad(dfZCTAs.z_long_d))
print(dfZCTAs.count())
dfZCTAs.printSchema()
dfZCTAs.show(5)


root
 |-- GEOID: integer (nullable = true)
 |-- POP10: integer (nullable = true)
 |-- HU10: integer (nullable = true)
 |-- ALAND: long (nullable = true)
 |-- AWATER: long (nullable = true)
 |-- ALAND_SQMI: double (nullable = true)
 |-- AWATER_SQMI: double (nullable = true)
 |-- INTPTLAT: double (nullable = true)
 |-- INTPTLONG: double (nullable = true)

33120
root
 |-- z_id: integer (nullable = true)
 |-- z_pop: integer (nullable = true)
 |-- z_house_unit: integer (nullable = true)
 |-- z_land: double (nullable = true)
 |-- z_water: double (nullable = true)
 |-- z_lat_d: double (nullable = true)
 |-- z_long_d: double (nullable = true)
 |-- z_lat_r: double (nullable = true)
 |-- z_long_r: double (nullable = true)

+----+-----+------------+------+-------+---------+----------+-------------------+-------------------+
|z_id|z_pop|z_house_unit|z_land|z_water|  z_lat_d|  z_long_d|            z_lat_r|           z_long_r|
+----+-----+------------+------+-------+---------+----------+-------------------+-------------------+
| 601|18570|        7744|64.348|  0.309|18.180555|-66.749961|0.31731054458991764|-1.1650065950278068|
| 602|41520|       18073|30.613|  1.717|18.362268| -67.17613| 0.3204820347335941|-1.1724446472477383|
| 603|54689|       25653|31.614|  0.071|18.455183|-67.119887| 0.3221037074080847|-1.1714630217165392|
| 606| 6615|        2877|42.309|  0.005|18.158345|-66.932911|0.31692290696304976|-1.1681996748943304|
| 610|29016|       12618|35.916|  1.611|18.290955|-67.125868| 0.3192373880841194| -1.171567409859101|
+----+-----+------------+------+-------+---------+----------+-------------------+-------------------+
only showing top 5 rows


In [5]:
# Wikipedia, "Great-circle distance," https://en.wikipedia.org/wiki/Great-circle_distance retrieved 12/9/2016
from pyspark.sql.functions import acos, cos, sin, abs

RMETERS = 6371000 #meters
RMILES = RMETERS*0.000621371
DIST_FN='distances.parquet'

dfDist = dfZCTAs.join(dfMetroAreas)
print(dfZCTAs.count(),dfMetroAreas.count(),dfZCTAs.count()*dfMetroAreas.count(),dfDist.count())
dfDist.printSchema()
dfDist.show(5)

dfDist = dfDist.withColumn('dist',acos(
                            sin(dfDist.z_lat_r)*sin(dfDist.m_lat_r)+
                            cos(dfDist.z_lat_r)*cos(dfDist.m_lat_r)*cos(abs(dfDist.z_long_r-dfDist.m_long_r))
                            )*RMILES)


dfDist.write.save(PATH_PROCESSEDDATA+DIST_FN,mode='overwrite')


33120 497 16460640 16460640
root
 |-- z_id: integer (nullable = true)
 |-- z_pop: integer (nullable = true)
 |-- z_house_unit: integer (nullable = true)
 |-- z_land: double (nullable = true)
 |-- z_water: double (nullable = true)
 |-- z_lat_d: double (nullable = true)
 |-- z_long_d: double (nullable = true)
 |-- z_lat_r: double (nullable = true)
 |-- z_long_r: double (nullable = true)
 |-- m_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- UATYPE: string (nullable = true)
 |-- m_pop: integer (nullable = true)
 |-- m_house_unit: integer (nullable = true)
 |-- m_land: double (nullable = true)
 |-- m_water: double (nullable = true)
 |-- m_lat_d: double (nullable = true)
 |-- m_long_d: double (nullable = true)
 |-- m_lat_r: double (nullable = true)
 |-- m_long_r: double (nullable = true)

+----+-----+------------+------+-------+---------+----------+-------------------+-------------------+----+--------------------+------+------+------------+-------+-------+---------+----------+-------------------+-------------------+
|z_id|z_pop|z_house_unit|z_land|z_water|  z_lat_d|  z_long_d|            z_lat_r|           z_long_r|m_id|                name|UATYPE| m_pop|m_house_unit| m_land|m_water|  m_lat_d|  m_long_d|            m_lat_r|           m_long_r|
+----+-----+------------+------+-------+---------+----------+-------------------+-------------------+----+--------------------+------+------+------------+-------+-------+---------+----------+-------------------+-------------------+
| 601|18570|        7744|64.348|  0.309|18.180555|-66.749961|0.31731054458991764|-1.1650065950278068| 199|Aberdeen--Bel Air...|     U|213751|       83721|131.131|  3.794|39.508977| -76.30343| 0.6895617327447118|-1.3317460840650168|
| 601|18570|        7744|64.348|  0.309|18.180555|-66.749961|0.31731054458991764|-1.1650065950278068| 280|Abilene, TX Urban...|     U|110421|       46732| 54.732|  0.382|32.428466|-99.747188| 0.5659835030710355|-1.7409168502057775|
| 601|18570|        7744|64.348|  0.309|18.180555|-66.749961|0.31731054458991764|-1.1650065950278068| 631|Aguadilla--Isabel...|     U|306196|      138431|239.274|  0.348|18.369286|-67.040259|0.32060452194049904|-1.1700732509397613|
| 601|18570|        7744|64.348|  0.309|18.180555|-66.749961|0.31731054458991764|-1.1650065950278068| 766|Akron, OH Urbaniz...|     U|569499|      257659|325.377|  6.731|41.066225|-81.491897| 0.7167408376148084|-1.4223019163460895|
| 601|18570|        7744|64.348|  0.309|18.180555|-66.749961|0.31731054458991764|-1.1650065950278068| 901|Albany, GA Urbani...|     U| 95779|       40430| 70.845|  1.077|31.591097|-84.168827| 0.5513686569669031|-1.4690231586915012|
+----+-----+------------+------+-------+---------+----------+-------------------+-------------------+----+--------------------+------+------+------------+-------+-------+---------+----------+-------------------+-------------------+
only showing top 5 rows