In [1]:
from pyspark.sql import SparkSession

# Start (or attach to an existing) SparkSession; every job below runs through it.
spark = (
    SparkSession.builder
    .appName("Calculate Distances")
    .getOrCreate()
)
In [2]:
import string

# Directory layout: raw Census downloads live in one tree, derived outputs in another.
# Kept as plain strings (with trailing slash) because downstream code concatenates them.
PATH_RAWDATA = '../rawdata/'
PATH_PROCESSEDDATA = '../processeddata/'
In [3]:
import numpy as np

METRO_FN = 'Gaz_ua_national.txt'  # Census urban-area Gazetteer file
MIN_POP = 50000                   # keep only urban areas with 2010 population above this

def deg2rad(deg):
    """Convert degrees to radians.

    Uses only arithmetic operators (no numpy ufunc call), so it works
    unchanged on both plain scalars and Spark Columns.
    """
    return deg / 360 * (2 * np.pi)
# Load the Census urban-area Gazetteer (tab-separated) and keep metro areas above MIN_POP.
dfMetroAreasRaw = spark.read.load(PATH_RAWDATA+METRO_FN, format="csv", delimiter="\t", header=True, inferSchema=True)
# The Gazetteer's last column name carries trailing whitespace; strip() with no
# arguments removes all leading/trailing whitespace (idiomatic equivalent of
# strip(string.whitespace)).
dfMetroAreasRaw = dfMetroAreasRaw.withColumnRenamed(dfMetroAreasRaw.columns[-1], dfMetroAreasRaw.columns[-1].strip())
dfMetroAreasRaw.printSchema()
# Gazetteer column -> short m_*-prefixed name used downstream.
METRO_RENAMES = {
    'GEOID': 'm_id',
    'NAME': 'name',
    'POP10': 'm_pop',
    'HU10': 'm_house_unit',
    'ALAND_SQMI': 'm_land',
    'AWATER_SQMI': 'm_water',
    'INTPTLAT': 'm_lat_d',
    'INTPTLONG': 'm_long_d',
}
dfMetroAreas = dfMetroAreasRaw.select('GEOID','NAME','UATYPE','POP10','HU10','ALAND_SQMI',
                                      'AWATER_SQMI','INTPTLAT','INTPTLONG')\
                              .filter(dfMetroAreasRaw.POP10 > MIN_POP)
for old_name, new_name in METRO_RENAMES.items():
    dfMetroAreas = dfMetroAreas.withColumnRenamed(old_name, new_name)
# deg2rad is pure arithmetic, so it maps the degree Columns to radian Columns.
dfMetroAreas = dfMetroAreas.withColumn('m_lat_r', deg2rad(dfMetroAreas.m_lat_d)).withColumn('m_long_r', deg2rad(dfMetroAreas.m_long_d))
print('Number of Metro Areas (before filtering):', dfMetroAreasRaw.count())
print('Number of Metro Areas (after filtering):', dfMetroAreas.count())
dfMetroAreas.show(5)
In [4]:
# Load the Census ZCTA (ZIP Code Tabulation Area) Gazetteer, same format as the
# urban-area file above: tab-separated with a whitespace-padded last column name.
ZCTA_FN = 'Gaz_zcta_national.txt'
dfZCTAsRaw = spark.read.load(PATH_RAWDATA+ZCTA_FN, format="csv", delimiter="\t", header=True, inferSchema=True)
# strip() with no arguments is the idiomatic equivalent of strip(string.whitespace).
dfZCTAsRaw = dfZCTAsRaw.withColumnRenamed(dfZCTAsRaw.columns[-1], dfZCTAsRaw.columns[-1].strip())
dfZCTAsRaw.printSchema()
# Gazetteer column -> short z_*-prefixed name used downstream.
ZCTA_RENAMES = {
    'GEOID': 'z_id',
    'POP10': 'z_pop',
    'HU10': 'z_house_unit',
    'ALAND_SQMI': 'z_land',
    'AWATER_SQMI': 'z_water',
    'INTPTLAT': 'z_lat_d',
    'INTPTLONG': 'z_long_d',
}
dfZCTAs = dfZCTAsRaw.select('GEOID','POP10','HU10','ALAND_SQMI','AWATER_SQMI','INTPTLAT','INTPTLONG')
for old_name, new_name in ZCTA_RENAMES.items():
    dfZCTAs = dfZCTAs.withColumnRenamed(old_name, new_name)
# deg2rad is pure arithmetic, so it maps the degree Columns to radian Columns.
dfZCTAs = dfZCTAs.withColumn('z_lat_r', deg2rad(dfZCTAs.z_lat_d)).withColumn('z_long_r', deg2rad(dfZCTAs.z_long_d))
print(dfZCTAs.count())
dfZCTAs.printSchema()
dfZCTAs.show(5)
In [5]:
# Wikipedia, "Great-circle distance," https://en.wikipedia.org/wiki/Great-circle_distance retrieved 12/9/2016
# Note: importing `abs` from pyspark.sql.functions would shadow the builtin abs;
# it is not needed here because cosine is an even function.
from pyspark.sql.functions import acos, cos, sin, least, greatest, lit
RMETERS = 6371000  # mean Earth radius, meters
RMILES = RMETERS*0.000621371  # Earth radius in miles
DIST_FN='distances.parquet'
# Every ZCTA paired with every metro area. A join() with no condition raises an
# AnalysisException on Spark >= 2.1 unless cross joins are globally enabled, so
# the cross join is made explicit.
dfDist = dfZCTAs.crossJoin(dfMetroAreas)
# NOTE: counting the cross join materializes the full |ZCTA| x |metro| product —
# expensive, but kept as a sanity check on the join size.
print(dfZCTAs.count(),dfMetroAreas.count(),dfZCTAs.count()*dfMetroAreas.count(),dfDist.count())
dfDist.printSchema()
dfDist.show(5)
# Spherical law of cosines. Floating-point rounding can push the acos argument
# slightly outside [-1, 1] for nearly-identical (or antipodal) points, which
# would yield NaN distances; clamp it into the valid domain.
dfDist = dfDist.withColumn('dist',acos(
    least(lit(1.0), greatest(lit(-1.0),
        sin(dfDist.z_lat_r)*sin(dfDist.m_lat_r)+
        cos(dfDist.z_lat_r)*cos(dfDist.m_lat_r)*cos(dfDist.z_long_r-dfDist.m_long_r)
    ))
)*RMILES)
dfDist.write.save(PATH_PROCESSEDDATA+DIST_FN,mode='overwrite')