In [1]:
#https://www.nodalpoint.com/spark-data-frames-from-csv-files-handling-headers-column-types/
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
In [2]:
# Explicit schema for the NYC yellow-taxi CSV.  The (column name, Spark type)
# pairs are kept in one table so the layout is easy to scan and to diff
# against the file's header row; every column is nullable.
_taxi_columns = [
    ("vendor_id", StringType()),
    ("pickup_datetime", TimestampType()),
    ("dropoff_datetime", TimestampType()),
    ("passenger_count", IntegerType()),
    ("trip_distance", FloatType()),
    ("pickup_longitude", FloatType()),
    ("pickup_latitude", FloatType()),
    ("rate_code_id", IntegerType()),
    ("store_and_fwd_flag", StringType()),
    ("dropoff_longitude", FloatType()),
    ("dropoff_latitude", FloatType()),
    ("payment_type", IntegerType()),
    ("fare_amount", FloatType()),
    ("extra", FloatType()),
    ("mta_tax", FloatType()),
    ("tip_amount", FloatType()),
    ("tolls_amount", FloatType()),
    ("improvement_surcharge", FloatType()),
    ("total_amount", FloatType()),
]
schema = StructType([StructField(name, dtype, True) for name, dtype in _taxi_columns])
In [3]:
# Spark entry points: a SparkContext for RDD work plus a SQLContext for
# DataFrame operations (used by the CSV reader and .toDF below).
# NOTE(review): SQLContext is the legacy entry point; modern PySpark would use
# SparkSession.builder — kept as-is since the rest of the notebook relies on it.
sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)
In [4]:
# Load the raw trip file as an RDD of text lines (lazy — nothing is read
# until an action such as count()/collect() runs).
taxiFile = sc.textFile("./yellow_tripdata_2016-06.csv")
In [5]:
# Prints only the RDD object's repr, not the file contents.
print(taxiFile)
In [6]:
# Isolate the header row by looking for the column name "total_amount"
# (data rows are numeric/quoted values, so only the header should match).
taxiHeader = taxiFile.filter(lambda line: "total_amount" in line)
In [7]:
# Materialize the captured header line(s) to verify the filter worked.
taxiHeader.collect()
Out[7]:
In [8]:
# Drop the header with a plain filter.  The original used
# taxiFile.subtract(taxiHeader), which triggers a full shuffle of the whole
# file just to remove one line; filtering out lines containing the column
# name "total_amount" gives the same result as a narrow, shuffle-free
# transformation and also preserves the original line order.
taxiNoHeader = taxiFile.filter(lambda line: "total_amount" not in line)
In [9]:
# Total line count of the raw file (includes the header line).
taxiFile.count()
Out[9]:
In [10]:
# Repr only — the RDD is still lazy at this point.
print(taxiNoHeader)
In [11]:
# Should equal taxiFile.count() minus the header line(s) removed above.
taxiNoHeader.count()
Out[11]:
In [12]:
# Sanity-check dateutil's parser on a sample timestamp string before using
# it on every row below.
# NOTE(review): `from datetime import *` is a wildcard import and nothing
# visible in this notebook uses names from it — consider narrowing or
# removing it (kept here in case an unseen cell depends on it).
from datetime import *
from dateutil.parser import parse
parse("2013-02-09 18:16:10")
Out[12]:
In [13]:
# NOTE(review): `schemacopy` was a verbatim, unused duplicate of `schema`
# (the toDF call below passes `schema`, not `schemacopy`).  Keep the name as
# an alias in case a later cell references it, instead of repeating all 19
# StructField definitions.
schemacopy = schema

def _parse_taxi_row(line):
    """Split one CSV line of the trip file and coerce each field to the type
    declared in `schema`: quotes stripped from strings, datetimes parsed with
    dateutil, counts/codes as int, distances/coordinates/amounts as float.

    NOTE(review): a plain split(',') assumes no field contains an embedded
    comma — confirm against the raw file.
    """
    p = line.split(",")
    return (p[0].strip('"'),
            parse(p[1].strip('"')),
            parse(p[2].strip('"')),
            int(p[3]),
            float(p[4]),
            float(p[5]),
            float(p[6]),
            int(p[7]),
            p[8].strip('"'),
            float(p[9]),
            float(p[10]),
            int(p[11]),
            float(p[12]),
            float(p[13]),
            float(p[14]),
            float(p[15]),
            float(p[16]),
            float(p[17]),
            float(p[18]))

# One map with a named function replaces the previous split-then-tuple
# double map; the traversal and the resulting rows are identical.
taxisplit = taxiNoHeader.map(_parse_taxi_row).toDF(schema)
In [14]:
# Print the DataFrame repr first, and leave head(2) as the cell's last
# expression so the sampled rows are actually displayed.  In the original
# order the head(2) result was computed but never shown (only the final
# print reached the output).
print(taxisplit)
taxisplit.head(2)
In [15]:
#smarter load of the csv file with a header
# Smarter load of the CSV: let the spark-csv reader consume the header row
# and infer the column types itself.
df = (sqlContext.read
      .format('com.databricks.spark.csv')
      .option('header', 'true')
      .option('inferSchema', 'true')
      .load("./yellow_tripdata_2016-06.csv"))
print(df)
In [16]:
# Normalize column names to the snake_case schema used earlier and ensure the
# pickup/dropoff columns are real timestamps.
# BUG FIX: the original both created 'pickup_datetime'/'dropoff_datetime' via
# withColumn AND renamed the tpep_* columns to those very same names, leaving
# the DataFrame with duplicate column names (any subsequent select on them
# would be ambiguous).  Rename first, then cast in place.
df = (df.withColumnRenamed('VendorID', 'vendor_id')
        .withColumnRenamed('RatecodeID', 'rate_code_id')
        .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
        .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime'))
df = (df.withColumn('pickup_datetime', df.pickup_datetime.cast('timestamp'))
        .withColumn('dropoff_datetime', df.dropoff_datetime.cast('timestamp')))
print(df)
In [20]:
# Persist the parsed DataFrame as Parquet (columnar, typed, far faster to
# re-read than the raw CSV).
# NOTE(review): the default save mode errors if the output path already
# exists — confirm, or pass mode='overwrite' for repeatable runs.
taxisplit.write.parquet('./work/yellow_tripdata_2016-06-parquet')