In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Setup System G") \
    .getOrCreate()

In [2]:
import numpy as np
import string
from pyspark.sql.functions import concat_ws

def deg2rad(deg):
    return deg/360*(2*np.pi)

def append_m_prefix(row):
    prefix = 'm'
    new_row={}
    new_row['mid'] = prefix+str(row['m_id'])
    new_row['m_id'] = row['m_id']
    return new_row

def append_z_prefix(row):
    prefix = 'z'
    new_row={}
    new_row['zid'] = prefix+str(row['z_id'])
    new_row['z_id'] = row['z_id']
    return new_row

PATH_SYSTEMG = '../systemg/'
PATH_RAWDATA = '../rawdata/'
PATH_PROCESSEDDATA = '../processeddata/'
METRO_FN = 'Gaz_ua_national.txt'
ZCTA_FN = 'Gaz_zcta_national.txt'
METRO_CSV_FN = 'metroareas.csv'
ZCARBON_FN = 'zcarbon.csv'
NODES_FN = 'nodes.csv'
DRIV_DIST_FN='driv_dist.parquet'
DRIV_DIST_CSV_FN='driv_dist.csv'

In [3]:
dfMetroAreasRaw = spark.read.load(PATH_RAWDATA+METRO_FN, format="csv", delimiter="\t", header=True, inferSchema=True)
dfMetroAreasRaw = dfMetroAreasRaw.withColumnRenamed(dfMetroAreasRaw.columns[-1],dfMetroAreasRaw.columns[-1].strip(string.whitespace))
dfMetroAreas = dfMetroAreasRaw.select('GEOID','NAME','UATYPE','POP10','HU10','ALAND_SQMI',\
                                      'AWATER_SQMI','INTPTLAT','INTPTLONG')\
                            .withColumnRenamed('GEOID','m_id')\
                            .withColumnRenamed('NAME', 'name')\
                            .withColumnRenamed('POP10','m_pop')\
                            .withColumnRenamed('HU10','m_house_unit')\
                            .withColumnRenamed('ALAND_SQMI','m_land')\
                            .withColumnRenamed('AWATER_SQMI','m_water')\
                            .withColumnRenamed('INTPTLAT', 'm_lat_d')\
                            .withColumnRenamed('INTPTLONG', 'm_long_d')


dfMetroAreas = dfMetroAreas.withColumn('m_lat_r', deg2rad(dfMetroAreas.m_lat_d)).withColumn('m_long_r', deg2rad(dfMetroAreas.m_long_d))

temp = dfMetroAreas.rdd.map(append_m_prefix)
dfR=spark.createDataFrame(temp)
    
dfMetroAreas = dfR.join(dfMetroAreas,dfMetroAreas.m_id==dfR.m_id).drop(dfR.m_id)

dfMetroAreas.coalesce(1).write.csv(PATH_SYSTEMG+METRO_CSV_FN,header=True,mode='overwrite')


/opt/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/session.py:336: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
  warnings.warn("Using RDD of dict to inferSchema is deprecated. "

In [4]:
dfZData = spark.read.csv(PATH_PROCESSEDDATA + ZCARBON_FN,header=True)
temp = dfZData.rdd.map(append_z_prefix)
dfR=spark.createDataFrame(temp)
    
dfZData = dfR.join(dfZData,dfZData.z_id==dfR.z_id).drop(dfR.z_id)
dfZData.count()
dfZData.printSchema()
dfZData.coalesce(1).write.csv(PATH_SYSTEMG + NODES_FN,header=True,mode='overwrite')


/opt/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/session.py:336: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
  warnings.warn("Using RDD of dict to inferSchema is deprecated. "
root
 |-- zid: string (nullable = true)
 |-- z_id: string (nullable = true)
 |-- z_lat_d: string (nullable = true)
 |-- z_long_d: string (nullable = true)
 |-- z_land: string (nullable = true)
 |-- z_water: string (nullable = true)
 |-- z_pop: string (nullable = true)
 |-- z_households: string (nullable = true)
 |-- z_comm: string (nullable = true)
 |-- z_med_inc: string (nullable = true)
 |-- z_house_unit: string (nullable = true)
 |-- z_comm_miles: string (nullable = true)
 |-- z_comm_miles_ph: string (nullable = true)
 |-- z_carb_ton_ph: string (nullable = true)
 |-- z_pov: string (nullable = true)
 |-- z_per_comm: string (nullable = true)


In [5]:
dfDDist = spark.read.parquet(PATH_PROCESSEDDATA+DRIV_DIST_FN)

temp = dfDDist.select('z_id').distinct().rdd.map(append_z_prefix)
dfZ=spark.createDataFrame(temp)

temp = dfDDist.select('m_id').distinct().rdd.map(append_m_prefix)
dfM=spark.createDataFrame(temp)

dfMD = dfM.join(dfDDist,dfDDist.m_id==dfM.m_id)
dfDDist = dfZ.join(dfMD,dfMD.z_id==dfZ.z_id)


dfDDist = dfDDist.select('zid','mid','ddist')
dfDDist.coalesce(1).write.csv(PATH_SYSTEMG+DRIV_DIST_CSV_FN,header=True,mode='overwrite')


/opt/spark-2.0.2-bin-hadoop2.7/python/pyspark/sql/session.py:336: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
  warnings.warn("Using RDD of dict to inferSchema is deprecated. "