In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf

# getOrCreate reuses an already-running SparkContext, so re-executing this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(SparkConf().setAppName("UseParquet"))
sqlContext = SQLContext(sc)
# Read the Parquet dataset at this path. Parquet files are self-describing, so
# the schema stored with the data is preserved; the result is a DataFrame.
taxiparquet = sqlContext.read.parquet('./yellow_tripdata_2016-06-parquet')
def prettySummary(df, float_format='{:,.2f}'.format, numeric_stats=('mean', 'stddev')):
    """Neat summary statistics of a Spark DataFrame.

    Runs ``df.describe()`` and collects its (small) result to pandas. Spark
    returns every statistic as a string; the rows whose ``summary`` label is in
    ``numeric_stats`` are converted to numbers so they display with
    ``float_format``. All other rows (e.g. count, min, max) are left as strings.

    Args:
        df (pyspark.sql.dataframe.DataFrame): input dataframe.
        float_format (callable or None): formatter assigned to the global
            ``pandas.options.display.float_format``. This affects every pandas
            display in the session, not only the returned frame. Pass ``None``
            to leave the current option unchanged.
        numeric_stats (iterable of str): ``summary`` labels whose rows are
            converted with ``pd.to_numeric(errors='coerce')``; entries that
            cannot be parsed (e.g. for string columns) become NaN.

    Returns:
        pandas.core.frame.DataFrame: the summary statistics of ``df``, with the
        statistic names in the ``summary`` column.
    """
    import pandas as pd

    summary = df.describe().toPandas()
    # Select statistic rows by label rather than by position so the conversion
    # does not depend on the row order describe() happens to produce.
    stat_rows = summary['summary'].isin(list(numeric_stats))
    value_cols = summary.columns.drop('summary')
    summary.loc[stat_rows, value_cols] = (
        summary.loc[stat_rows, value_cols].apply(pd.to_numeric, errors='coerce')
    )
    if float_format is not None:
        pd.options.display.float_format = float_format
    return summary
# Summary statistics of every column in the loaded taxi dataset.
prettySummary(df=taxiparquet)


Out[1]:
summary vendor_id passenger_count trip_distance pickup_longitude pickup_latitude rate_code_id store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount
0 count 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470 11135470
1 mean 1.53 1.66 3.04 -73.05 40.24 1.04 NaN -73.12 40.28 1.35 13.51 0.34 0.50 1.84 0.34 0.30 16.83
2 stddev 0.50 1.30 21.83 8.21 4.52 0.57 NaN 7.88 4.34 0.49 275.54 0.53 0.04 2.71 1.72 0.01 275.86
3 min 1 0 0.0 -118.18626 0.0 1 N -118.18626 0.0 1 -450.0 -41.23 -2.7 -67.7 -12.5 -0.3 -450.8
4 max 2 9 71732.7 0.0 64.09648 99 Y 106.24688 60.040714 5 628544.75 597.92 60.35 854.85 970.0 11.64 629033.75

In [2]:
# Keep only the columns used by the models below.
taxi_select = taxiparquet.select(
    'passenger_count', 'trip_distance', 'total_amount', 'tip_amount', 'payment_type'
)

In [3]:
# Summary statistics restricted to the selected columns.
prettySummary(df=taxi_select)


Out[3]:
summary passenger_count trip_distance total_amount tip_amount payment_type
0 count 11135470 11135470 11135470 11135470 11135470
1 mean 1.66 3.04 16.83 1.84 1.35
2 stddev 1.30 21.83 275.86 2.71 0.49
3 min 0 0.0 -450.8 -67.7 1
4 max 9 71732.7 629033.75 854.85 5

In [4]:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

In [5]:
# Pack the numeric predictor columns into one vector column named 'features',
# which the regression below reads as its features column.
features = ['passenger_count', 'trip_distance', 'total_amount', 'tip_amount']
assembler = VectorAssembler(inputCols=features, outputCol='features')
assembled_taxi = assembler.transform(taxi_select)
assembled_taxi.show(n=5)


+---------------+-------------+------------+----------+------------+--------------------+
|passenger_count|trip_distance|total_amount|tip_amount|payment_type|            features|
+---------------+-------------+------------+----------+------------+--------------------+
|              1|          3.1|        15.8|       0.0|           2|[1.0,3.0999999046...|
|              1|         1.38|        9.96|      1.66|           1|[1.0,1.3799999952...|
|              1|         0.94|         6.8|       0.0|           2|[1.0,0.9399999976...|
|              1|          2.9|       22.55|      3.75|           1|[1.0,2.9000000953...|
|              1|          0.8|       11.15|      1.85|           1|[1.0,0.8000000119...|
+---------------+-------------+------------+----------+------------+--------------------+
only showing top 5 rows


In [6]:
# First five rows as a list of Row objects (DataFrame.head(n) delegates to take(n)).
taxi_select.take(5)


Out[6]:
[Row(passenger_count=1, trip_distance=3.0999999046325684, total_amount=15.800000190734863, tip_amount=0.0, payment_type=2),
 Row(passenger_count=1, trip_distance=1.3799999952316284, total_amount=9.960000038146973, tip_amount=1.659999966621399, payment_type=1),
 Row(passenger_count=1, trip_distance=0.9399999976158142, total_amount=6.800000190734863, tip_amount=0.0, payment_type=2),
 Row(passenger_count=1, trip_distance=2.9000000953674316, total_amount=22.549999237060547, tip_amount=3.75, payment_type=1),
 Row(passenger_count=1, trip_distance=0.800000011920929, total_amount=11.149999618530273, tip_amount=1.850000023841858, payment_type=1)]

In [7]:
# Compare the schema before and after vector assembly.
for frame in (taxi_select, assembled_taxi):
    print(frame)


DataFrame[passenger_count: int, trip_distance: float, total_amount: float, tip_amount: float, payment_type: int]
DataFrame[passenger_count: int, trip_distance: float, total_amount: float, tip_amount: float, payment_type: int, features: vector]

In [8]:
# 60/40 train/test split; the fixed seed makes the split repeatable for the
# same input partitioning.
train, test = assembled_taxi.randomSplit(weights=[0.6, 0.4], seed=0)

In [9]:
# NOTE(review): payment_type is an integer code (values 1-5 in the summary
# output above), so fitting it as a continuous regression target is
# questionable; a classifier may be more appropriate. Kept as-is here.
lr = LinearRegression(maxIter=10, labelCol="payment_type", featuresCol="features")
model = lr.fit(train)

In [10]:
# Score the fitted regression on the held-out split.
testing_summary = model.evaluate(dataset=test)

In [11]:
# Root-mean-squared error of the test-split predictions.
rmse = testing_summary.rootMeanSquaredError
rmse


Out[11]:
0.43303192678194763

In [12]:
# Inputs, true label and model prediction side by side for the first rows.
prediction_cols = ['passenger_count', 'trip_distance', 'total_amount', 'tip_amount',
                   'payment_type', 'prediction']
testing_summary.predictions.select(prediction_cols).show(n=10)


+---------------+-------------+------------+----------+------------+------------------+
|passenger_count|trip_distance|total_amount|tip_amount|payment_type|        prediction|
+---------------+-------------+------------+----------+------------+------------------+
|              0|          0.0|         3.8|       0.0|           2|1.5013795539043826|
|              0|          0.0|        8.16|      1.36|           1|1.3825334814604624|
|              0|          0.0|        8.54|       0.0|           3|1.5014695446837982|
|              0|          0.0|         8.8|       0.0|           1|1.5014744808912242|
|              0|          0.0|       11.75|       0.0|           1|1.5015304878071687|
|              0|          0.0|       29.12|      5.82|           1|0.9929147399101657|
|              0|          0.0|       55.85|       0.0|           1|1.5023677437622172|
|              0|          0.0|       64.34|       6.0|           1|0.9778428377395803|
|              0|          0.0|        79.3|       0.0|           1|1.5028129513961823|
|              0|          0.0|       100.0|       0.0|           1|1.5032059490450274|
+---------------+-------------+------------+----------+------------+------------------+
only showing top 10 rows


In [13]:
from __future__ import print_function

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.datasets.samples_generator import make_blobs
from pyspark import SparkContext
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Vectors
from pyspark.sql.types import Row

In [15]:
from pyspark.sql.functions import monotonically_increasing_id
# Attach a row identifier (moved to the first column) so cluster assignments
# can be joined back to the original columns later. Per the Spark docs,
# monotonically_increasing_id values are unique but not consecutive and depend
# on partition IDs.
taxi_select = (
    taxi_select
    .withColumn("id", monotonically_increasing_id())
    .select('id', 'passenger_count', 'trip_distance', 'total_amount', 'tip_amount', 'payment_type')
)
features = ['passenger_count', 'trip_distance', 'total_amount', 'tip_amount']
assembler = VectorAssembler(inputCols=features, outputCol='features')
assembled_taxid = assembler.transform(taxi_select).select('id', 'features')
assembled_taxid.show(n=5)


+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[1.0,3.0999999046...|
|  1|[1.0,1.3799999952...|
|  2|[1.0,0.9399999976...|
|  3|[1.0,2.9000000953...|
|  4|[1.0,0.8000000119...|
+---+--------------------+
only showing top 5 rows


In [16]:
# Elbow search: fit K-means for k = 5..11 on a 10% sample and record the cost
# (sum of squared distances to the nearest center) for each k. `cost` is indexed
# directly by k, so entries 0-4 are unused.
cost = np.zeros(12)
# The same sample feeds seven fits; cache it so it is not re-read and
# re-sampled from the source for every fit.
sample = assembled_taxid.sample(False, 0.1, seed=0).cache()
for k in range(5, 12):
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    model = kmeans.fit(sample)
    try:
        # Spark >= 2.4: training cost on the fitted data is in the model summary.
        cost[k] = model.summary.trainingCost
    except AttributeError:
        # Older Spark: computeCost (deprecated in 2.4, removed in 3.0).
        cost[k] = model.computeCost(sample)
sample.unpersist()

In [18]:
# Elbow plot of the K-means cost for each k tried; look for the k where the
# drop in cost levels off.
k_values = range(5, 12)
fig, ax = plt.subplots(1, 1, figsize=(8, 6))
ax.plot(k_values, cost[list(k_values)], marker='o')
ax.set_xticks(list(k_values))  # k is an integer; avoid fractional ticks
ax.set_xlabel('k')
ax.set_ylabel('cost')
ax.set_title('K-means cost vs. number of clusters k')
plt.show()


Out[18]:
Text(0, 0.5, 'cost')

In [19]:
# Final clustering with k = 10 on a 10% sample drawn with the same seed as the
# elbow search.
k = 10
sample = assembled_taxid.sample(False, 0.1, seed=0)
kmeans = KMeans(k=k, seed=1, featuresCol="features")
model = kmeans.fit(sample)
centers = model.clusterCenters()

print("Cluster Centers: ")
print(*centers, sep="\n")


Cluster Centers: 
[1.2136282  0.99803918 8.01215782 0.68890393]
[ 1.70377967 10.93754183 47.09557297  5.77064393]
[  1.45255474  19.78549437 145.87561419  21.02411414]
[1.0000000e+00 0.0000000e+00 8.4527998e+03 0.0000000e+00]
[ 1.67395693  7.14125488 31.95663653  3.70597098]
[ 1.72861121 16.90427598 67.60738581  8.25708733]
[ 1.71419062  4.73986311 21.40637738  0.37229279]
[ 1.6551671   3.46806611 20.85318726  3.50719458]
[ 1.49246873  2.10552109 13.75238905  1.53256651]
[4.87074192 1.22737269 9.16640543 0.82902584]

In [20]:
# Cluster assignment per sampled row: only the row id and predicted cluster.
# NOTE: collect() pulls every sampled row to the driver.
transformed = model.transform(sample).select('id', 'prediction')
rows = transformed.collect()
print(rows[:3])


[Row(id=20, prediction=0), Row(id=33, prediction=7), Row(id=39, prediction=0)]

In [21]:
# Cluster assignments as a Spark DataFrame. Reuse the `transformed` DataFrame
# directly instead of rebuilding one from the collected `rows`, which would
# ship every sampled row through the driver a second time.
taxi_pred = transformed.select('id', 'prediction')
taxi_pred.show()


+---+----------+
| id|prediction|
+---+----------+
| 20|         0|
| 33|         7|
| 39|         0|
| 44|         0|
| 48|         0|
| 56|         9|
| 83|         0|
| 88|         4|
| 95|         0|
|107|         0|
|135|         8|
|173|         0|
|200|         0|
|208|         8|
|213|         4|
|216|         7|
|229|         7|
|230|         8|
|238|         8|
|246|         8|
+---+----------+
only showing top 20 rows


In [22]:
# Attach the original columns to each cluster assignment by row id.
taxi_pred = taxi_pred.join(taxi_select, on='id', how='inner')
taxi_pred.show()


+-----+----------+---------------+-------------+------------+----------+------------+
|   id|prediction|passenger_count|trip_distance|total_amount|tip_amount|payment_type|
+-----+----------+---------------+-------------+------------+----------+------------+
|  474|         7|              1|         2.59|       17.76|      2.96|           1|
| 1677|         0|              1|         1.35|         8.3|       0.0|           2|
| 2509|         8|              1|         1.63|       15.96|      2.66|           1|
| 2529|         0|              1|         0.33|         9.3|       0.0|           2|
| 5556|         0|              1|          1.1|        10.3|       1.5|           1|
| 9458|         8|              1|          1.6|        13.8|       1.0|           1|
|12568|         0|              2|         0.63|       10.56|      1.76|           1|
|13518|         8|              1|          1.6|        10.8|       2.0|           1|
|16530|         8|              2|          2.0|       15.96|      2.66|           1|
|19141|         0|              1|         1.29|         7.3|       0.0|           2|
|19163|         9|              4|          0.8|         7.8|       0.0|           2|
|19979|         0|              1|         1.03|         6.3|       0.0|           2|
|21342|         0|              1|         1.75|        10.8|       0.0|           2|
|27919|         8|              1|         1.37|       12.36|      2.06|           1|
|28242|         0|              1|         0.64|        6.96|      1.16|           1|
|30025|         6|              2|         5.85|        21.8|       0.0|           2|
|34340|         8|              6|         2.36|       14.04|      3.24|           1|
|34821|         1|              1|        10.04|       54.18|     10.84|           1|
|39104|         8|              2|         1.39|       12.36|      2.06|           1|
|39256|         0|              1|         0.63|         9.3|       1.0|           1|
+-----+----------+---------------+-------------+------------+----------+------------+
only showing top 20 rows


In [23]:
# Bring the joined result to the driver as a pandas frame indexed by row id.
taxi_pred = taxi_pred.toPandas()
taxi_pred = taxi_pred.set_index(keys='id')
taxi_pred.head()


Out[23]:
prediction passenger_count trip_distance total_amount tip_amount payment_type
id
474 7 1 2.59 17.76 2.96 1
1677 0 1 1.35 8.30 0.00 2
2509 8 1 1.63 15.96 2.66 1
2529 0 1 0.33 9.30 0.00 2
5556 0 1 1.10 10.30 1.50 1