In [ ]:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
from pyspark.sql import Row
from pyspark.sql.types import *
import pandas as pd
import StringIO
import matplotlib.pyplot as plt
# Force Hive to run on the MapReduce execution engine
hc = sc._jsc.hadoopConfiguration()
hc.set("hive.execution.engine", "mr")
In [ ]:
import csv
def parseCsv(csvStr):
    """Parse one CSV-formatted line into a list of fields."""
    f = StringIO.StringIO(csvStr)
    reader = csv.reader(f, delimiter=',')
    row = next(reader)
    return row

# Quick sanity check on a quoted two-field record
scsv = '"02Q","Titan Airways"'
row = parseCsv(scsv)
print(row[0])
print(row[1])

# Placeholders: point these at your storage location and protocol before running
working_storage = 'WORKING_STORAGE'
output_directory = 'jupyter/py2'
protocol_name = 'PROTOCOL_NAME://'
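Because parseCsv goes through csv.reader rather than a plain split(','), it also handles commas embedded inside quoted fields; a quick illustrative check (not part of the original notebook):
In [ ]:
# Hypothetical example: a quoted field containing a comma stays intact
print(parseCsv('"ABC","Airline, Inc."'))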
In [ ]:
# Build the carrier lookup table, dropping the CSV header row
carriersHeader = 'Code,Description'
carriersText = sc.textFile(protocol_name + working_storage + "/jupyter_dataset/carriers.csv") \
    .filter(lambda x: x != carriersHeader)
carriers = carriersText.map(lambda s: parseCsv(s)) \
    .map(lambda s: Row(code=s[0], description=s[1])).cache().toDF()
# Persist a parquet copy and register the DataFrame for SQL queries
carriers.write.mode("overwrite").parquet(protocol_name + working_storage + "/" + output_directory + "/carriers")
sqlContext.registerDataFrameAsTable(carriers, "carriers")
carriers.limit(20).toPandas()
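Once carriers is registered as a table, it can also be queried through sqlContext; a minimal sketch (the LIKE filter is illustrative):
In [ ]:
# Hypothetical example: look up carrier codes by a description fragment
sqlContext.sql("SELECT code, description FROM carriers "
               "WHERE description LIKE '%Airways%'").limit(10).toPandas()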
In [ ]:
# Load airport metadata; lat/long are parsed as floats
airportsHeader = '"iata","airport","city","state","country","lat","long"'
airports = sc.textFile(protocol_name + working_storage + "/jupyter_dataset/airports.csv") \
    .filter(lambda x: x != airportsHeader) \
    .map(lambda s: parseCsv(s)) \
    .map(lambda p: Row(iata=p[0],
                       airport=p[1],
                       city=p[2],
                       state=p[3],
                       country=p[4],
                       lat=float(p[5]),
                       longt=float(p[6]))) \
    .cache().toDF()
airports.write.mode("overwrite").parquet(protocol_name + working_storage + "/" + output_directory + "/airports")
sqlContext.registerDataFrameAsTable(airports, "airports")
airports.limit(20).toPandas()
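The parquet copy written above can be reloaded later without re-parsing the CSV; a minimal sketch:
In [ ]:
# Hypothetical example: read the airports table back from its parquet copy
airportsParquet = sqlContext.read.parquet(protocol_name + working_storage + "/" + output_directory + "/airports")
airportsParquet.limit(5).toPandas()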
In [ ]:
# Load the 2008 on-time performance data; "NA" delay fields are coerced to 0,
# and the raw strings are kept alongside in the *DelayStr columns for comparison
flightsHeader = 'Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay'
flights = sc.textFile(protocol_name + working_storage + "/jupyter_dataset/2008.csv.bz2") \
    .filter(lambda x: x != flightsHeader) \
    .map(lambda s: parseCsv(s)) \
    .map(lambda p: Row(Year=int(p[0]),
                       Month=int(p[1]),
                       DayofMonth=int(p[2]),
                       DayOfWeek=int(p[3]),
                       DepTime=p[4],
                       CRSDepTime=p[5],
                       ArrTime=p[6],
                       CRSArrTime=p[7],
                       UniqueCarrier=p[8],
                       FlightNum=p[9],
                       TailNum=p[10],
                       ActualElapsedTime=p[11],
                       CRSElapsedTime=p[12],
                       AirTime=p[13],
                       ArrDelay=int(p[14].replace("NA", "0")),
                       DepDelay=int(p[15].replace("NA", "0")),
                       Origin=p[16],
                       Dest=p[17],
                       Distance=int(p[18]),
                       TaxiIn=p[19],
                       TaxiOut=p[20],
                       Cancelled=p[21],
                       CancellationCode=p[22],
                       Diverted=p[23],
                       CarrierDelay=int(p[24].replace("NA", "0")),
                       CarrierDelayStr=p[24],
                       WeatherDelay=int(p[25].replace("NA", "0")),
                       WeatherDelayStr=p[25],
                       NASDelay=int(p[26].replace("NA", "0")),
                       SecurityDelay=int(p[27].replace("NA", "0")),
                       LateAircraftDelay=int(p[28].replace("NA", "0")))) \
    .toDF()
# mode("ignore") skips the write if the flights parquet already exists
flights.write.mode("ignore").parquet(protocol_name + working_storage + "/" + output_directory + "/flights")
sqlContext.registerDataFrameAsTable(flights, "flights")
flights.limit(10).toPandas()[["ArrDelay","CarrierDelay","CarrierDelayStr","WeatherDelay","WeatherDelayStr","Distance"]]
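With flights, carriers, and airports all registered, the tables can be joined in SQL; a minimal sketch (the aggregation and plot are illustrative, not part of the original notebook) that ranks carriers by average arrival delay and charts the top ten with the matplotlib import from the first cell:
In [ ]:
# Hypothetical example: mean arrival delay per carrier, joined with carrier names
delays = sqlContext.sql(
    "SELECT c.description AS carrier, AVG(f.ArrDelay) AS avg_arr_delay "
    "FROM flights f JOIN carriers c ON f.UniqueCarrier = c.code "
    "GROUP BY c.description ORDER BY avg_arr_delay DESC").toPandas()
delays.head(10).plot(x='carrier', y='avg_arr_delay', kind='barh')
plt.xlabel('Average arrival delay (minutes)')
plt.show()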
In [ ]: