Flights data preparation


In [ ]:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
from pyspark.sql import Row
from pyspark.sql.types import *
import pandas as pd
import StringIO
import matplotlib.pyplot as plt
# Set the Hive execution engine to MapReduce via the Hadoop configuration
hc = sc._jsc.hadoopConfiguration()
hc.set("hive.execution.engine", "mr")
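
Most Spark notebook kernels pre-create sc and sqlContext, which the cells below rely on. If sqlContext is not provided by the environment, it can be built from the existing SparkContext; the cell below is a minimal fallback sketch.


In [ ]:
# Fallback in case the kernel does not pre-create sqlContext (sc is assumed to exist)
try:
    sqlContext
except NameError:
    sqlContext = SQLContext(sc)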

Helper function to parse a single CSV line


In [ ]:
import csv

def parseCsv(csvStr):
    """Parse a single CSV-formatted line into a list of field values."""
    f = StringIO.StringIO(csvStr)
    reader = csv.reader(f, delimiter=',')
    row = reader.next()
    return row

scsv = '"02Q","Titan Airways"'
row = parseCsv(scsv)
print row[0]
print row[1]

# Placeholders: replace with your storage location, output path, and storage protocol before running
working_storage = 'WORKING_STORAGE'
output_directory = 'jupyter/py2'
protocol_name = 'PROTOCOL_NAME://'

Parse and convert Carrier data to parquet


In [ ]:
carriersHeader = 'Code,Description'
carriersText = sc.textFile(protocol_name + working_storage + "/jupyter_dataset/carriers.csv") \
    .filter(lambda x: x != carriersHeader)
carriers = carriersText.map(lambda s: parseCsv(s)) \
    .map(lambda s: Row(code=s[0], description=s[1])).cache().toDF()
carriers.write.mode("overwrite").parquet(protocol_name + working_storage + "/" + output_directory + "/carriers")    
sqlContext.registerDataFrameAsTable(carriers, "carriers")
carriers.limit(20).toPandas()
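
As an optional sanity check, the registered carriers table can be queried with Spark SQL; the snippet below assumes the table registered in the previous cell and uses the "02Q" code from the parser test above.


In [ ]:
# Optional check: row count and a lookup of the carrier code used in the parser test
sqlContext.sql("SELECT COUNT(*) AS carrier_count FROM carriers").show()
sqlContext.sql("SELECT code, description FROM carriers WHERE code = '02Q'").toPandas()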

Parse and convert Airport data to parquet


In [ ]:
airportsHeader= '"iata","airport","city","state","country","lat","long"'
airports = sc.textFile(protocol_name + working_storage + "/jupyter_dataset/airports.csv") \
    .filter(lambda x: x != airportsHeader) \
    .map(lambda s: parseCsv(s)) \
    .map(lambda p: Row(iata=p[0], \
                       airport=p[1], \
                       city=p[2], \
                       state=p[3], \
                       country=p[4], \
                       lat=float(p[5]), \
                       longt=float(p[6])) \
        ).cache().toDF()
airports.write.mode("overwrite").parquet(protocol_name + working_storage + "/" + output_directory + "/airports")    
sqlContext.registerDataFrameAsTable(airports, "airports")
airports.limit(20).toPandas()
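
Optionally, the Parquet output can be read back to confirm that the schema and numeric types survived the conversion; this sketch assumes the output path used in the cell above.


In [ ]:
# Optional check: re-read the airports Parquet files and inspect the schema
airportsParquet = sqlContext.read.parquet(protocol_name + working_storage + "/" + output_directory + "/airports")
airportsParquet.printSchema()
print airportsParquet.count()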

Parse and convert Flights data to parquet


In [ ]:
flightsHeader = 'Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay'
# Delay fields: "NA" values are replaced with 0 before the int conversion; the raw strings
# are kept in CarrierDelayStr and WeatherDelayStr so the substitution can be inspected below.
flights = sc.textFile(protocol_name + working_storage + "/jupyter_dataset/2008.csv.bz2") \
    .filter(lambda x: x!= flightsHeader) \
    .map(lambda s: parseCsv(s)) \
    .map(lambda p: Row(Year=int(p[0]), \
                       Month=int(p[1]), \
                       DayofMonth=int(p[2]), \
                       DayOfWeek=int(p[3]), \
                       DepTime=p[4], \
                       CRSDepTime=p[5], \
                       ArrTime=p[6], \
                       CRSArrTime=p[7], \
                       UniqueCarrier=p[8], \
                       FlightNum=p[9], \
                       TailNum=p[10], \
                       ActualElapsedTime=p[11], \
                       CRSElapsedTime=p[12], \
                       AirTime=p[13], \
                       ArrDelay=int(p[14].replace("NA", "0")), \
                       DepDelay=int(p[15].replace("NA", "0")), \
                       Origin=p[16], \
                       Dest=p[17], \
                       Distance=long(p[18]), \
                       TaxiIn=p[19], \
                       TaxiOut=p[20], \
                       Cancelled=p[21], \
                       CancellationCode=p[22], \
                       Diverted=p[23], \
                       CarrierDelay=int(p[24].replace("NA", "0")), \
                       CarrierDelayStr=p[24], \
                       WeatherDelay=int(p[25].replace("NA", "0")), \
                       WeatherDelayStr=p[25], \
                       NASDelay=int(p[26].replace("NA", "0")), \
                       SecurityDelay=int(p[27].replace("NA", "0")), \
                       LateAircraftDelay=int(p[28].replace("NA", "0")))) \
         .toDF()

flights.write.mode("ignore").parquet(protocol_name + working_storage + "/" + output_directory + "/flights")
sqlContext.registerDataFrameAsTable(flights, "flights")
flights.limit(10).toPandas()[["ArrDelay","CarrierDelay","CarrierDelayStr","WeatherDelay","WeatherDelayStr","Distance"]]
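
With flights, carriers, and airports registered, a small join gives a final end-to-end check that the prepared tables line up; the query below assumes the flights and carriers tables registered above.


In [ ]:
# Optional check: average arrival delay per carrier, joined with the carrier names
sqlContext.sql("""
    SELECT c.description AS carrier, COUNT(*) AS flights, AVG(f.ArrDelay) AS avg_arr_delay
    FROM flights f JOIN carriers c ON f.UniqueCarrier = c.code
    GROUP BY c.description
    ORDER BY avg_arr_delay DESC
    LIMIT 10
""").toPandas()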
