In [1]:
import sys, os, re
import json
import datetime, iso8601
base_path = ".."
In [2]:
APP_NAME = "Debugging Prediction Problems"
# If there is no SparkSession, create the environment
try:
    sc and spark
except NameError as e:
    import findspark
    findspark.init()
    import pyspark
    import pyspark.sql

    sc = pyspark.SparkContext()
    spark = pyspark.sql.SparkSession(sc).builder.appName(APP_NAME).getOrCreate()
    print("PySpark initiated...")
In [3]:
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DateType, TimestampType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.functions import udf
schema = StructType([
StructField("ArrDelay", DoubleType(), True), # "ArrDelay":5.0
StructField("CRSArrTime", TimestampType(), True), # "CRSArrTime":"2015-12-31T03:20:00.000-08:00"
StructField("CRSDepTime", TimestampType(), True), # "CRSDepTime":"2015-12-31T03:05:00.000-08:00"
StructField("Carrier", StringType(), True), # "Carrier":"WN"
StructField("DayOfMonth", IntegerType(), True), # "DayOfMonth":31
StructField("DayOfWeek", IntegerType(), True), # "DayOfWeek":4
StructField("DayOfYear", IntegerType(), True), # "DayOfYear":365
StructField("DepDelay", DoubleType(), True), # "DepDelay":14.0
StructField("Dest", StringType(), True), # "Dest":"SAN"
StructField("Distance", DoubleType(), True), # "Distance":368.0
StructField("FlightDate", DateType(), True), # "FlightDate":"2015-12-30T16:00:00.000-08:00"
StructField("FlightNum", StringType(), True), # "FlightNum":"6109"
StructField("Origin", StringType(), True), # "Origin":"TUS"
])
input_path = "{}/data/simple_flight_delay_features.json".format(base_path)
features = spark.read.json(
input_path,
schema=schema
)
features.select("ArrDelay", "CRSArrTime", "CRSDepTime", "Carrier", "DepDelay",
"Origin", "Dest", "FlightDate", "FlightNum", "Distance").show(12)
In [4]:
# Load the arrival delay bucketizer
from pyspark.ml.feature import Bucketizer
arrival_bucketizer_path = "{}/models/arrival_bucketizer.bin".format(base_path)
arrival_bucketizer = Bucketizer.load(arrival_bucketizer_path)
# Bucketize the arrival delays for classification
ml_bucketized_features = arrival_bucketizer.transform(features)
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show(10)
In [66]:
# Register a temp table and check the range of ArrDelay
ml_bucketized_features.registerTempTable("ml_bucketized_features")
spark.sql("SELECT MIN(ArrDelay), MAX(ArrDelay) FROM ml_bucketized_features").show()
And now let's check the distribution of arrival delays as they fall into our bucketing scheme.
In [5]:
spark.sql(
"""
SELECT
ArrDelayBucket,
COUNT(*) AS Total,
ROUND(
100 * (COUNT(*)/(SELECT COUNT(*) FROM ml_bucketized_features)),
2
) AS Total_Pct
FROM ml_bucketized_features
GROUP BY ArrDelayBucket
"""
).show()
The results are pretty skewed, with 81.4% of records in ArrDelayBucket 0.0. Note that our model always predicts 0.0...
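Given this skew, a classifier that always predicts the majority bucket already looks deceptively accurate. As a rough sanity check, a minimal sketch of that majority-class baseline, using the columns defined above:
# Sketch: accuracy of a trivial model that always predicts the most common bucket
bucket_counts = ml_bucketized_features.groupBy("ArrDelayBucket").count().collect()
total_count = sum(row["count"] for row in bucket_counts)
majority_count = max(row["count"] for row in bucket_counts)
print("Majority-class baseline accuracy: {:.1f}%".format(
    100.0 * majority_count / total_count
))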
In [6]:
# Check out some values from the field
spark.sql("SELECT ArrDelay, ArrDelayBucket "
"FROM ml_bucketized_features "
"WHERE ArrDelayBucket == 0.0")\
.sample(False, 0.01, 11).show(15)
There looks to be a pretty broad range of values in ArrDelayBucket 0.0...
In [7]:
# Check out the min/max for the field
spark.sql("SELECT MIN(ArrDelay), MAX(ArrDelay) "
"FROM ml_bucketized_features "
"WHERE ArrDelayBucket == 0.0").show()
There is a range of 101 in ArrDelay within this bucket. That encodes a large span of outcomes: from 14 minutes late all the way to over an hour early.
Now, what is the distribution of values in ArrDelayBucket 0.0? I want to get a sense of what this bucket encodes. We can filter to a relation with only ArrDelayBucket values of 0.0, convert our DataFrame to an RDD, and then use the RDD.histogram() function to get the raw data for our histogram.
First, however, I need the histogram of the entire range of data to compare this to. We need to supply our own buckets, or the histogram is hard to interpret.
In [8]:
# Look at overall histogram
ml_bucketized_features\
.select("ArrDelay")\
.rdd\
.flatMap(lambda x: x)\
.histogram([-87.0, -60, -30, -15, 0, 15, 30, 60, 120])
Out[8]:
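The filtered histogram described above (ArrDelay restricted to ArrDelayBucket 0.0) never appears in the cells that follow; a minimal sketch of it, assuming the same bin edges up to the 15-minute cutoff, would be:
# Sketch: histogram of ArrDelay for records in ArrDelayBucket 0.0 only
ml_bucketized_features\
  .filter("ArrDelayBucket = 0.0")\
  .select("ArrDelay")\
  .rdd\
  .flatMap(lambda x: x)\
  .histogram([-87.0, -60, -30, -15, 0, 15])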
We must use matplotlib.pyplot.bar instead of matplotlib.pyplot.hist because we have already computed our bins and their weights.
In [9]:
%matplotlib inline
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
# Look at overall histogram
data_tuple = ml_bucketized_features\
.select("ArrDelay")\
.rdd\
.flatMap(lambda x: x)\
.histogram([-87.0, -60, -30, -15, 0, 15, 30, 60, 120])
heights = np.array(data_tuple[1])
# The bins list is one element longer than the list of counts
full_bins = data_tuple[0]
# Bars are drawn from the left
mid_point_bins = full_bins[:-1]
# The width of a bar should be the range it maps in the data
widths = [abs(i - j) for i, j in zip(full_bins[:-1], full_bins[1:])]
# And now the bar should plot nicely
bar = plt.bar(mid_point_bins, heights, width=widths, color='b')
We can see here that most of the mass in our data sits around 0-15, with a long tail to the right, and that most of the data falls either in ArrDelayBucket 0.0 or very nearly so in the next bin. This must make the buckets hard for the classifier to tell apart.
This histogram seems to indicate that we might address our problem by re-bucketizing ArrDelayBucket into something more balanced. While the original schema of <15, 15-60, 60+ seemed to make sense from a user perspective... it seems to have wrecked our model. So let's try something that respects the distribution of delays...
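For reference, the original bucketizer we loaded above presumably used splits along these lines; this is a reconstruction based on the <15, 15-60, 60+ scheme described in the text, not the saved model itself:
from pyspark.ml.feature import Bucketizer

# Assumed reconstruction of the original, skewed bucketing scheme: <15, 15-60, 60+
original_splits = [-float("inf"), 15.0, 60.0, float("inf")]
original_arrival_bucketizer = Bucketizer(
    splits=original_splits,
    inputCol="ArrDelay",
    outputCol="ArrDelayBucket"
)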
Before we dive in and change our buckets, let's play with defining different buckets for our histogram and see if we can create balanced bars! To play around with this without repeating ourselves, let's create a helper function, create_hist.
In [10]:
def create_hist(rdd_histogram_data):
    """Given an RDD.histogram, plot a pyplot histogram"""
    heights = np.array(rdd_histogram_data[1])
    full_bins = rdd_histogram_data[0]
    mid_point_bins = full_bins[:-1]
    widths = [abs(i - j) for i, j in zip(full_bins[:-1], full_bins[1:])]
    bar = plt.bar(mid_point_bins, heights, width=widths, color='b')
    return bar
In [67]:
%matplotlib inline
buckets = [-87.0, 15, 60, 200]
rdd_histogram_data = ml_bucketized_features\
.select("ArrDelay")\
.rdd\
.flatMap(lambda x: x)\
.histogram(buckets)
create_hist(rdd_histogram_data)
Out[67]:
In [11]:
%matplotlib inline
buckets = [-87.0, -30, -15, 0, 15, 30, 120]
rdd_histogram_data = ml_bucketized_features\
.select("ArrDelay")\
.rdd\
.flatMap(lambda x: x)\
.histogram(buckets)
create_hist(rdd_histogram_data)
Out[11]:
Hmmm, let's try again.
In [68]:
%matplotlib inline
buckets = [-87.0, -15, 0, 15, 30, 120]
rdd_histogram_data = ml_bucketized_features\
.select("ArrDelay")\
.rdd\
.flatMap(lambda x: x)\
.histogram(buckets)
create_hist(rdd_histogram_data)
Out[68]:
Let's knock out our smallest bucket and try again...
In [13]:
%matplotlib inline
buckets = [-87.0, -15, 0, 30, 120]
rdd_histogram_data = ml_bucketized_features\
.select("ArrDelay")\
.rdd\
.flatMap(lambda x: x)\
.histogram(buckets)
create_hist(rdd_histogram_data)
Out[13]:
These look about right! The buckets [-87, -15, 0, 30, 120] correspond to human-interpretable categories: [early, on-time, late, very late]. Let's use these buckets to recompute ArrDelayBucket and see if our problem goes away!
In [14]:
from pyspark.ml.feature import Bucketizer
# Setup the Bucketizer
splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]
arrival_bucketizer = Bucketizer(
splits=splits,
inputCol="ArrDelay",
outputCol="ArrDelayBucket"
)
# Save the model
arrival_bucketizer_path = "{}/models/arrival_bucketizer_2.0.bin".format(base_path)
arrival_bucketizer.write().overwrite().save(arrival_bucketizer_path)
In [15]:
# Apply it to our data
ml_bucketized_features = arrival_bucketizer.transform(features)
ml_bucketized_features\
.select("ArrDelay", "ArrDelayBucket")\
.sample(False, 0.01, 27)\
.show(12)
We're seeing examples of each label in our data. This is good; this looks differentiable. Now let's load all our models and see what happens...
Let's load all the models we've saved that vectorize our raw data, using code from ch08/make_predictions.py.
In [16]:
# Load the arrival delay bucketizer
from pyspark.ml.feature import Bucketizer
arrival_bucketizer_path = "{}/models/arrival_bucketizer_2.0.bin".format(base_path)
arrival_bucketizer = Bucketizer.load(arrival_bucketizer_path)
# Load the departure delay bucketizer
departure_bucketizer_path = "{}/models/departure_bucketizer.bin".format(base_path)
departure_bucketizer = Bucketizer.load(departure_bucketizer_path)
# Load all the string field vectorizer pipelines into a dict
from pyspark.ml import PipelineModel
string_vectorizer_pipeline_models = {}
for column in ["Carrier", "DayOfMonth", "DayOfWeek", "DayOfYear",
               "Origin", "Dest", "FlightNum", "DepDelayBucket"]:
    string_pipeline_model_path = "{}/models/string_indexer_pipeline_model_{}.bin".format(
        base_path,
        column
    )
    string_pipeline_model = PipelineModel.load(string_pipeline_model_path)
    string_vectorizer_pipeline_models[column] = string_pipeline_model
# Load the numeric vector assembler
from pyspark.ml.feature import VectorAssembler
vector_assembler_path = "{}/models/numeric_vector_assembler.bin".format(base_path)
vector_assembler = VectorAssembler.load(vector_assembler_path)
# Load the final assembler
final_assembler_path = "{}/models/final_vector_assembler.bin".format(base_path)
final_assembler = VectorAssembler.load(final_assembler_path)
In [17]:
# Bucketize the arrival delays for classification
ml_bucketized_features = arrival_bucketizer.transform(features)
# Check the buckets
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show(12)
In [18]:
# Vectorize string fields with the corresponding pipeline for that column
# Turn category fields into categoric feature vectors, then drop intermediate fields
for column in ["Carrier", "DayOfMonth", "DayOfWeek", "DayOfYear",
               "Origin", "Dest", "FlightNum"]:
    string_pipeline_path = "{}/models/string_indexer_pipeline_{}.bin".format(
        base_path,
        column
    )
    string_pipeline_model = string_vectorizer_pipeline_models[column]
    ml_bucketized_features = string_pipeline_model.transform(ml_bucketized_features)
    ml_bucketized_features = ml_bucketized_features.drop(column + "_index")
# Vectorize numeric columns: DepDelay and Distance
ml_bucketized_features = vector_assembler.transform(ml_bucketized_features)
# Combine various features into one feature vector, 'features'
final_vectorized_features = final_assembler.transform(ml_bucketized_features)
Clean up by dropping the individual vector columns and check the data out...
In [19]:
# Drop the individual vector columns
feature_columns = ["Carrier_vec", "DayOfMonth_vec", "DayOfWeek_vec", "DayOfYear_vec",
"Origin_vec", "Dest_vec", "FlightNum_vec", "NumericFeatures_vec"]
for column in feature_columns:
    final_vectorized_features = final_vectorized_features.drop(column)
final_vectorized_features.show(5)
In [20]:
# Inspect the finalized features - try first with just 100K items
limited_final_vectorized_features = final_vectorized_features.limit(100000)
limited_final_vectorized_features.show(5)
# Instantiate and fit a random forest classifier on the 100K sample
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(featuresCol="Features_vec", labelCol="ArrDelayBucket", predictionCol="Prediction")
model = rfc.fit(limited_final_vectorized_features)
print("Model trained on 100K items successfully!")
In [21]:
# Instantiate and fit a random forest classifier (still on the 100K sample; the full fit is commented out)
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(featuresCol="Features_vec", labelCol="ArrDelayBucket", predictionCol="Prediction")
#model = rfc.fit(final_vectorized_features)
model = rfc.fit(limited_final_vectorized_features)
# Save the new model over the old one
model_output_path = "{}/models/spark_random_forest_classifier.flight_delays.2.0.bin".format(
base_path
)
model.write().overwrite().save(model_output_path)
print("Model trained on 5.7MM items successfully!")
In [22]:
# Make the predictions...
predictions = model.transform(final_vectorized_features)
predictions.groupBy("Prediction").count().show()
Damn! Okay, the bucketing wasn't the issue. Well, this is a learning experience; we'll have to figure out something else.
In [23]:
schema = StructType([
StructField("ArrDelay", DoubleType(), True), # "ArrDelay":5.0
StructField("CRSArrTime", TimestampType(), True), # "CRSArrTime":"2015-12-31T03:20:00.000-08:00"
StructField("CRSDepTime", TimestampType(), True), # "CRSDepTime":"2015-12-31T03:05:00.000-08:00"
StructField("Carrier", StringType(), True), # "Carrier":"WN"
StructField("DayOfMonth", IntegerType(), True), # "DayOfMonth":31
StructField("DayOfWeek", IntegerType(), True), # "DayOfWeek":4
StructField("DayOfYear", IntegerType(), True), # "DayOfYear":365
StructField("DepDelay", DoubleType(), True), # "DepDelay":14.0
StructField("Dest", StringType(), True), # "Dest":"SAN"
StructField("Distance", DoubleType(), True), # "Distance":368.0
StructField("FlightDate", DateType(), True), # "FlightDate":"2015-12-30T16:00:00.000-08:00"
StructField("FlightNum", StringType(), True), # "FlightNum":"6109"
StructField("Origin", StringType(), True), # "Origin":"TUS"
])
input_path = "{}/data/simple_flight_delay_features.json".format(
base_path
)
features = spark.read.json(input_path, schema=schema)
features.first()
Out[23]:
In [24]:
# Bucketize the arrival delays for classification
ml_bucketized_features = arrival_bucketizer.transform(features)
In [25]:
# Vectorize numeric columns: DepDelay and Distance
numeric_features = vector_assembler.transform(ml_bucketized_features)
In [26]:
# Instantiate and fit random forest classifier on all the data
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(featuresCol="NumericFeatures_vec", labelCol="ArrDelayBucket", predictionCol="Prediction")
model = rfc.fit(numeric_features)
# Save the new model over the old one
model_output_path = "{}/models/spark_random_forest_classifier.flight_delays.3.0.bin".format(
base_path
)
model.write().overwrite().save(model_output_path)
print("Model trained on 5.7MM items successfully!")
In [27]:
# Make the predictions...
predictions = model.transform(numeric_features)
predictions.show()
In [28]:
predictions.registerTempTable("predictions")
spark.sql("""SELECT Prediction, COUNT(*) FROM predictions GROUP BY Prediction""").show()
In [29]:
spark.sql("""Select ArrDelayBucket, COUNT(*) FROM predictions GROUP BY ArrDelayBucket""").show()
OK, this looks respectable, or at least not completely erroneous! For a direct comparison of predictions against the true labels, see the quick cross-tabulation sketch below. Then let's add in more features and see what happens.
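Here is that sketch: a rough confusion matrix built with DataFrame.crosstab on the predictions relation above.
# Sketch: cross-tabulate true buckets against predicted buckets
predictions.crosstab("ArrDelayBucket", "Prediction").show()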
Now, starting with just Origin and Dest, we'll try adding back the string columns and see what happens.
Start by creating a vector for each field, Origin and Dest, and check them out. Note that we repeat the numeric feature vectorization to refresh the numeric_features relation, which we modify by looping.
In [30]:
# Vectorize numeric columns: DepDelay and Distance
numeric_features = vector_assembler.transform(ml_bucketized_features)
# Vectorize string fields with the corresponding pipeline for that column
# Turn category fields into categoric feature vectors, then drop intermediate fields
string_columns = ["Origin", "Dest"]
for column in string_columns:
    string_pipeline_path = "{}/models/string_indexer_pipeline_{}.bin".format(
        base_path,
        column
    )
    string_pipeline_model = string_vectorizer_pipeline_models[column]
    numeric_features = string_pipeline_model.transform(numeric_features)
    numeric_features = numeric_features.drop(column + "_index")
numeric_features.show(5)
In [31]:
vector_columns = ["Origin_vec", "Dest_vec", "NumericFeatures_vec"]
final_assembler = VectorAssembler(
inputCols=vector_columns,
outputCol="Features_vec"
)
final_vectorized_features = final_assembler.transform(numeric_features)
for column in vector_columns:
    final_vectorized_features = final_vectorized_features.drop(column)
final_vectorized_features.show(5)
In [32]:
# Instantiate and fit random forest classifier on all the data
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(featuresCol="Features_vec", labelCol="ArrDelayBucket", predictionCol="Prediction")
model = rfc.fit(final_vectorized_features)
# Save the new model over the old one
model_output_path = "{}/models/spark_random_forest_classifier.flight_delays.4.0.bin".format(
base_path
)
model.write().overwrite().save(model_output_path)
print("Model trained on 5.7MM items successfully!")
In [33]:
# Make the predictions...
predictions = model.transform(final_vectorized_features)
predictions.show()
Uh oh... all 1.0s...
In [34]:
predictions.registerTempTable("predictions")
spark.sql("""SELECT Prediction, COUNT(*) FROM predictions GROUP BY Prediction""").show()
In [35]:
feature_importances = model.featureImportances
feature_importances
Out[35]:
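The raw featureImportances vector is hard to read on its own. A sketch for mapping each slot back to a column name, assuming Spark has attached its usual ml_attr metadata to the assembled Features_vec column (it normally does when the inputs come from OneHotEncoder and VectorAssembler):
# Sketch: map featureImportances back to the names of the slots in Features_vec,
# using the ml_attr metadata (assumed present) on the assembled vector column
attrs = final_vectorized_features.schema["Features_vec"].metadata["ml_attr"]["attrs"]
name_by_index = {}
for attr_group in attrs.values():  # e.g. the "numeric" and "binary" groups
    for attr in attr_group:
        name_by_index[attr["idx"]] = attr.get("name", "feature_{}".format(attr["idx"]))
importances = model.featureImportances.toArray()
top_ten = sorted(enumerate(importances), key=lambda pair: pair[1], reverse=True)[:10]
for index, score in top_ten:
    print("{:.4f}  {}".format(score, name_by_index.get(index, "feature_{}".format(index))))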
In [36]:
# I dunno
In [37]:
# Bucketize the arrival delays for classification
ml_bucketized_features = arrival_bucketizer.transform(features)
from pyspark.sql.functions import lit, concat
features_with_route = ml_bucketized_features.withColumn(
'Route',
concat(
ml_bucketized_features.Origin,
lit('-'),
ml_bucketized_features.Dest
)
)
features_with_route.show(6)
In [38]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorIndexer
route_string_indexer = StringIndexer(
inputCol='Route',
outputCol='Route' + "_index"
)
route_one_hot_encoder = OneHotEncoder(
dropLast=False,
inputCol='Route' + "_index",
outputCol='Route' + "_vec"
)
route_string_pipeline = Pipeline(
stages=[route_string_indexer, route_one_hot_encoder]
)
# Fit/transform to apply to the data
route_string_pipeline_model = route_string_pipeline.fit(features_with_route)
features_with_route_vec = route_string_pipeline_model.transform(features_with_route)
# Drop the intermediate index
features_with_route_vec = features_with_route_vec.drop("Route" + "_index")
# Save the pipeline model
route_string_pipeline_output_path = "{}/models/string_indexer_pipeline_model_{}.bin".format(
base_path,
"Route"
)
route_string_pipeline_model.write().overwrite().save(route_string_pipeline_output_path)
print("Saved route string pipeline model to {}...".format(route_string_pipeline_output_path))
In [39]:
# Show a sample of the output
features_with_route_vec.sample(False, 0.001, 11).show(5)
In [40]:
# Load the numeric vector assembler
from pyspark.ml.feature import VectorAssembler
vector_assembler_path = "{}/models/numeric_vector_assembler.bin".format(base_path)
vector_assembler = VectorAssembler.load(vector_assembler_path)
# Vectorize numeric columns: DepDelay and Distance
features_with_route_and_numeric = vector_assembler.transform(features_with_route_vec)
features_with_route_and_numeric.show(5)
In [43]:
# Create and save the VectorAssembler
route_feature_columns = ["Route_vec", "NumericFeatures_vec"]
route_final_assembler = VectorAssembler(
inputCols=route_feature_columns,
outputCol="Features_vec"
)
route_final_assembler_path = "{}/models/final_vector_assembler_simplified_with_route.bin".format(base_path)
route_final_assembler.write().overwrite().save(route_final_assembler_path)
print("Created and stored VectorAssembler...")
Apply the VectorAssembler via transform and check it out.
In [44]:
final_vectorized_features_with_route = route_final_assembler.transform(features_with_route_and_numeric)
final_vectorized_features_with_route.sample(False, 0.001, 99).show(5)
And drop the intermediate feature vector columns.
In [45]:
for column in route_feature_columns:
    final_vectorized_features_with_route = final_vectorized_features_with_route.drop(column)
final_vectorized_features_with_route.sample(False, 0.001, 12).show()
In [47]:
# Instantiate and fit random forest classifier on all the data
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
rfc = RandomForestClassifier(featuresCol="Features_vec", labelCol="ArrDelayBucket", predictionCol="Prediction")
model = rfc.fit(final_vectorized_features_with_route)
# Save the new model over the old one
model_output_path = "{}/models/spark_random_forest_classifier.flight_delays_simplified_with_route.bin".format(
base_path
)
model.write().overwrite().save(model_output_path)
print("Model trained on 5.7MM items successfully!")
In [49]:
predictions = model.transform(final_vectorized_features_with_route)
predictions.groupBy("Prediction").count().show()
In [52]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorIndexer
route_string_indexer = StringIndexer(
inputCol='Route',
outputCol='Route' + "_index"
)
route_string_indexer_model = route_string_indexer.fit(features_with_route)
# Fit/transform to apply to the data
features_with_route_index = route_string_indexer_model.transform(features_with_route)
features_with_route_index.show(5)
Now compose the Route_index with the DepDelay and Distance columns.
In [53]:
# Handle continuous, numeric fields by combining them into one feature vector
new_numeric_columns = ["DepDelay", "Distance", "Route_index"]
route_index_vector_assembler = VectorAssembler(
inputCols=new_numeric_columns,
outputCol="Features_vec"
)
final_route_index_features = route_index_vector_assembler.transform(features_with_route_index)
# Save the numeric vector assembler
route_index_vector_assembler_path = "{}/models/numeric_vector_assembler_route_index.bin".format(base_path)
route_index_vector_assembler.write().overwrite().save(route_index_vector_assembler_path)
In [55]:
final_route_index_features.sample(False,0.001,88).show(5)
Train a new model on this new set of features. Because Route_index comes from a StringIndexer, it carries nominal metadata and is treated as a categorical feature with thousands of categories; Spark's tree learner requires maxBins to be at least the number of distinct categories, which is presumably why maxBins=4657 appears below.
In [61]:
# Instantiate and fit random forest classifier on all the data
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
rfc = RandomForestClassifier(featuresCol="Features_vec", labelCol="ArrDelayBucket", predictionCol="Prediction", maxBins=4657)
model = rfc.fit(final_route_index_features)
# Save the new model over the old one
model_output_path = "{}/models/spark_random_forest_classifier.flight_delays_simplified_with_route_index.bin".format(
base_path
)
model.write().overwrite().save(model_output_path)
print("Model trained on 5.7MM items successfully!")
And make predictions on the training data and inspect their distribution.
In [64]:
predictions = model.transform(final_route_index_features)
predictions.groupBy("Prediction").count().show()
It works!
In [65]:
predictions.groupBy("ArrDelayBucket").count().show()