In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sc = SparkContext(appName="UseParquet")
sqlContext = SQLContext(sc)
# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
taxiparquet = sqlContext.read.parquet('./yellow_tripdata_2016-06-parquet')
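# Optional sanity check: print the schema recovered from the Parquet metadata.
taxiparquet.printSchema()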
def prettySummary(df):
    """Neat summary statistics of a Spark dataframe.

    Args:
        df (pyspark.sql.dataframe.DataFrame): input dataframe
    Returns:
        pandas.core.frame.DataFrame: a pandas dataframe with the summary statistics of df
    """
    import pandas as pd
    temp = df.describe().toPandas()
    # Rows 1:3 of describe() hold the mean and stddev; coerce them to numeric
    # so the float format below applies.
    temp.iloc[1:3, 1:] = temp.iloc[1:3, 1:].apply(pd.to_numeric, errors='coerce')
    pd.options.display.float_format = '{:,.2f}'.format
    return temp
prettySummary(taxiparquet)
Out[1]:
In [2]:
taxi_select = taxiparquet.select(['passenger_count','trip_distance','total_amount','tip_amount','payment_type'])
In [3]:
prettySummary(taxi_select)
Out[3]:
In [4]:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
In [5]:
features = ['passenger_count','trip_distance','total_amount','tip_amount']
assembler = VectorAssembler(
    inputCols=features,
    outputCol='features')
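# VectorAssembler packs the listed numeric columns into a single 'features'
# vector column, the input format Spark ML estimators expect.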
assembled_taxi = assembler.transform(taxi_select)
assembled_taxi.show(5)
In [6]:
taxi_select.head(5)
Out[6]:
In [7]:
print(taxi_select)
print(assembled_taxi)
In [8]:
train, test = assembled_taxi.randomSplit([0.6, 0.4], seed=0)
#assembled_taxi.sample(False,0.4, seed=0)
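# Optional: confirm the split came out roughly 60/40.
print(train.count(), test.count())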
In [9]:
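# Note: payment_type is a categorical code in the TLC trip data; it is treated
# as a numeric label here purely to illustrate the regression API.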
lr = LinearRegression(maxIter=10).setLabelCol("payment_type").setFeaturesCol("features")
model = lr.fit(train)
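# Optional: inspect the fitted coefficients (one per feature, in the order
# given to VectorAssembler) and the intercept.
print(model.coefficients, model.intercept)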
In [10]:
testing_summary = model.evaluate(test)
In [11]:
testing_summary.rootMeanSquaredError
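# Other fit metrics are exposed on the same summary object, e.g.:
# testing_summary.r2
# testing_summary.meanAbsoluteError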
Out[11]:
In [12]:
testing_summary.predictions.select('passenger_count','trip_distance','total_amount','tip_amount','payment_type','prediction').show(10)
In [13]:
from __future__ import print_function
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
In [15]:
from pyspark.sql.functions import monotonically_increasing_id
taxi_select = taxi_select.withColumn("id", monotonically_increasing_id())
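# monotonically_increasing_id() yields unique, increasing ids, but not
# consecutive ones (the partition index is encoded in the high bits).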
#move id first (left)
taxi_select = taxi_select.select(['id','passenger_count','trip_distance','total_amount','tip_amount','payment_type'])
features = ['passenger_count','trip_distance','total_amount','tip_amount']
assembler = VectorAssembler(
    inputCols=features,
    outputCol='features')
assembled_taxid = assembler.transform(taxi_select).select('id', 'features')
assembled_taxid.show(5)
In [16]:
cost = np.zeros(12)
sample = assembled_taxid.sample(False,0.1, seed=0)
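# Work on a 10% sample (without replacement) so each KMeans fit in the sweep
# below stays cheap.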
for k in range(5, 12):
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    model = kmeans.fit(sample)
    cost[k] = model.computeCost(sample)  # requires Spark 2.0 or later
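# computeCost returns the within-set sum of squared errors (WSSSE); plotting
# it against k and looking for an "elbow" is a common heuristic for choosing k.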
In [18]:
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(5,12),cost[5:12])
ax.set_xlabel('k')
ax.set_ylabel('cost')
Out[18]:
In [19]:
k = 10
sample = assembled_taxid.sample(False,0.1, seed=0)
kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
model = kmeans.fit(sample)
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
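# Optional (Spark 2.3+ only, newer than the APIs assumed above): the
# silhouette score offers another check on cluster quality.
# from pyspark.ml.evaluation import ClusteringEvaluator
# print(ClusteringEvaluator().evaluate(model.transform(sample)))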
In [20]:
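# model.transform adds a 'prediction' column holding each row's cluster index;
# keeping only id and prediction makes the result cheap to collect.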
transformed = model.transform(sample).select('id', 'prediction')
rows = transformed.collect()
print(rows[:3])
In [21]:
taxi_pred = sqlContext.createDataFrame(rows)
taxi_pred.show()
In [22]:
taxi_pred = taxi_pred.join(taxi_select, 'id')
taxi_pred.show()
In [23]:
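# Note: toPandas() collects the DataFrame to the driver; that is safe here
# because taxi_pred holds only the 10% sample used for clustering.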
taxi_pred = taxi_pred.toPandas().set_index('id')
taxi_pred.head()
Out[23]: