In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
In [2]:
sc = SparkContext(appName="UseParquet")
sqlContext = SQLContext(sc)
In [3]:
# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
taxiparquet = sqlContext.read.parquet('./yellow_tripdata_2016-06-parquet')
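The Parquet file was written in an earlier step. As a reminder, here is a minimal sketch of how it could have been produced from the original CSV (the CSV file name and the Spark 2.x csv reader are assumptions):

trips_csv = sqlContext.read.csv('./yellow_tripdata_2016-06.csv', header=True, inferSchema=True)
trips_csv.write.parquet('./yellow_tripdata_2016-06-parquet')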
In [4]:
print(taxiparquet)
In [5]:
def prettySummary(df):
    """Neat summary statistics of a Spark dataframe.

    Args:
        df (pyspark.sql.dataframe.DataFrame): input dataframe
    Returns:
        pandas.core.frame.DataFrame: a pandas dataframe with the summary statistics of df
    """
    import pandas as pd
    temp = df.describe().toPandas()
    # describe() returns every value as a string; coerce the mean and stddev rows back to numbers
    temp.iloc[1:3, 1:] = temp.iloc[1:3, 1:].apply(pd.to_numeric, errors='coerce')
    pd.options.display.float_format = '{:,.2f}'.format
    return temp

prettySummary(taxiparquet)
Out[5]:
In [6]:
taxiparquet.count()
Out[6]:
In [7]:
taxiparquet.registerTempTable("taxi")
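registerTempTable still works here, but it is deprecated as of Spark 2.0; the equivalent modern call is:

taxiparquet.createOrReplaceTempView("taxi")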
In [8]:
sqlContext.sql("SELECT vendor_id, COUNT(*) FROM taxi GROUP BY vendor_id ").show()
In [9]:
sqlContext.sql("SELECT vendor_id, COUNT(*) FROM taxi GROUP BY vendor_id ").withColumnRenamed('count(1)', 'number').show()
In [10]:
sqlContext.sql("SELECT COUNT(*) FROM taxi WHERE passenger_count >= 3 and total_amount > 10").show()
In [11]:
taxiparquet.groupBy("vendor_id").count().show()
In [12]:
taxiparquet.filter(taxiparquet.passenger_count == 1).count()
Out[12]:
In [13]:
taxiparquet.filter(taxiparquet.passenger_count == 2).count()
Out[13]:
In [14]:
taxiparquet.filter(taxiparquet.passenger_count >= 3).count()
Out[14]:
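Rather than running one filter-and-count job per passenger count, a single groupBy returns the whole distribution in one pass:

taxiparquet.groupBy("passenger_count").count().orderBy("passenger_count").show()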
In [15]:
taxiparquet.filter("passenger_count >= 3 and total_amount > 10").count()
Out[15]:
In [16]:
taxiparquet.filter("passenger_count >= 9 and total_amount > 100").count()
Out[16]:
In [17]:
taxiparquet.filter((taxiparquet.passenger_count >= 9) & (taxiparquet.total_amount > 100)).count()
Out[17]:
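Note that Python's & binds more tightly than the comparison operators, so each condition must be wrapped in parentheses. An equivalent way to write the same filter uses col() references:

from pyspark.sql.functions import col
taxiparquet.filter((col("passenger_count") >= 9) & (col("total_amount") > 100)).count()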
In [18]:
taxiparquet.dtypes
Out[18]:
In [19]:
taxiparquet.printSchema()
In [20]:
taxiparquet1 = (taxiparquet
    .withColumnRenamed('dropoff_longitude', 'dropoff_long')
    .withColumnRenamed('dropoff_latitude', 'dropoff_lat')
    .withColumnRenamed('pickup_latitude', 'pickup_lat')
    .withColumnRenamed('pickup_longitude', 'pickup_long'))
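When many columns need renaming, a small loop keeps the chain manageable; this sketch is equivalent to the chained calls above:

renames = {'dropoff_longitude': 'dropoff_long', 'dropoff_latitude': 'dropoff_lat',
           'pickup_latitude': 'pickup_lat', 'pickup_longitude': 'pickup_long'}
taxiparquet1 = taxiparquet
for old, new in renames.items():
    taxiparquet1 = taxiparquet1.withColumnRenamed(old, new)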
In [21]:
taxiparquet1.dtypes
Out[21]:
In [22]:
import pandas as pd
taxi_many = taxiparquet.filter("passenger_count >= 9 and total_amount > 100").toPandas()
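toPandas() pulls the entire result to the driver, so it should only follow a filter (or a limit) that makes the result small, e.g.:

taxiparquet.limit(1000).toPandas()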
In [23]:
taxi_many
Out[23]:
In [24]:
taxiparquet.filter((taxiparquet.dropoff_latitude < 10) & (taxiparquet.dropoff_longitude > -50)).count()
Out[24]:
In [25]:
taxiparquet.filter("trip_distance == 0.0").count()
Out[25]:
In [26]:
from pyspark.sql.functions import *
taxiparquet.select(max("pickup_datetime")).show()
In [27]:
taxiparquet.select(min("pickup_datetime")).show()
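(The wildcard import above shadows Python's built-in min and max with the pyspark.sql.functions versions, which is what makes these calls work on column names.) Both aggregates can also be computed in a single pass:

taxiparquet.select(min("pickup_datetime"), max("pickup_datetime")).show(truncate=False)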
In [28]:
taxiparquet_dates = taxiparquet.select(taxiparquet['pickup_datetime'], taxiparquet['dropoff_datetime'])
In [29]:
taxiparquet_dates.show(5, truncate=False)
In [30]:
taxiparquet_dates = taxiparquet_dates.withColumn('pickup_1st', taxiparquet_dates.pickup_datetime < taxiparquet_dates.dropoff_datetime)
In [31]:
# Bad records: trips where the drop-off does not come after the pick-up
taxiparquet_dates.filter(~taxiparquet_dates.pickup_1st).count()
Out[31]:
In [32]:
# Strange data: which months do the drop-offs fall in? (The file covers June 2016.)
taxiparquet_dates.groupBy(month(taxiparquet_dates.dropoff_datetime)).count().show()
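Aliasing the derived month column and sorting makes the breakdown easier to read, as in this sketch:

taxiparquet_dates.groupBy(month(taxiparquet_dates.dropoff_datetime).alias('dropoff_month')).count().orderBy('dropoff_month').show()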
In [33]:
max_pickup = taxiparquet.select(max("pickup_datetime")).collect()[0][0]
print(max_pickup)
from datetime import *
max_pickup < datetime(2017,12,31)
Out[33]:
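collect()[0][0] works, but first()[0] is a slightly tidier way to pull a single aggregate value out of a one-row result:

max_pickup = taxiparquet.select(max("pickup_datetime")).first()[0]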