In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Setup System G") \
    .getOrCreate()
In [2]:
import numpy as np
import string

def deg2rad(deg):
    # Works on plain numbers and on Spark Columns (arithmetic is overloaded).
    return deg / 360 * (2 * np.pi)

def append_m_prefix(row):
    # Prefix a metro-area id with 'm' so node ids stay unique across types.
    return {'mid': 'm' + str(row['m_id']), 'm_id': row['m_id']}

def append_z_prefix(row):
    # Prefix a ZCTA id with 'z' so node ids stay unique across types.
    return {'zid': 'z' + str(row['z_id']), 'z_id': row['z_id']}

PATH_SYSTEMG = '../systemg/'
PATH_RAWDATA = '../rawdata/'
PATH_PROCESSEDDATA = '../processeddata/'

METRO_FN = 'Gaz_ua_national.txt'
ZCTA_FN = 'Gaz_zcta_national.txt'
METRO_CSV_FN = 'metroareas.csv'
ZCARBON_FN = 'zcarbon.csv'
NODES_FN = 'nodes.csv'
DRIV_DIST_FN = 'driv_dist.parquet'
DRIV_DIST_CSV_FN = 'driv_dist.csv'
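A quick sanity check on the helpers (an addition here, not a cell from the original notebook; it assumes the definitions above are in scope): deg2rad(180) should give pi, and the prefix helpers should tag ids without altering the original field.

# Hypothetical sanity checks, not part of the original pipeline.
assert abs(deg2rad(180) - np.pi) < 1e-12
assert append_m_prefix({'m_id': 12345}) == {'mid': 'm12345', 'm_id': 12345}
assert append_z_prefix({'z_id': '00601'}) == {'zid': 'z00601', 'z_id': '00601'}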
In [3]:
# Load the Census urban-area gazetteer (tab-delimited); the last column name
# carries trailing whitespace in the raw file, so strip it.
dfMetroAreasRaw = spark.read.load(PATH_RAWDATA + METRO_FN, format="csv",
                                  delimiter="\t", header=True, inferSchema=True)
dfMetroAreasRaw = dfMetroAreasRaw.withColumnRenamed(
    dfMetroAreasRaw.columns[-1],
    dfMetroAreasRaw.columns[-1].strip(string.whitespace))

# Keep only the columns System G needs and rename them to the node schema.
dfMetroAreas = dfMetroAreasRaw.select('GEOID', 'NAME', 'UATYPE', 'POP10', 'HU10',
                                      'ALAND_SQMI', 'AWATER_SQMI',
                                      'INTPTLAT', 'INTPTLONG') \
    .withColumnRenamed('GEOID', 'm_id') \
    .withColumnRenamed('NAME', 'name') \
    .withColumnRenamed('POP10', 'm_pop') \
    .withColumnRenamed('HU10', 'm_house_unit') \
    .withColumnRenamed('ALAND_SQMI', 'm_land') \
    .withColumnRenamed('AWATER_SQMI', 'm_water') \
    .withColumnRenamed('INTPTLAT', 'm_lat_d') \
    .withColumnRenamed('INTPTLONG', 'm_long_d')

# Add coordinates in radians alongside the degree columns.
dfMetroAreas = dfMetroAreas \
    .withColumn('m_lat_r', deg2rad(dfMetroAreas.m_lat_d)) \
    .withColumn('m_long_r', deg2rad(dfMetroAreas.m_long_d))

# Build the prefixed 'mid' column via an RDD map and join it back on m_id.
temp = dfMetroAreas.rdd.map(append_m_prefix)
dfR = spark.createDataFrame(temp)
dfMetroAreas = dfR.join(dfMetroAreas, dfMetroAreas.m_id == dfR.m_id).drop(dfR.m_id)

# Write a single CSV file for System G to ingest.
dfMetroAreas.coalesce(1).write.csv(PATH_SYSTEMG + METRO_CSV_FN, header=True, mode='overwrite')
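The RDD map-and-join round trip above works, but the same prefixed column can be built with a single column expression, which avoids the extra shuffle from the join. A sketch using the standard pyspark.sql.functions API (an alternative, not what the original notebook does; note it appends 'mid' as the last column rather than the first):

from pyspark.sql.functions import concat, lit, col

# Join-free equivalent: build 'mid' directly as a column expression.
dfMetroAreas = dfMetroAreas.withColumn('mid', concat(lit('m'), col('m_id').cast('string')))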
In [4]:
# Load the processed ZCTA carbon data, attach the prefixed 'zid' column,
# and export the result as the System G node file.
dfZData = spark.read.csv(PATH_PROCESSEDDATA + ZCARBON_FN, header=True)
temp = dfZData.rdd.map(append_z_prefix)
dfR = spark.createDataFrame(temp)
dfZData = dfR.join(dfZData, dfZData.z_id == dfR.z_id).drop(dfR.z_id)
dfZData.count()
dfZData.printSchema()
dfZData.coalesce(1).write.csv(PATH_SYSTEMG + NODES_FN, header=True, mode='overwrite')
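As a consistency check (an addition, not in the original notebook): the inner join on z_id should leave the row count unchanged, provided z_id is unique in zcarbon.csv.

# Hypothetical check: row count before and after attaching 'zid' should match.
raw_count = spark.read.csv(PATH_PROCESSEDDATA + ZCARBON_FN, header=True).count()
assert dfZData.count() == raw_count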
In [5]:
# Load the driving distances between ZCTAs and metro areas, attach the
# prefixed node ids on both endpoints, and export the edge list as
# (zid, mid, ddist).
dfDDist = spark.read.parquet(PATH_PROCESSEDDATA + DRIV_DIST_FN)
temp = dfDDist.select('z_id').distinct().rdd.map(append_z_prefix)
dfZ = spark.createDataFrame(temp)
temp = dfDDist.select('m_id').distinct().rdd.map(append_m_prefix)
dfM = spark.createDataFrame(temp)
dfMD = dfM.join(dfDDist, dfDDist.m_id == dfM.m_id)
dfDDist = dfZ.join(dfMD, dfMD.z_id == dfZ.z_id)
dfDDist = dfDDist.select('zid', 'mid', 'ddist')
dfDDist.coalesce(1).write.csv(PATH_SYSTEMG + DRIV_DIST_CSV_FN, header=True, mode='overwrite')
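To confirm the export, the written output can be read back and inspected (a verification step added here, not part of the original; write.csv produces a directory containing a single part file because of coalesce(1)):

# Read the exported edge list back and inspect a few rows.
spark.read.csv(PATH_SYSTEMG + DRIV_DIST_CSV_FN, header=True).show(5)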