In [2]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
In [5]:
%config InlineBackend.figure_format = 'retina'
In [6]:
import sys, platform
platform.python_version()
Out[6]:
In [7]:
import numpy as np
In [8]:
import pandas as pd
pd.set_option('display.max_colwidth', -1)  # -1 = no truncation (deprecated; use None in pandas >= 1.0)
pd.set_option('display.max_columns', None)
In [9]:
from pyspark import SparkContext, SQLContext, SparkConf
conf = SparkConf().setMaster("local").set("spark.driver.memory", "1g").set("spark.executor.memory", "1g")
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)
## in Spark 2.0+ the 'spark' SparkSession object is the entry point (the handle to a Spark session)
sc
sqlContext
Out[9]:
Out[9]:
In [10]:
from pyspark.sql import SparkSession
spark = SparkSession(sc)
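The builder pattern is the usual Spark 2.x way to obtain this entry point; a minimal equivalent sketch, assuming the same local master and 1g memory settings used above:
In [ ]:
from pyspark.sql import SparkSession
# build (or reuse) a session directly; the SparkContext is then available from it
spark = SparkSession.builder \
    .master("local") \
    .config("spark.driver.memory", "1g") \
    .config("spark.executor.memory", "1g") \
    .getOrCreate()
sc = spark.sparkContext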
In [11]:
spark.version
Out[11]:
In [12]:
spark.conf.get("spark.sql.shuffle.partitions")
spark.conf.set("spark.sql.shuffle.partitions",3)
spark.conf.get("spark.sql.shuffle.partitions")
Out[12]:
Out[12]:
In [13]:
data_path = 'Data/zubie_trips_anonymous/'
In [14]:
hadoop = spark._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path(data_path)
for f in fs.get(conf).listStatus(path):
    name = str(f.getPath())
    if name.endswith('.parquet'):
        print("{}, rows: {}".format(name.split('/')[-1], sqlContext.read.parquet(name).count()))
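Since data_path points at a local directory here, the same listing could also be done with Python's glob instead of reaching into the JVM gateway; a minimal sketch, assuming the files live on the local filesystem rather than on HDFS:
In [ ]:
import glob, os
# list local parquet part files and count the rows in each one
for name in sorted(glob.glob(os.path.join(data_path, '*.parquet'))):
    print("{}, rows: {}".format(os.path.basename(name), spark.read.parquet(name).count()))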
In [15]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, LongType, FloatType, DoubleType
In [16]:
df = spark.read.parquet(data_path)
print ( "total num of rows: {}".format( df.count() ) )
In [17]:
print ( "Number of partitions: {}".format(df.rdd.getNumPartitions()) )
In [18]:
print ( " Data Frame Schema: " )
df.printSchema()
In [19]:
df.limit(10).toPandas()
Out[19]:
In [20]:
cols = ['user','start_point_place','end_point_place','mpg_combined','fuel_consumed','fuel_cost']
for col in cols:
    print("'{}' non NULL entries: {}".format(col, df.select(col).dropna().count()))
print("'tags' non NULL entries: {}".format(df.select(F.explode('tags')).dropna().count()))
cols.append('tags')
from functools import reduce
from pyspark.sql import DataFrame
df = reduce(DataFrame.drop, cols, df)
print ("\n Columns dropped ")
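Note that in Spark 2.x and later, DataFrame.drop accepts several column names at once, so the functools.reduce fold above could be written as a single call; an equivalent sketch with the same cols list:
In [ ]:
# drop all listed columns in one call (drop silently ignores names that no longer exist)
df = df.drop(*cols)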
In [21]:
df.select('static_map_url').limit(5).toPandas() #show(5,truncate=False)
import urllib, requests  # check the first 10 urls
for url in [c['static_map_url'] for c in df.limit(10).rdd.collect()]:
    print(url)
    print(' status code {}\n'.format(requests.get(url).status_code))
df = df.drop('static_map_url')
Out[21]:
In [22]:
pdf = df.toPandas()
df0 = df.dropna(how='any')
pdf = df0.toPandas()
pdf.shape
df0 = df0.dropDuplicates()
pdf.shape
pdf.drop_duplicates(inplace=True)
pdf.shape
df0.count()
df0.select('key','vehicle_nickname').groupBy('vehicle_nickname','key').count().count()
Out[22]:
Out[22]:
Out[22]:
Out[22]:
Out[22]:
In [23]:
df1 = df.dropDuplicates()
df2 = df.dropna(how='any')
df.count()
df1.count()
df2.count()
df1.dropna(how='any').count()
df2.dropDuplicates().count()
Out[23]:
Out[23]:
Out[23]:
Out[23]:
Out[23]:
In [24]:
dfa = df.dropna(how='any')
dfa.count()
Out[24]:
In [25]:
df = df.dropDuplicates()
df.count()
Out[25]:
In [26]:
df.limit(5).toPandas()
Out[26]:
In [27]:
numOfRows_all = df.count()
for c in df.columns:
    n = 'dummy'  # placeholder to skip the expensive per-column check: df.where(F.col(c).isNull()).count()
    print("'{}' -- null counts: {}".format(c, n))
df = df.dropna(how='any')
numOfRows = df.count()
print ("\n Counts before row drop: {}\n".format(numOfRows_all))
print (" Counts after \"any null\" row drop: {0:.0f}, fraction: {1:.2f}\n".format(numOfRows,numOfRows*1./numOfRows_all))
In [28]:
df2 = df
df2.count()
Out[28]:
In [29]:
df.select('distance_um').distinct().show()
df.select('speed_um').distinct().show()
df.select('fuel_consumed_um').distinct().show()
df.select('fuel_cost_currency_code','fuel_cost_currency_symbol','fuel_ppg').distinct().show()
df = df.drop('fuel_cost_currency_code','fuel_cost_currency_symbol','fuel_ppg','fuel_consumed_um')
In [30]:
df.select('top_speed','top_speed_mph').describe().show()
df.select('obd_distance','obd_miles').describe().show()
df.select('gps_distance','gps_miles').describe().show()
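## optional spot-check (a minimal sketch; the 1e-3 mismatch tolerance is an assumption):
## count rows where a paired column disagrees with its twin before dropping it
print("obd_distance vs obd_miles mismatches: {}".format(
    df.where(F.abs(df['obd_distance'] - df['obd_miles']) > 1e-3).count()))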
df = df.drop('top_speed_mph','obd_miles','gps_miles')
## could check equality with np.allclose, but not for large arrays (requires collecting to the driver)
In [31]:
df.select('trip_segments').distinct().show()
df.select('fuel_type').distinct().show()
In [32]:
df = df.drop('trip_segments','fuel_type')
In [33]:
df.select('end_point_address_country').distinct().show()
df.select('start_point_address_country').distinct().show()
df.select('start_point_address_state').distinct().show() #Cymru is the Welsh name for Wales
df.select('end_point_address_state').distinct().show()
df.select('start_point_address_state','end_point_address_state').distinct().show()
df = df.drop('start_point_address_country','end_point_address_country')
In [34]:
print (" Num of records with with state adress = Cymru: {} ".format( df.filter(df['start_point_address_state']=='Cymru').count() ))
In [35]:
df.filter(df['start_point_address_state']=='Cymru').toPandas().head()
Out[35]:
In [36]:
df = df.replace('Cymru','Wales')
In [37]:
df.limit(5).toPandas().head()
Out[37]:
In [38]:
df.groupBy('start_point_timestamp_tz').pivot('end_point_timestamp_tz').count().show()
df.groupBy('start_point_timestamp_tz').pivot('start_point_daylight_saving_time_flag').count().show()
df.groupBy('end_point_timestamp_tz').pivot('end_point_daylight_saving_time_flag').count().show()
df = df.drop('start_point_daylight_saving_time_flag','end_point_daylight_saving_time_flag')\
.drop('end_point_timestamp_tz')
df = df.withColumnRenamed('start_point_timestamp_tz','trip_tz')
In [39]:
from pyspark.sql.types import TimestampType
def colsToTimestamp(df, colnames, fmt):
    # cast each named string column to TimestampType using the given timestamp format
    for name in colnames:
        df = df.withColumn(name, F.unix_timestamp(df[name], fmt).cast(TimestampType()))
    return df
timestamp_format = 'yyyy-MM-dd HH:mm:ss'
timestamp_cols = ['start_point_timestamp','start_point_timestamp_utc','end_point_timestamp','end_point_timestamp_utc']
df = colsToTimestamp(df,timestamp_cols,timestamp_format)
print (" check types schema: ")
print ([(c.name,c.dataType) for c in df.schema if c.name in timestamp_cols])
In [40]:
df.limit(5).toPandas().head()
Out[40]:
In [41]:
col = 'start_point_timestamp'
df.select(col,'trip_tz' ) \
.groupBy(F.year(col),F.month(col),F.dayofmonth(col),'trip_tz')\
.count()\
.orderBy(F.year(col),F.month(col),F.dayofmonth(col)) \
.toPandas() #.show(n=365,truncate=False)
In [42]:
timing_cols = [n for n in df.columns if 'timestamp' in n]
timing_cols.append('trip_tz')
timing_cols.append('duration_seconds')
df.select( [c for c in timing_cols] ) \
.withColumn('mydt',F.unix_timestamp(df['end_point_timestamp_utc'])-F.unix_timestamp(df['start_point_timestamp_utc'])) \
.limit(5).toPandas().head() #.show(5,truncate=False)
df.select( [c for c in timing_cols] ) \
.withColumn('mydt',F.unix_timestamp(df['end_point_timestamp_utc'])-F.unix_timestamp(df['start_point_timestamp_utc'])) \
.describe('duration_seconds','mydt') \
.show()
df = df.drop('start_point_timestamp_utc','end_point_timestamp_utc')
Out[42]:
In [43]:
df = df.withColumnRenamed('start_point_latitude','start_point_lat') \
.withColumnRenamed('start_point_longitude','start_point_long') \
.withColumnRenamed('end_point_latitude','end_point_lat') \
.withColumnRenamed('end_point_longitude','end_point_long')
In [44]:
latlong_cols = ['start_point_lat','start_point_long','end_point_lat','end_point_long']
timestamp_cols = [n for n in df.columns if 'timestamp' in n]
distance_cols = ['gps_distance','obd_distance']
duration_cols = ['duration_seconds','idle_seconds']
In [ ]:
cols = timestamp_cols + duration_cols + latlong_cols + ['gps_distance']
print (" number of rows where idle time >= trip duration: {}"\
.format( df.where(df['idle_seconds'] >= df['duration_seconds']).count() )
)
print (" number of rows where idle time < 0: {}" \
.format( df.where(df['idle_seconds'] < 0).count() )
)
df.select(cols)\
.where( df['idle_seconds'] >= df['duration_seconds'] )\
.toPandas() #.show(truncate=False)
df.select(cols)\
.where( df['idle_seconds'] < 0 )\
.toPandas() #.show(truncate=False)
df = df.where( (df['duration_seconds'] > df['idle_seconds']) & (df['idle_seconds'] >= 0) )
print ("rows after selection: {}".format(df.count()))
Out[ ]:
Out[ ]:
In [ ]:
df2 = df
df2.count()
In [ ]:
df.select(latlong_cols)\
.describe()\
.show()
In [ ]:
df.select('top_speed')\
.describe()\
.toPandas() #.show()
df.select( [c for c in df.columns if c.startswith('speeding')] )\
.describe()\
.show()
df.select( [c for c in df.columns if 'brake' in c ] )\
.describe()\
.show()
df.select( [c for c in df.columns if 'accel' in c] )\
.describe()\
.show()
df.select( [c for c in df.columns if c.startswith('points')] )\
.describe()\
.show()
In [ ]:
schema_cols = [(c.name,c.dataType) for c in df.schema]
for c in schema_cols:
    name, dtype = c
    if dtype == DoubleType():
        df = df.withColumn(name, df[name].cast(FloatType()))
    elif dtype == LongType():
        df = df.withColumn(name, df[name].cast(IntegerType()))
df = df.withColumn('duration_seconds',df['duration_seconds'].cast(IntegerType()))
df.printSchema()
In [ ]:
df.limit(5).toPandas().head()
In [ ]:
print (" key entries: {}"\
.format( df.select('key').distinct().count() )
)
df = df.drop('key')
In [ ]:
print (" Number of device_key: {}"\
.format( df.select('device_key').distinct().count() )
)
In [ ]:
print ("Number of unique vehicle_key, vehicle_nickname: ",\
df.select('vehicle_key','vehicle_nickname').groupBy('vehicle_key','vehicle_nickname').count().count()
)
df.select('vehicle_key','vehicle_nickname').groupBy('vehicle_key','vehicle_nickname').count().orderBy('vehicle_nickname').show(n=100,truncate=False)
In [ ]:
print ("Number of unique device_key, vehicle_nickname: ",\
df.select('device_key','vehicle_nickname').groupBy('device_key','vehicle_nickname').count().count()
)
df.select('device_key','vehicle_nickname').groupBy('device_key','vehicle_nickname').count().orderBy('vehicle_nickname').show(n=100,truncate=False)
In [ ]:
device_keys = df.select('device_key').distinct().orderBy('device_key').rdd.flatMap(lambda x:x).collect()
getID_udf = F.udf(lambda k: device_keys.index(k), IntegerType())
df = df.withColumn('device_id',getID_udf(df['device_key']))
In [ ]:
vehicle_names = df.select('vehicle_nickname').distinct().orderBy('vehicle_nickname').rdd.flatMap(lambda x:x).collect()
getID_udf = F.udf(lambda k: vehicle_names.index(k), IntegerType())
df = df.withColumn('vehicle_id',getID_udf(df['vehicle_nickname']))
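The index-lookup UDFs above require collecting every distinct key to the driver; pyspark.ml's StringIndexer is an alternative sketch (the stringOrderType option assumes Spark 2.3+, the indexer emits a double column, hence the cast, and 'vehicle_id_ix' is just an illustrative name):
In [ ]:
from pyspark.ml.feature import StringIndexer
# alphabetical ascending ids, matching the sorted-list .index() lookup used above
indexer = StringIndexer(inputCol='vehicle_nickname', outputCol='vehicle_id_ix',
                        stringOrderType='alphabetAsc')
df_ix = indexer.fit(df).transform(df)
df_ix = df_ix.withColumn('vehicle_id_ix', df_ix['vehicle_id_ix'].cast(IntegerType()))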
In [ ]:
df = df.drop('device_key')
df = df.drop('vehicle_key')
In [ ]:
df.printSchema()
In [ ]:
for c in [c for c in df.columns if 'address' in c]:
    df = df.withColumnRenamed(c, c.replace('address_',''))
df.columns
In [ ]:
cols = ['device_id','vehicle_id','vehicle_nickname'] \
+ [c for c in df.columns if c.startswith('start')] \
+ [c for c in df.columns if c.startswith('end')] \
+ ['trip_tz'] \
+ ['duration_seconds','idle_seconds'] \
+ ['gps_distance','obd_distance'] \
+ ['top_speed'] \
+ ['hard_accel_count','hard_brake_count'] \
+ ['points_city_count','points_hwy_count'] \
+ [c for c in df.columns if c.startswith('speeding')] \
+ ['speed_um','distance_um']
len(cols)
len(df.columns)
In [ ]:
df = df.select(cols)
df = df.orderBy('start_point_timestamp','vehicle_id')
df.count()
In [ ]:
df.limit(50).toPandas()
In [ ]:
df.select('vehicle_nickname').distinct().toPandas()['vehicle_nickname'].values.tolist()
In [ ]:
cols = ['device_id','vehicle_id']\
+ ['start_point_lat','start_point_long'] \
+ ['end_point_lat','end_point_long'] \
+ ['duration_seconds','idle_seconds'] \
+ ['gps_distance','obd_distance'] \
+ ['top_speed'] \
+ ['hard_accel_count','hard_brake_count'] \
+ ['points_city_count','points_hwy_count'] \
+ ['speeding_city_major_count','speeding_city_minor_count']\
+ ['speeding_hwy_major_count','speeding_hwy_minor_count'] \
+ ['start_point_timestamp','end_point_timestamp']
df_num = df.select(cols)\
.orderBy('start_point_timestamp','vehicle_id')\
.withColumn('start_ts',F.unix_timestamp('start_point_timestamp'))\
.withColumn('start_y',F.year('start_point_timestamp'))\
.withColumn('start_m',F.month('start_point_timestamp'))\
.withColumn('start_day',F.dayofmonth('start_point_timestamp'))\
.withColumn('start_day2',F.dayofyear('start_point_timestamp'))\
.withColumn('start_hh',F.hour('start_point_timestamp'))\
.withColumn('start_mm',F.minute('start_point_timestamp'))\
.withColumn('start_ss',F.second('start_point_timestamp'))\
.withColumn('end_ts',F.unix_timestamp('end_point_timestamp'))\
.withColumn('end_y',F.year('end_point_timestamp'))\
.withColumn('end_m',F.month('end_point_timestamp'))\
.withColumn('end_day',F.dayofmonth('end_point_timestamp'))\
.withColumn('end_day2',F.dayofyear('end_point_timestamp'))\
.withColumn('end_hh',F.hour('end_point_timestamp'))\
.withColumn('end_mm',F.minute('end_point_timestamp'))\
.withColumn('end_ss',F.second('end_point_timestamp'))
df_num = df_num.drop('start_point_timestamp','end_point_timestamp')
In [ ]:
df_num.limit(50).toPandas()
In [ ]:
df.limit(50).toPandas()
In [ ]:
pdf = df.toPandas()
pdf.to_pickle('zubie.pkl')
In [ ]:
pdf0 = pd.read_pickle('zubie.pkl')
pdf0.dtypes
In [ ]:
! rm -rf ./output
df.repartition(1).write.parquet('./output')
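A quick read-back check on the written output (a sketch; './output' is the path used just above):
In [ ]:
# verify the single-partition parquet output round-trips with the expected row count
df_check = spark.read.parquet('./output')
print("rows written: {}".format(df_check.count()))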