In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

In [2]:
# Start (or reuse) the Spark entry points for this notebook.
# getOrCreate() hands back the already-running SparkContext when there is one,
# so re-running this cell in a live kernel no longer fails with
# "ValueError: Cannot run multiple SparkContexts at once".
# The app name only takes effect when a new context is actually created.
sc = SparkContext.getOrCreate(SparkConf().setAppName("UseParquet"))
sqlContext = SQLContext(sc)

In [3]:
# Load the trip data from the local Parquet directory named below.
# NOTE(review): nothing earlier in this notebook writes that directory; it is
# presumably produced by a separate conversion step -- confirm it exists before running.
# Parquet files are self-describing, so the column names and types come from the
# files themselves, and the loaded result is a Spark DataFrame.
taxiparquet = sqlContext.read.parquet('./yellow_tripdata_2016-06-parquet')

In [4]:
# Printing a Spark DataFrame shows only its column names and types, not its rows.
print(taxiparquet)


DataFrame[vendor_id: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, passenger_count: int, trip_distance: float, pickup_longitude: float, pickup_latitude: float, rate_code_id: int, store_and_fwd_flag: string, dropoff_longitude: float, dropoff_latitude: float, payment_type: int, fare_amount: float, extra: float, mta_tax: float, tip_amount: float, tolls_amount: float, improvement_surcharge: float, total_amount: float]

In [5]:
def prettySummary(df):
    """Build a display-friendly pandas table from ``df.describe()``.

    Args:
        df: a Spark DataFrame (any object exposing ``describe().toPandas()``).

    Returns:
        pandas.DataFrame: the describe() table in which the rows at positions
        1 and 2 (mean and stddev in describe() output) are converted from
        strings to numbers; values that cannot be parsed become NaN. The
        remaining rows and the first (label) column are left as they were.

    Side effects:
        Sets the global pandas option ``display.float_format`` to two decimals
        with thousands separators, which affects every later float display.
    """
    import pandas as pd

    stat_rows = slice(1, 3)       # positional rows 1 and 2
    value_cols = slice(1, None)   # everything except the first (label) column

    summary_pdf = df.describe().toPandas()
    as_numbers = summary_pdf.iloc[stat_rows, value_cols].apply(
        lambda col: pd.to_numeric(col, errors='coerce')
    )
    summary_pdf.iloc[stat_rows, value_cols] = as_numbers

    # Global display setting: applies to the returned frame when the notebook renders it.
    pd.options.display.float_format = '{:,.2f}'.format
    return summary_pdf
# Show the formatted summary statistics for the taxi DataFrame.
prettySummary(taxiparquet)


Out[5]:
summary vendor_id passenger_count trip_distance pickup_longitude pickup_latitude rate_code_id store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount
0 count 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470
1 mean 1.53 1.66 3.04 -73.05 40.24 1.04 NaN -73.12 40.28 1.35 13.51 0.34 0.50 1.84 0.34 0.30 16.83
2 stddev 0.50 1.30 21.83 8.21 4.52 0.57 NaN 7.88 4.34 0.49 275.54 0.53 0.04 2.71 1.72 0.01 275.86
3 min 1 0 0.0 -118.18626 0.0 1 N -118.18626 0.0 1 -450.0 -41.23 -2.7 -67.7 -12.5 -0.3 -450.8
4 max 2 9 71732.7 0.0 64.09648 99 Y 106.24688 60.040714 5 628544.75 597.92 60.35 854.85 970.0 11.64 629033.75

In [6]:
# Total number of rows in the DataFrame.
taxiparquet.count()


Out[6]:
11135470

In [7]:
# Expose the DataFrame to Spark SQL under the table name "taxi" so the
# sqlContext.sql(...) queries in the following cells can refer to it.
# NOTE(review): registerTempTable is deprecated in newer Spark releases in favour of
# createOrReplaceTempView -- confirm the target Spark version before switching.
taxiparquet.registerTempTable("taxi")

In [8]:
# Rows per vendor via Spark SQL; the aggregate column gets the auto-generated name count(1).
sqlContext.sql("SELECT vendor_id, COUNT(*) FROM taxi GROUP BY vendor_id ").show()


+---------+--------+
|vendor_id|count(1)|
+---------+--------+
|        1| 5235710|
|        2| 5899760|
+---------+--------+


In [9]:
# Same per-vendor aggregation, with the auto-generated `count(1)` column
# renamed to `number` before display.
(
    sqlContext
    .sql("SELECT vendor_id, COUNT(*) FROM taxi GROUP BY vendor_id ")
    .withColumnRenamed('count(1)', 'number')
    .show()
)


+---------+-------+
|vendor_id| number|
+---------+-------+
|        1|5235710|
|        2|5899760|
+---------+-------+


In [10]:
# SQL form of a compound filter: rows with passenger_count >= 3 and total_amount > 10.
sqlContext.sql("SELECT COUNT(*) FROM taxi WHERE passenger_count >= 3 and total_amount > 10").show()


+--------+
|count(1)|
+--------+
| 1063287|
+--------+


In [11]:
# Per-vendor row count using the DataFrame API instead of SQL; here the
# aggregate column is named "count".
taxiparquet.groupBy("vendor_id").count().show()


+---------+-------+
|vendor_id|  count|
+---------+-------+
|        1|5235710|
|        2|5899760|
+---------+-------+


In [12]:
# Rows whose passenger_count equals 1 (filter written as a column expression).
taxiparquet.filter(taxiparquet.passenger_count == 1).count()


Out[12]:
7898295

In [13]:
# Rows whose passenger_count equals 2.
taxiparquet.filter(taxiparquet.passenger_count == 2).count()


Out[13]:
1608298

In [14]:
# Rows whose passenger_count is 3 or more.
taxiparquet.filter(taxiparquet.passenger_count >= 3).count()


Out[14]:
1628471

In [15]:
# The same compound condition as the earlier SQL query, passed to filter() as a
# SQL expression string; the count matches the SQL result.
taxiparquet.filter("passenger_count >= 3 and total_amount > 10").count()


Out[15]:
1063287

In [16]:
# Rows with passenger_count >= 9 and total_amount > 100, using a SQL expression string.
taxiparquet.filter("passenger_count >= 9 and total_amount > 100").count()


Out[16]:
6

In [17]:
# Column-expression form of the passenger_count >= 9 / total_amount > 100 filter.
# Each comparison needs its own parentheses because `&` binds tighter than `>=` and `>`.
taxiparquet.filter(
    (taxiparquet.passenger_count >= 9)
    & (taxiparquet.total_amount > 100)
).count()


Out[17]:
6

In [18]:
# Column names and types as a list of (name, type) tuples.
taxiparquet.dtypes


Out[18]:
[('vendor_id', 'string'),
 ('pickup_datetime', 'timestamp'),
 ('dropoff_datetime', 'timestamp'),
 ('passenger_count', 'int'),
 ('trip_distance', 'float'),
 ('pickup_longitude', 'float'),
 ('pickup_latitude', 'float'),
 ('rate_code_id', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('dropoff_longitude', 'float'),
 ('dropoff_latitude', 'float'),
 ('payment_type', 'int'),
 ('fare_amount', 'float'),
 ('extra', 'float'),
 ('mta_tax', 'float'),
 ('tip_amount', 'float'),
 ('tolls_amount', 'float'),
 ('improvement_surcharge', 'float'),
 ('total_amount', 'float')]

In [19]:
# Tree-style view of the schema, including the nullable flag of each column.
taxiparquet.printSchema()


root
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- rate_code_id: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- extra: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- improvement_surcharge: float (nullable = true)
 |-- total_amount: float (nullable = true)


In [20]:
# Shorten the four coordinate column names; all other columns are carried over as-is.
# withColumnRenamed returns a new DataFrame, so `taxiparquet` itself is not modified.
taxiparquet1 = (
    taxiparquet
    .withColumnRenamed('dropoff_longitude', 'dropoff_long')
    .withColumnRenamed('dropoff_latitude', 'dropoff_lat')
    .withColumnRenamed('pickup_latitude', 'pickup_lat')
    .withColumnRenamed('pickup_longitude', 'pickup_long')
)

In [21]:
# Confirm the renamed coordinate columns; the types are unchanged.
taxiparquet1.dtypes


Out[21]:
[('vendor_id', 'string'),
 ('pickup_datetime', 'timestamp'),
 ('dropoff_datetime', 'timestamp'),
 ('passenger_count', 'int'),
 ('trip_distance', 'float'),
 ('pickup_long', 'float'),
 ('pickup_lat', 'float'),
 ('rate_code_id', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('dropoff_long', 'float'),
 ('dropoff_lat', 'float'),
 ('payment_type', 'int'),
 ('fare_amount', 'float'),
 ('extra', 'float'),
 ('mta_tax', 'float'),
 ('tip_amount', 'float'),
 ('tolls_amount', 'float'),
 ('improvement_surcharge', 'float'),
 ('total_amount', 'float')]

In [22]:
import pandas as pd
# toPandas() pulls the filtered rows into a pandas DataFrame in this Python process.
# The same filter was counted in an earlier cell and matched only 6 rows, so the
# result is small.
taxi_many = taxiparquet.filter("passenger_count >= 9 and total_amount > 100").toPandas()

In [23]:
# Display the collected rows as a pandas DataFrame.
taxi_many


Out[23]:
vendor_id pickup_datetime dropoff_datetime passenger_count trip_distance pickup_longitude pickup_latitude rate_code_id store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount
0 2 2016-06-14 17:37:52 2016-06-14 17:37:54 9 0.00 0.00 0.00 5 N -74.18 40.70 1 90.00 0.00 0.50 18.16 0.00 0.30 108.96
1 2 2016-06-30 12:57:18 2016-06-30 12:57:20 9 0.00 0.00 0.00 5 N -74.18 40.69 1 94.00 0.00 0.50 18.96 0.00 0.30 113.76
2 2 2016-06-10 10:09:25 2016-06-10 10:09:27 9 0.00 0.00 0.00 5 N -73.64 40.72 1 93.33 0.00 0.50 18.83 0.00 0.30 112.96
3 2 2016-06-04 23:34:09 2016-06-05 00:19:30 9 23.01 -73.78 40.64 5 N -74.05 40.79 1 90.25 0.00 0.50 21.42 16.04 0.30 128.51
4 2 2016-06-17 03:33:46 2016-06-17 04:11:05 9 17.79 -73.97 40.75 5 N -73.78 40.91 1 94.00 0.00 0.00 10.00 0.00 0.30 104.30
5 2 2016-06-14 22:22:00 2016-06-14 22:51:26 9 21.34 -74.01 40.72 5 N -74.36 40.71 1 95.00 0.00 0.00 0.00 10.50 0.30 105.80

In [24]:
# Count rows whose drop-off latitude is below 10 and whose drop-off longitude
# is above -50.
taxiparquet.filter(
    (taxiparquet.dropoff_latitude < 10)
    & (taxiparquet.dropoff_longitude > -50)
).count()


Out[24]:
127824

In [25]:
# Count rows that report a trip_distance of exactly zero.
taxiparquet.filter("trip_distance == 0.0").count()


Out[25]:
70811

In [26]:
# NOTE: this star import replaces Python's built-in max/min (among other names) with
# Spark column functions for the rest of the notebook. Later cells that call
# max(...), min(...) or month(...) on columns rely on it, so it must run before them.
from pyspark.sql.functions import *
# Latest pickup timestamp in the data.
taxiparquet.select(max("pickup_datetime")).show()


+--------------------+
|max(pickup_datetime)|
+--------------------+
| 2016-06-30 23:59:59|
+--------------------+


In [27]:
# Earliest pickup timestamp. `min` is the Spark function brought in by the earlier
# `from pyspark.sql.functions import *`, not the Python built-in.
taxiparquet.select(min("pickup_datetime")).show()


+--------------------+
|min(pickup_datetime)|
+--------------------+
| 2016-06-01 00:00:00|
+--------------------+


In [28]:
# Keep only the pickup and drop-off timestamp columns.
taxiparquet_dates = taxiparquet.select(
    taxiparquet['pickup_datetime'],
    taxiparquet['dropoff_datetime']
)

In [29]:
# Preview the first five rows; truncate=False asks show() not to shorten cell values.
taxiparquet_dates.show(5, truncate=False)


+-------------------+-------------------+
|pickup_datetime    |dropoff_datetime   |
+-------------------+-------------------+
|2016-06-09 21:06:00|2016-06-09 21:25:39|
|2016-06-09 21:13:26|2016-06-09 21:19:16|
|2016-06-09 21:13:30|2016-06-09 21:18:20|
|2016-06-09 21:13:34|2016-06-09 21:39:57|
|2016-06-09 21:06:53|2016-06-09 21:18:13|
+-------------------+-------------------+
only showing top 5 rows


In [30]:
# Add a boolean column `pickup_1st` that is true when the pickup timestamp is
# strictly earlier than the drop-off timestamp.
taxiparquet_dates = taxiparquet_dates.withColumn(
    'pickup_1st',
    taxiparquet_dates.pickup_datetime < taxiparquet_dates.dropoff_datetime
)

In [31]:
# Bad records: rows where the pickup is NOT strictly before the drop-off.
# Because pickup_1st was built with `<`, this also counts rows whose two
# timestamps are identical.
taxiparquet_dates.filter(~taxiparquet_dates.pickup_1st).count()


Out[31]:
11560

In [32]:
# Strange data: group rows by the calendar month of the drop-off timestamp.
# The min/max pickup timestamps shown earlier are all in June 2016, yet some
# drop-offs fall in other months.
# `month` comes from the earlier `from pyspark.sql.functions import *`.
taxiparquet_dates.groupBy(month(taxiparquet_dates.dropoff_datetime)).count().show()


+-----------------------+--------+
|month(dropoff_datetime)|   count|
+-----------------------+--------+
|                     12|       1|
|                      6|11129946|
|                      7|    5523|
+-----------------------+--------+


In [33]:
# Bring the latest pickup timestamp back to the driver as a plain Python value:
# collect() returns the result rows and [0][0] takes the first field of the first row.
# `max` here is the Spark aggregate made available by the earlier
# `from pyspark.sql.functions import *`, not the Python built-in.
max_pickup = taxiparquet.select(max("pickup_datetime")).collect()[0][0]
print(max_pickup)
# NOTE(review): star import placed mid-cell; it supplies the `datetime` class used on
# the next line and must stay before it.
from datetime import *
# Compare the collected value with a Python datetime; the result is the cell's output.
max_pickup < datetime(2017,12,31)


2016-06-30 23:59:59
Out[33]:
True