In [1]:
# standard imports
from pyspark.sql import functions as funcs
from pyspark.sql import types
In [2]:
# keep the shuffle partition count modest for this relatively small dataset
spark.sql("set spark.sql.shuffle.partitions=32")
bike = spark.read.parquet('/data/citibike.parquet')
bike.createOrReplaceTempView('bike')
In [3]:
spark.sql('select * from bike limit 5').toPandas()
Out[3]:
In [4]:
# timestamps are stored as integer microseconds since the Unix epoch:
# divide down to seconds, then cast to proper timestamps
bike = (bike
    .withColumn('start_time',
                funcs.from_unixtime((bike.start_time / 1000000).cast(types.IntegerType()))
                     .cast(types.TimestampType()))
    .withColumn('stop_time',
                funcs.from_unixtime((bike.stop_time / 1000000).cast(types.IntegerType()))
                     .cast(types.TimestampType()))
    .withColumn('start_taxizone_id', bike.start_taxizone_id.cast(types.FloatType()))
    .withColumn('end_taxizone_id', bike.end_taxizone_id.cast(types.FloatType()))
)
bike.createOrReplaceTempView('bike2')
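The epoch values are in microseconds, so dividing by 1000000 before from_unixtime is the key step. A quick sanity check of the conversion, as a minimal sketch with a hard-coded example value:
In [ ]:
# 1456000000000000 microseconds since the epoch should land in late February 2016
spark.sql("select from_unixtime(cast(1456000000000000 / 1000000 as int)) as ts").show()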
In [5]:
spark.sql('select * from bike2 limit 5').toPandas()
Out[5]:
In [6]:
# write sorted by start_time so the parquet row groups come out time-ordered
bike.sort('start_time') \
    .write.parquet('/data/citibike_spark.parquet', compression='snappy', mode='overwrite')
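To verify the write, the new file can be read back and its schema inspected; a quick sketch:
In [ ]:
spark.read.parquet('/data/citibike_spark.parquet').printSchema()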
In [1]:
# standard imports
from pyspark.sql import functions as funcs
from pyspark.sql import types
In [2]:
spark.sql("set spark.sql.shuffle.partitions=32")
subway = spark.read.parquet('/data/subway.parquet')
subway.createOrReplaceTempView("subway")
In [3]:
spark.sql('select * from subway limit 5').toPandas()
Out[3]:
In [4]:
# same microsecond-epoch conversion as for the bike data
subway = subway.withColumn(
    'endtime',
    funcs.from_unixtime((subway.endtime / 1000000).cast(types.IntegerType()))
         .cast(types.TimestampType()))
subway.createOrReplaceTempView("subway2")
In [5]:
spark.sql('select * from subway2 limit 5').toPandas()
Out[5]:
In [6]:
# sort by the turnstile identifiers (ca, unit, scp) and then by reading time
subway = subway.sort("ca", "unit", "scp", "endtime")
subway.write.parquet('/data/subway_spark.parquet', compression='snappy', mode='overwrite')
In [1]:
# standard imports
from pyspark.sql import functions as funcs
from pyspark.sql import types
In [3]:
# the taxi data are much larger, so allow many more shuffle partitions
spark.sql("set spark.sql.shuffle.partitions=800")
taxi = spark.read.parquet('/data/all_trips_unprocessed.parquet')
In [4]:
# cast the timestamps (integer microseconds since the epoch) and shrink
# the numeric columns to 32-bit types
taxi = (taxi
    .withColumn('dropoff_datetime',
                funcs.from_unixtime((taxi.dropoff_datetime / 1000000).cast(types.IntegerType()))
                     .cast(types.TimestampType()))
    .withColumn('pickup_datetime',
                funcs.from_unixtime((taxi.pickup_datetime / 1000000).cast(types.IntegerType()))
                     .cast(types.TimestampType()))
    .withColumn('dropoff_taxizone_id', taxi.dropoff_taxizone_id.cast(types.IntegerType()))
    .withColumn('pickup_taxizone_id', taxi.pickup_taxizone_id.cast(types.IntegerType()))
    .withColumn('dropoff_latitude', taxi.dropoff_latitude.cast(types.FloatType()))
    .withColumn('dropoff_longitude', taxi.dropoff_longitude.cast(types.FloatType()))
    .withColumn('ehail_fee', taxi.ehail_fee.cast(types.FloatType()))
    .withColumn('extra', taxi.extra.cast(types.FloatType()))
    .withColumn('fare_amount', taxi.fare_amount.cast(types.FloatType()))
    .withColumn('improvement_surcharge', taxi.improvement_surcharge.cast(types.FloatType()))
    .withColumn('mta_tax', taxi.mta_tax.cast(types.FloatType()))
    .withColumn('pickup_latitude', taxi.pickup_latitude.cast(types.FloatType()))
    .withColumn('pickup_longitude', taxi.pickup_longitude.cast(types.FloatType()))
    .withColumn('tip_amount', taxi.tip_amount.cast(types.FloatType()))
    .withColumn('tolls_amount', taxi.tolls_amount.cast(types.FloatType()))
    .withColumn('total_amount', taxi.total_amount.cast(types.FloatType()))
    .withColumn('trip_distance', taxi.trip_distance.cast(types.FloatType()))
    .withColumn('passenger_count', taxi.passenger_count.cast(types.IntegerType()))
    .withColumn('rate_code_id', taxi.rate_code_id.cast(types.IntegerType()))
)
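The long chain above can be written more compactly as a loop over the columns that share a target type; a minimal sketch of the same casts, with the column lists copied from the chain (the timestamp conversions would still need their own withColumn calls):
In [ ]:
# equivalent compact form of the numeric casts above
float_cols = ['dropoff_latitude', 'dropoff_longitude', 'ehail_fee', 'extra',
              'fare_amount', 'improvement_surcharge', 'mta_tax',
              'pickup_latitude', 'pickup_longitude', 'tip_amount',
              'tolls_amount', 'total_amount', 'trip_distance']
int_cols = ['dropoff_taxizone_id', 'pickup_taxizone_id',
            'passenger_count', 'rate_code_id']
for c in float_cols:
    taxi = taxi.withColumn(c, taxi[c].cast(types.FloatType()))
for c in int_cols:
    taxi = taxi.withColumn(c, taxi[c].cast(types.IntegerType()))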
In [5]:
# sort by pickup time, then tag every trip with a unique id before writing
taxi.sort('pickup_datetime').withColumn('trip_id', funcs.monotonically_increasing_id()) \
    .write.parquet('/data/all_trips_spark.parquet', compression='snappy', mode='overwrite')
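One caveat: monotonically_increasing_id() guarantees unique, increasing ids, but not consecutive ones, because the partition id is encoded in the upper bits. A small illustration, as a sketch:
In [ ]:
# with two partitions, the second partition's ids start at 2**33
spark.range(0, 4, 1, 2).withColumn('mono_id', funcs.monotonically_increasing_id()).show()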
In [1]:
import pandas as pd
import numpy as np
pd.options.display.max_rows = 100
pd.options.display.max_columns = 100
In [2]:
import dask.dataframe as dd
import dask.distributed
# with no arguments, Client() starts a local scheduler and workers
client = dask.distributed.Client()
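A quick way to confirm the workers came up is to ask the client for their core counts; a minimal check (ncores() is the method available in this era of dask.distributed):
In [ ]:
# maps each worker address to its number of cores
client.ncores()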
In [3]:
trips = dd.read_parquet('/data/all_trips_spark.parquet', engine='arrow')
In [4]:
trips.head()
Out[4]:
In [5]:
trips.tail()
Out[5]:
In [6]:
# the arrow engine adds literal quotes around all string fields for some reason;
# strip them out of every object-dtype column
dtypedict = dict(trips.dtypes)
for k, v in dtypedict.items():
    if v == np.dtype('O'):
        trips[k] = trips[k].str.strip('"')
In [7]:
# the data were already sorted by pickup_datetime in Spark, so sorted=True
# lets dask set the index without a full shuffle
trips = trips.set_index('pickup_datetime', npartitions=trips.npartitions, sorted=True, compute=False)
In [8]:
# has_nulls and object_encoding are fastparquet writer options
trips.to_parquet('/data/all_trips.parquet', has_nulls=True, compression="SNAPPY", object_encoding='json')
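As a final check, the dataset can be read back to confirm the time-ordered index survived the round trip; a quick sketch:
In [ ]:
trips2 = dd.read_parquet('/data/all_trips.parquet')
trips2.head()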