This notebook must be run under PySpark (2.0.2+).

I had better luck when I restarted the notebook kernel in between different parquet groups.
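
The spark and sqlContext objects used below are assumed to be predefined by the PySpark kernel. If you run a plain Jupyter kernel instead, a minimal setup along these lines should give an equivalent session (the master URL and app name here are assumptions, not part of the original notebook):

import pyspark
from pyspark.sql import SparkSession

# assumed local setup; point master at your cluster as appropriate
spark = (SparkSession.builder
         .master('local[*]')
         .appName('nyc-transit-convert')
         .getOrCreate())
sqlContext = pyspark.sql.SQLContext(spark.sparkContext)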

Convert and repartition the Citibike DataFrame using PySpark


In [1]:
# standard imports
import pyspark

funcs = pyspark.sql.functions
types = pyspark.sql.types

In [2]:
sqlContext.sql("set spark.sql.shuffle.partitions=32")
bike = spark.read.parquet('/data/citibike.parquet')
bike.registerTempTable('bike')

In [3]:
spark.sql('select * from bike limit 5').toPandas()


Out[3]:
trip_duration start_time stop_time start_station_id start_station_name start_station_latitude start_station_longitude end_station_id end_station_name end_station_latitude end_station_longitude bike_id user_type birth_year gender start_taxizone_id end_taxizone_id
0 801 1481563027000000 1481563829000000 3002 "South End Ave & Liberty St" 40.711512 -74.015756 346 "Bank St & Hudson St" 40.736529 -74.006180 25788 "Subscriber" 1976.0 1 13.0 158.0
1 767 1481563031000000 1481563799000000 3224 "W 13 St & Hudson St" 40.739974 -74.005139 236 "St Marks Pl & 2 Ave" 40.728419 -73.987140 25773 "Subscriber" 1973.0 1 249.0 79.0
2 883 1481563034000000 1481563918000000 3263 "Cooper Square & E 7 St" 40.729236 -73.990868 127 "Barrow St & Hudson St" 40.731724 -74.006744 20572 "Subscriber" 1963.0 1 79.0 158.0
3 427 1481563036000000 1481563464000000 484 "W 44 St & 5 Ave" 40.755003 -73.980144 492 "W 33 St & 7 Ave" 40.750200 -73.990931 25620 "Subscriber" 1985.0 1 161.0 186.0
4 478 1481563039000000 1481563518000000 519 "Pershing Square North" 40.751873 -73.977706 526 "E 33 St & 5 Ave" 40.747659 -73.984907 25019 "Subscriber" 1974.0 2 170.0 164.0

In [4]:
# start_time / stop_time are stored as epoch microseconds; convert them to
# timestamps, and cast the taxizone ids to floats
bike = (bike
    .withColumn('start_time',
                funcs.from_unixtime((bike.start_time/1000000).cast(types.IntegerType()))
                .cast(types.TimestampType()))
    .withColumn('stop_time',
                funcs.from_unixtime((bike.stop_time/1000000).cast(types.IntegerType()))
                .cast(types.TimestampType()))
    .withColumn('start_taxizone_id', bike.start_taxizone_id.cast(types.FloatType()))
    .withColumn('end_taxizone_id', bike.end_taxizone_id.cast(types.FloatType()))
)
bike.registerTempTable('bike2')
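
The start_time and stop_time columns arrive as epoch microseconds, so dividing by 1,000,000 before from_unixtime yields seconds. A toy check of the conversion on a single value (purely illustrative; the rendered time depends on the session timezone):

df_t = spark.createDataFrame([(1481563027000000,)], ['t'])
df_t.select(funcs.from_unixtime((df_t.t/1000000).cast(types.IntegerType()))
            .cast(types.TimestampType()).alias('t')).show()
# in an Eastern-time session this should print 2016-12-12 12:17:07,
# matching the first start_time shown below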

In [5]:
spark.sql('select * from bike2 limit 5').toPandas()


Out[5]:
trip_duration start_time stop_time start_station_id start_station_name start_station_latitude start_station_longitude end_station_id end_station_name end_station_latitude end_station_longitude bike_id user_type birth_year gender start_taxizone_id end_taxizone_id
0 801 2016-12-12 12:17:07 2016-12-12 12:30:29 3002 "South End Ave & Liberty St" 40.711512 -74.015756 346 "Bank St & Hudson St" 40.736529 -74.006180 25788 "Subscriber" 1976.0 1 13.0 158.0
1 767 2016-12-12 12:17:11 2016-12-12 12:29:59 3224 "W 13 St & Hudson St" 40.739974 -74.005139 236 "St Marks Pl & 2 Ave" 40.728419 -73.987140 25773 "Subscriber" 1973.0 1 249.0 79.0
2 883 2016-12-12 12:17:14 2016-12-12 12:31:58 3263 "Cooper Square & E 7 St" 40.729236 -73.990868 127 "Barrow St & Hudson St" 40.731724 -74.006744 20572 "Subscriber" 1963.0 1 79.0 158.0
3 427 2016-12-12 12:17:16 2016-12-12 12:24:24 484 "W 44 St & 5 Ave" 40.755003 -73.980144 492 "W 33 St & 7 Ave" 40.750200 -73.990931 25620 "Subscriber" 1985.0 1 161.0 186.0
4 478 2016-12-12 12:17:19 2016-12-12 12:25:18 519 "Pershing Square North" 40.751873 -73.977706 526 "E 33 St & 5 Ave" 40.747659 -73.984907 25019 "Subscriber" 1974.0 2 170.0 164.0

In [6]:
bike.sort('start_time') \
    .write.parquet('/data/citibike_spark.parquet', compression='snappy', mode='overwrite')
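
As a quick sanity check (a sketch, not part of the original pipeline), the written file can be read back to confirm the row count and that the time columns are now real timestamps:

bike_check = spark.read.parquet('/data/citibike_spark.parquet')
print(bike_check.count())
bike_check.select('start_time', 'stop_time').printSchema()   # expect timestamp columns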

Convert and repartition the Subway DataFrame using PySpark


In [1]:
# standard imports
import pyspark

funcs = pyspark.sql.functions
types = pyspark.sql.types

In [2]:
sqlContext.sql("set spark.sql.shuffle.partitions=32")
subway = spark.read.parquet('/data/subway.parquet')
subway.registerTempTable("subway")

In [3]:
spark.sql('select * from subway limit 5').toPandas()


Out[3]:
ca unit scp station linename division endtime description cumul_entries cumul_exits
0 "A002" "R051" "02-00-00" "NULL" "NULL" "NULL" 1356145200000000 "REGULAR" 3922274 1352720
1 "A002" "R051" "02-00-00" "NULL" "NULL" "NULL" 1356159600000000 "REGULAR" 3922288 1352730
2 "A002" "R051" "02-00-00" "NULL" "NULL" "NULL" 1356174000000000 "REGULAR" 3922352 1352824
3 "A002" "R051" "02-00-00" "NULL" "NULL" "NULL" 1356188400000000 "REGULAR" 3922544 1352881
4 "A002" "R051" "02-00-00" "NULL" "NULL" "NULL" 1356202800000000 "REGULAR" 3922944 1352967

In [4]:
# endtime is stored as epoch microseconds; convert it to a timestamp
subway = subway.withColumn(
    'endtime',
    funcs.from_unixtime((subway.endtime/1000000).cast(types.IntegerType()))
         .cast(types.TimestampType()))
subway.registerTempTable("subway2")

In [5]:
spark.sql('select * from subway2 limit 5').toPandas()


Out[5]:
ca unit scp station linename division endtime description cumul_entries cumul_exits
0 "A002" "R051" "02-00-00" "NULL" "NULL" "NULL" 2012-12-21 22:00:00 "REGULAR" 3922274 1352720
1 "A002" "R051" "02-00-00" "NULL" "NULL" "NULL" 2012-12-22 02:00:00 "REGULAR" 3922288 1352730
2 "A002" "R051" "02-00-00" "NULL" "NULL" "NULL" 2012-12-22 06:00:00 "REGULAR" 3922352 1352824
3 "A002" "R051" "02-00-00" "NULL" "NULL" "NULL" 2012-12-22 10:00:00 "REGULAR" 3922544 1352881
4 "A002" "R051" "02-00-00" "NULL" "NULL" "NULL" 2012-12-22 14:00:00 "REGULAR" 3922944 1352967

In [6]:
subway = subway.sort("ca", "unit", "scp", "endtime")
subway.write.parquet('/data/subway_spark.parquet', compression='snappy', mode='overwrite')
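
If you want a quick look at the result before moving on, here is an illustrative read-back; the (ca, unit, scp) triple identifies an individual turnstile, so the distinct count gives the number of devices covered:

subway_check = spark.read.parquet('/data/subway_spark.parquet')
print(subway_check.count())
print(subway_check.select('ca', 'unit', 'scp').distinct().count())   # distinct turnstiles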

Convert, repartition, and sort the Taxi DataFrame using PySpark


In [1]:
# standard imports
import pyspark

funcs = pyspark.sql.functions
types = pyspark.sql.types

In [3]:
sqlContext.sql("set spark.sql.shuffle.partitions=800")
taxi = spark.read.parquet('/data/all_trips_unprocessed.parquet')

In [4]:
# Convert the epoch-microsecond datetimes to timestamps and cast the
# numeric columns down to int/float types
taxi = (
    taxi.withColumn('dropoff_datetime',
                    funcs.from_unixtime((taxi.dropoff_datetime/1000000).cast(types.IntegerType()))
                    .cast(types.TimestampType()))
    .withColumn('pickup_datetime',
                funcs.from_unixtime((taxi.pickup_datetime/1000000).cast(types.IntegerType()))
                .cast(types.TimestampType()))
    .withColumn('dropoff_taxizone_id', taxi.dropoff_taxizone_id.cast(types.IntegerType()))
    .withColumn('pickup_taxizone_id', taxi.pickup_taxizone_id.cast(types.IntegerType()))
    .withColumn('dropoff_latitude', taxi.dropoff_latitude.cast(types.FloatType()))
    .withColumn('dropoff_longitude', taxi.dropoff_longitude.cast(types.FloatType()))
    .withColumn('ehail_fee', taxi.ehail_fee.cast(types.FloatType()))
    .withColumn('extra', taxi.extra.cast(types.FloatType()))
    .withColumn('fare_amount', taxi.fare_amount.cast(types.FloatType()))
    .withColumn('improvement_surcharge', taxi.improvement_surcharge.cast(types.FloatType()))
    .withColumn('mta_tax', taxi.mta_tax.cast(types.FloatType()))
    .withColumn('pickup_latitude', taxi.pickup_latitude.cast(types.FloatType()))
    .withColumn('pickup_longitude', taxi.pickup_longitude.cast(types.FloatType()))
    .withColumn('tip_amount', taxi.tip_amount.cast(types.FloatType()))
    .withColumn('tolls_amount', taxi.tolls_amount.cast(types.FloatType()))
    .withColumn('total_amount', taxi.total_amount.cast(types.FloatType()))
    .withColumn('trip_distance', taxi.trip_distance.cast(types.FloatType()))
    .withColumn('passenger_count', taxi.passenger_count.cast(types.IntegerType()))
    .withColumn('rate_code_id', taxi.rate_code_id.cast(types.IntegerType()))
)

In [5]:
taxi.sort('pickup_datetime').withColumn('trip_id', funcs.monotonically_increasing_id()) \
    .write.parquet('/data/all_trips_spark.parquet', compression='snappy', mode='overwrite')
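
Note that monotonically_increasing_id() produces ids that are unique but not consecutive (the partition id is encoded in the upper bits), which is why the trip_id values near the end of the data are so large. A quick, purely illustrative check that the output covers the expected date range:

taxi_check = spark.read.parquet('/data/all_trips_spark.parquet')
taxi_check.select(funcs.min('pickup_datetime'), funcs.max('pickup_datetime')).show()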

Using Dask, read and set the index on the Taxi DataFrame produced with PySpark, then write to disk for easy reading in Dask

For some reason I don't yet understand, this code seems to depend on Dask 0.14.3 and breaks in 0.15.
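
Given that, a minimal version guard near the top of the notebook (not in the original) can fail fast on an incompatible Dask:

import dask
assert dask.__version__.startswith('0.14'), dask.__version__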


In [1]:
import pandas as pd
import numpy as np
pd.options.display.max_rows = 100
pd.options.display.max_columns = 100

In [2]:
import dask.dataframe as dd
import dask.distributed 
client = dask.distributed.Client()

In [3]:
trips = dd.read_parquet('/data/all_trips_spark.parquet', engine='arrow')

In [4]:
trips.head()


Out[4]:
dropoff_datetime dropoff_latitude dropoff_longitude dropoff_taxizone_id ehail_fee extra fare_amount improvement_surcharge mta_tax passenger_count payment_type pickup_datetime pickup_latitude pickup_longitude pickup_taxizone_id rate_code_id store_and_fwd_flag tip_amount tolls_amount total_amount trip_distance trip_type vendor_id trip_id
0 2009-01-01 00:04:12 40.777058 -73.949608 263.0 NaN 0.0 5.800000 NaN NaN NaN "Cash" 2009-01-01 00:00:00 40.771244 -73.965919 237.0 NaN None 0.0 0.0 5.800000 1.3 "yellow" "CMT" 0
1 2009-01-01 00:05:03 40.735703 -74.005936 249.0 NaN 0.0 5.400000 NaN NaN NaN "Cash" 2009-01-01 00:00:00 40.725952 -73.997482 114.0 NaN None 0.0 0.0 5.400000 0.9 "yellow" "CMT" 1
2 2009-01-01 00:05:40 40.773746 -73.977753 43.0 NaN 0.0 5.800000 NaN NaN NaN "Cash" 2009-01-01 00:00:02 40.767391 -73.964798 237.0 NaN None 0.0 0.0 5.800000 1.0 "yellow" "CMT" 2
3 2009-01-01 00:03:08 40.709358 -74.013466 261.0 NaN 0.0 4.600000 NaN NaN NaN "Cash" 2009-01-01 00:00:04 40.708832 -74.011597 261.0 NaN None 0.0 0.0 4.600000 0.8 "yellow" "CMT" 3
4 2009-01-01 00:19:01 40.712368 -73.944580 80.0 NaN 0.0 27.799999 NaN NaN NaN "Cash" 2009-01-01 00:00:07 40.718578 -74.000648 144.0 NaN None 0.0 0.0 27.799999 5.5 "yellow" "CMT" 4

In [5]:
trips.tail()


Out[5]:
dropoff_datetime dropoff_latitude dropoff_longitude dropoff_taxizone_id ehail_fee extra fare_amount improvement_surcharge mta_tax passenger_count payment_type pickup_datetime pickup_latitude pickup_longitude pickup_taxizone_id rate_code_id store_and_fwd_flag tip_amount tolls_amount total_amount trip_distance trip_type vendor_id trip_id
1432504 2017-01-01 00:07:47 NaN NaN 36 NaN 0.5 7.0 0.3 0.5 NaN "2" 2016-12-31 23:59:57 NaN NaN 36 NaN "N" 0.00 0.0 8.300000 1.60 "green" "2" 6863359171512
1432505 2017-01-01 00:15:29 NaN NaN 63 NaN 0.0 16.5 0.3 0.5 NaN "2" 2016-12-31 23:59:58 NaN NaN 76 NaN "N" 0.00 0.0 17.299999 5.20 "green" "1" 6863359171513
1432506 2017-01-01 00:39:07 NaN NaN 161 NaN 0.5 33.5 0.3 0.5 NaN "1" 2016-12-31 23:59:58 NaN NaN 168 NaN "N" 6.96 0.0 41.759998 8.83 "green" "2" 6863359171514
1432507 2017-01-01 00:03:50 NaN NaN 209 NaN 0.5 5.0 0.3 0.5 NaN "2" 2016-12-31 23:59:58 NaN NaN 144 NaN "N" 0.00 0.0 6.300000 0.70 "yellow" "1" 6863359171515
1432508 2017-01-01 00:14:30 NaN NaN 134 NaN 0.5 13.0 0.3 0.5 NaN "2" 2016-12-31 23:59:59 NaN NaN 135 NaN "N" 0.00 0.0 14.300000 3.41 "green" "2" 6863359171516

In [6]:
# The arrow engine adds quotes around all string fields for some reason; strip them out.
dtypedict = dict(trips.dtypes)
for k in dtypedict:
    if dtypedict[k] == np.dtype('O'):
        trips[k] = trips[k].str.strip('"')

In [7]:
trips = trips.set_index('pickup_datetime', npartitions=trips.npartitions, sorted=True, compute=False)

In [8]:
trips.to_parquet('/data/all_trips.parquet', has_nulls=True, compression="SNAPPY", object_encoding='json')
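
With pickup_datetime as a sorted index, the payoff is cheap time-range selection when the data is read back. A usage sketch (the date range is illustrative, and it assumes the index divisions are recovered from the parquet metadata):

trips2 = dd.read_parquet('/data/all_trips.parquet')
trips2.loc['2015-01-01':'2015-01-31'].head()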
