In [1]:
#https://www.nodalpoint.com/spark-data-frames-from-csv-files-handling-headers-column-types/
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

In [2]:
schema = StructType([
    StructField("vendor_id", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", FloatType(), True),
    StructField("pickup_longitude", FloatType(), True),
    StructField("pickup_latitude", FloatType(), True),
    StructField("rate_code_id", IntegerType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("dropoff_longitude", FloatType(), True),
    StructField("dropoff_latitude", FloatType(), True),
    StructField("payment_type", IntegerType(), True),
    StructField("fare_amount", FloatType(), True),
    StructField("extra", FloatType(), True),
    StructField("mta_tax", FloatType(), True),
    StructField("tip_amount", FloatType(), True),
    StructField("tolls_amount", FloatType(), True),
    StructField("improvement_surcharge", FloatType(), True),
    StructField("total_amount", FloatType(), True)])

In [3]:
sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)

In [4]:
taxiFile = sc.textFile("./yellow_tripdata_2016-06.csv")

In [5]:
print(taxiFile)


./yellow_tripdata_2016-06.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [6]:
taxiHeader = taxiFile.filter(lambda l: "total_amount" in l)

In [7]:
taxiHeader.collect()


Out[7]:
['VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount']

In [8]:
taxiNoHeader = taxiFile.subtract(taxiHeader)

In [9]:
taxiFile.count()


Out[9]:
11135471

In [10]:
print(taxiNoHeader)


PythonRDD[11] at RDD at PythonRDD.scala:53

In [11]:
taxiNoHeader.count()


Out[11]:
11135470

In [12]:
from datetime import *
from dateutil.parser import parse
parse("2013-02-09 18:16:10")


Out[12]:
datetime.datetime(2013, 2, 9, 18, 16, 10)

In [13]:
schemacopy = StructType([
    StructField("vendor_id", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", FloatType(), True),
    StructField("pickup_longitude", FloatType(), True),
    StructField("pickup_latitude", FloatType(), True),
    StructField("rate_code_id", IntegerType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("dropoff_longitude", FloatType(), True),
    StructField("dropoff_latitude", FloatType(), True),
    StructField("payment_type", IntegerType(), True),
    StructField("fare_amount", FloatType(), True),
    StructField("extra", FloatType(), True),
    StructField("mta_tax", FloatType(), True),
    StructField("tip_amount", FloatType(), True),
    StructField("tolls_amount", FloatType(), True),
    StructField("improvement_surcharge", FloatType(), True),
    StructField("total_amount", FloatType(), True)])
taxisplit = taxiNoHeader.map(lambda k: k.split(",")).map(lambda p: (p[0].strip('"'), 
                                                                    parse(p[1].strip('"')), 
                                                                    parse(p[2].strip('"')), 
                                                                    int(p[3]), 
                                                                    float(p[4]), 
                                                                    float(p[5]), 
                                                                    float(p[6]), 
                                                                    int(p[7]), 
                                                                    p[8].strip('"'),
                                                                    float(p[9]), 
                                                                    float(p[10]), 
                                                                    int(p[11]), 
                                                                    float(p[12]), 
                                                                    float(p[13]), 
                                                                    float(p[14]), 
                                                                    float(p[15]), 
                                                                    float(p[16]), 
                                                                    float(p[17]), 
                                                                    float(p[18]) )).toDF(schema)

In [14]:
taxisplit.head(2)
print(taxisplit)


DataFrame[vendor_id: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, passenger_count: int, trip_distance: float, pickup_longitude: float, pickup_latitude: float, rate_code_id: int, store_and_fwd_flag: string, dropoff_longitude: float, dropoff_latitude: float, payment_type: int, fare_amount: float, extra: float, mta_tax: float, tip_amount: float, tolls_amount: float, improvement_surcharge: float, total_amount: float]

In [15]:
#smarter load of the csv file with a header 
df = sqlContext.read.load("./yellow_tripdata_2016-06.csv", format='com.databricks.spark.csv', header='true', inferSchema='true')
print(df)


DataFrame[VendorID: int, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: int, trip_distance: double, pickup_longitude: double, pickup_latitude: double, RatecodeID: int, store_and_fwd_flag: string, dropoff_longitude: double, dropoff_latitude: double, payment_type: int, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double]

In [16]:
df = (df.withColumn('dropoff_datetime', df.tpep_dropoff_datetime.cast('timestamp'))
       .withColumn('pickup_datetime', df.tpep_pickup_datetime.cast('timestamp'))
       .withColumnRenamed('VendorID', 'vendor_id')
       .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
       .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
       .withColumnRenamed('RatecodeID', 'rate_code_id'))
print(df)


DataFrame[vendor_id: int, pickup_datetime: timestamp, dropoff_datetime: timestamp, passenger_count: int, trip_distance: double, pickup_longitude: double, pickup_latitude: double, rate_code_id: int, store_and_fwd_flag: string, dropoff_longitude: double, dropoff_latitude: double, payment_type: int, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, dropoff_datetime: timestamp, pickup_datetime: timestamp]

In [20]:
taxisplit.write.parquet('./work/yellow_tripdata_2016-06-parquet')