Now that we have deployed working models predicting flight delays, it is time to “make believe” that our prediction has proven useful based on user feedback, and further that the prediction is valuable enough that prediction quality is important. In this case, it is time to iteratively improve the quality of our prediction. If a prediction is valuable enough, this becomes a full-time job for one or more people.
In this chapter we will tune our Spark ML classifier and also do additional feature engineering to improve prediction quality. In doing so, we will show you how to iteratively improve predictions.
At this point we realized that our model was always predicting one class, no matter the input. We began by investigating that in a Jupyter notebook at ch09/Debugging Prediction Problems.ipynb.
The notebook itself is very long, and we tried many things to fix our model. It turned out we had made a mistake: we were using OneHotEncoder on top of the output of StringIndexerModel when we were encoding our nominal/categorical string features. This is how you should encode features for models other than decision trees, but it turns out that for decision tree models, you are supposed to take the string indexes from StringIndexerModel and directly compose them with your continuous/numeric features in a VectorAssembler. Decision trees are able to infer that the indexes represent categories. One benefit of directly adding string indexes to your feature vectors is that you then get easily interpretable feature importances.
When we discovered this, we had to go back and edit the book so that we didn’t teach something that was wrong, and so this is now what you see. We thought it worthwhile to link to the notebook, though, to show how this really works in the wild: you build broken shit and then fix it.
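For reference, here is a minimal sketch of the corrected encoding path for a tree model in isolation. It assumes a DataFrame named df with a string column Carrier and a numeric column DepDelay; the full pipeline for our flight data appears below.
# A minimal sketch (assumes a DataFrame `df` with columns "Carrier" and "DepDelay")
from pyspark.ml.feature import StringIndexer, VectorAssembler
# Index the categorical string column; a decision tree can consume the raw index
indexer = StringIndexer(inputCol="Carrier", outputCol="Carrier_index")
indexed_df = indexer.fit(df).transform(df)
# Compose the index directly with the continuous features -- no OneHotEncoder
assembler = VectorAssembler(
    inputCols=["DepDelay", "Carrier_index"],
    outputCol="Features_vec"
)
tree_ready_df = assembler.transform(indexed_df)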
Not all predictions should be improved. Often something fast and crude will work well enough as an MVP (minimum viable product). Only predictions that prove useful should be improved. It is possible to sink large volumes of time into improving the quality of a prediction, so it is essential that you connect with users before getting sucked into this task. This is why we’ve included the discussion of improving predictions in its own chapter.
There are a few ways to improve an existing predictive model. The first is by tuning the parameters of the statistical model making your prediction. The second is feature engineering.
Tuning model hyperparameters to improve predictive model quality can be done by intuition, or by brute force through something called a grid or random search. We’re going to focus on feature engineering, as hyperparameter tuning is covered elsewhere. A good guide to hyperparameter tuning is available in the Spark documentation on model selection and tuning.
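For reference, a brute-force grid search with Spark ML's tuning tools looks roughly like the following sketch. We don't run it in this chapter, and it borrows the final_vectorized_features DataFrame and column names built later on, so treat it as an illustration rather than working chapter code.
# A hedged sketch of grid search via CrossValidator (not run in this chapter)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
rfc = RandomForestClassifier(
    featuresCol="Features_vec",
    labelCol="ArrDelayBucket",
    predictionCol="Prediction",
    maxBins=4657
)
# A small grid of hyperparameters to search over
param_grid = ParamGridBuilder() \
    .addGrid(rfc.numTrees, [10, 20]) \
    .addGrid(rfc.maxDepth, [5, 10]) \
    .build()
evaluator = MulticlassClassificationEvaluator(
    labelCol="ArrDelayBucket",
    predictionCol="Prediction",
    metricName="accuracy"
)
cross_validator = CrossValidator(
    estimator=rfc,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3
)
cv_model = cross_validator.fit(final_vectorized_features)
best_model = cv_model.bestModel  # the winning hyperparameter combination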
As we move through this chapter, we’ll be using the work we’ve done so far to perform feature engineering. Feature engineering is the most important part of making good predictions. It involves using what you’ve discovered about the data through exploratory data analysis in order to feed your machine learning algorithm better, more consequential data as input.
There are several ways to decide which features to use, and Saurav Kaushik has written a post on Analytics Vidhya that introduces them well. The method we employ primarily, which we jokingly entitle the Experimental Adhesion Method, is to quickly select all the features that we can simply compute, and try them all using a random forest or gradient boosted decision tree model (note that even if our application requires another type of model, we still use decision trees to guide feature selection). Then we train the model and inspect the model’s feature importances to “see what sticks.” The most important variables are retained, and this forms the basic model we begin with.
Feature engineering is an iterative process. Based on the feature importances, we ponder what new things we might try using the data we have available. We start with the simplest idea, or the one that is easiest to implement. If the feature importances indicate one type of feature is important, and we can’t easily compute new features similar to this one, we think about how we might acquire new data to join to our training data to use as features.
The key is to be logical and systematic in our exploration of the feature space. You should think about how easy a potential feature is to compute, as well as what it would teach you if it turned out to be important. Are there other, similar features that you could try if this candidate worked? Develop hypotheses and test them in the form of new features. Evaluate each new feature in an experiment and reflect on what you’ve learned before engineering the next feature.
In order to improve our classification model, we need to reliably determine its prediction quality in the first place. To do so, we need to beef up our cross-validation code, and then establish a baseline of quality for the original model. Check out ch09/baseline_spark_mllib_model.py, which we copied from ch09/train_spark_mllib_model.py and altered to improve its cross-validation code.
In order to evaluate the prediction quality of our classifier, we need to use more than one metric. Spark ML’s MulticlassClassificationEvaluator offers four metrics: accuracy, weighted precision, weighted recall, and f1.
The raw accuracy is just what it sounds like: the number of correct predictions divided by the number of predictions. This is something to check first, but it isn’t adequate alone. Precision is a measure of how useful the result is. Recall describes how complete the results are. The f1 score incorporates both precision and recall to determine overall quality. Taken together, the changes to these metrics between consecutive runs of training our model can give us a clear picture of what is happening with our model in terms of prediction quality. We will use these metrics along with feature importance to guide our feature engineering efforts.
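To build intuition for these metrics, here is a toy, pure-Python illustration on a handful of (label, prediction) pairs; Spark ML computes the weighted, multiclass versions of these for us.
# Toy illustration of accuracy, precision, recall, and f1 (per class)
pairs = [(0, 0), (0, 1), (1, 1), (1, 1), (2, 1), (2, 2)]  # (label, prediction)
accuracy = sum(1 for label, pred in pairs if label == pred) / len(pairs)
def precision_recall_f1(positive_class):
    tp = sum(1 for l, p in pairs if p == positive_class and l == positive_class)
    fp = sum(1 for l, p in pairs if p == positive_class and l != positive_class)
    fn = sum(1 for l, p in pairs if p != positive_class and l == positive_class)
    precision = tp / (tp + fp) if (tp + fp) else 0.0  # how useful the results are
    recall = tp / (tp + fn) if (tp + fn) else 0.0     # how complete the results are
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return precision, recall, f1
print("accuracy = {}".format(accuracy))
for c in [0, 1, 2]:
    print("class {}: precision/recall/f1 = {}".format(c, precision_recall_f1(c)))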
Model quality metrics aren’t enough to guide the iterative improvements of our model. To understand what is going on with each new run, we need to employ a type of model called a decision tree.
In Spark ML, the best general-purpose multiclass classification model is an implementation of a random forest, the RandomForestClassificationModel, fit by the RandomForestClassifier. Random forests can classify or regress, and they have a capability that helps us interrogate predictive models: feature importance.
The importance of a feature is what it sounds like: a measure of how important that feature was in contributing to the accuracy of the model. This information is incredibly useful, as it can serve as a guiding hand to feature engineering. In other words, if you know how important a feature is, you can use this clue to make changes that increase the accuracy of the model, such as removing unimportant features and trying to engineer features similar to those that are most important. Feature engineering is a major theme of Agile Data Science, and it is a big part of why we’ve been doing iterative visualization and exploration (the purpose of which is to shed light on and drive feature engineering).
Note that the state of the art for many classification and regression tasks is a gradient boosted decision tree, but as of version 2.1.0, Spark ML’s implementation—the GBTClassificationModel, which is fit by the GBTClassifier—can only do binary classification.
We need to run through the model’s code from Chapter 8 before we can set up and run an experiment.
In [1]:
import sys, os, re
import json
import datetime, iso8601
from tabulate import tabulate
# Initialize PySpark
APP_NAME = "Improving Predictions"
# If there is no SparkSession, create the environment
try:
sc and spark
except NameError as e:
import findspark
findspark.init()
import pyspark
import pyspark.sql
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc).builder.appName(APP_NAME).getOrCreate()
print("PySpark initialized...")
In [2]:
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DateType, TimestampType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.functions import udf
schema = StructType([
StructField("ArrDelay", DoubleType(), True), # "ArrDelay":5.0
StructField("CRSArrTime", TimestampType(), True), # "CRSArrTime":"2015-12-31T03:20:00.000-08:00"
StructField("CRSDepTime", TimestampType(), True), # "CRSDepTime":"2015-12-31T03:05:00.000-08:00"
StructField("Carrier", StringType(), True), # "Carrier":"WN"
StructField("DayOfMonth", IntegerType(), True), # "DayOfMonth":31
StructField("DayOfWeek", IntegerType(), True), # "DayOfWeek":4
StructField("DayOfYear", IntegerType(), True), # "DayOfYear":365
StructField("DepDelay", DoubleType(), True), # "DepDelay":14.0
StructField("Dest", StringType(), True), # "Dest":"SAN"
StructField("Distance", DoubleType(), True), # "Distance":368.0
StructField("FlightDate", DateType(), True), # "FlightDate":"2015-12-30T16:00:00.000-08:00"
StructField("FlightNum", StringType(), True), # "FlightNum":"6109"
StructField("Origin", StringType(), True), # "Origin":"TUS"
])
input_path = "../data/simple_flight_delay_features.jsonl"
features = spark.read.json(input_path, schema=schema)
# Sample 10% to make executable inside the notebook
features = features.sample(False, 0.1)
features.show(3)
features.first()
Out[2]:
In [3]:
#
# Check for nulls in features before using Spark ML
#
# null_counts = [(column, features.where(features[column].isNull()).count()) for column in features.columns]
# cols_with_nulls = filter(lambda x: x[1] > 0, null_counts)
# print("Columns with nulls that need to be filtered: {}".format(
# str(list(cols_with_nulls))
# ))
In [4]:
#
# Add a Route variable to replace FlightNum
#
from pyspark.sql.functions import lit, concat
features_with_route = features.withColumn(
'Route',
concat(
features.Origin,
lit('-'),
features.Dest
)
)
features_with_route.show(3)
In [5]:
#
# Use pyspark.ml.feature.Bucketizer to bucketize ArrDelay into early, on-time, slightly late, very late (0, 1, 2, 3)
#
from pyspark.ml.feature import Bucketizer
# Setup the Bucketizer
splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]
arrival_bucketizer = Bucketizer(
splits=splits,
inputCol="ArrDelay",
outputCol="ArrDelayBucket"
)
# Save the model
arrival_bucketizer_path = "../models/arrival_bucketizer_2.0.bin"
arrival_bucketizer.write().overwrite().save(arrival_bucketizer_path)
# Apply the model
ml_bucketized_features = arrival_bucketizer.transform(features_with_route)
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show(5)
In [6]:
#
# Extract features using tools in pyspark.ml.feature
#
from pyspark.ml.feature import StringIndexer, VectorAssembler
# Turn category fields into indexes
for column in ["Carrier", "DayOfMonth", "DayOfWeek", "DayOfYear",
"Origin", "Dest", "Route"]:
print("Indexing column \"{}\" ...".format(column))
string_indexer = StringIndexer(
inputCol=column,
outputCol=column + "_index"
)
string_indexer_model = string_indexer.fit(ml_bucketized_features)
ml_bucketized_features = string_indexer_model.transform(ml_bucketized_features)
# Drop the original column
ml_bucketized_features = ml_bucketized_features.drop(column)
# Save the pipeline model
string_indexer_output_path = "../models/string_indexer_model_{}.bin".format(
column
)
string_indexer_model.write().overwrite().save(string_indexer_output_path)
print("Indexed all string columns!")
In [7]:
# Handle continuous, numeric fields by combining them into one feature vector
numeric_columns = ["DepDelay", "Distance"]
index_columns = ["Carrier_index", "DayOfMonth_index",
                 "DayOfWeek_index", "DayOfYear_index",
                 "Origin_index", "Dest_index", "Route_index"]
vector_assembler = VectorAssembler(
inputCols=numeric_columns + index_columns,
outputCol="Features_vec"
)
final_vectorized_features = vector_assembler.transform(ml_bucketized_features)
# Save the numeric vector assembler
vector_assembler_path = "../models/numeric_vector_assembler.bin"
vector_assembler.write().overwrite().save(vector_assembler_path)
# Drop the index columns
for column in index_columns:
final_vectorized_features = final_vectorized_features.drop(column)
# Inspect the finalized features
final_vectorized_features.show(5)
In order to be confident in each measure of our experiment, we need to repeat it at least twice to see how it varies; the number of repetitions is the degree to which we cross-validate. Within each repetition, we run the measurement code once for each metric. Once we’ve collected several scores for each metric, we look at both the average and standard deviation of those scores. Taken together, these give us a picture of the quality of our classifier.
To begin, we need to iterate and repeat our experiment N times. For each experiment we need to compute a test/train split, then we need to train the model on the training data and apply it to the test data. Then we use MulticlassClassificationEvaluator to get a score, once for each metric. We gather the scores in a list for each metric, which we will evaluate at the end of the experiment:
In [8]:
#
# Cross validate, train and evaluate classifier: loop split_count times for 4 metrics
#
from collections import defaultdict
scores = defaultdict(list)
metric_names = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
split_count = 3
for i in range(1, split_count + 1):
print("Run {} out of {} of test/train splits in cross validation...".format(
i,
split_count,
)
)
# Test/train split
training_data, test_data = final_vectorized_features.randomSplit([0.8, 0.2])
# Instantiate and fit random forest classifier on all the data
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(
featuresCol="Features_vec",
labelCol="ArrDelayBucket",
predictionCol="Prediction",
maxBins=4657,
maxMemoryInMB=1024
)
model = rfc.fit(training_data)
# Save the new model over the old one
model_output_path = "../models/spark_random_forest_classifier.flight_delays.baseline.bin"
model.write().overwrite().save(model_output_path)
# Evaluate model using test data
predictions = model.transform(test_data)
# Evaluate this split's results for each metric
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
for metric_name in metric_names:
evaluator = MulticlassClassificationEvaluator(
labelCol="ArrDelayBucket",
predictionCol="Prediction",
metricName=metric_name
)
score = evaluator.evaluate(predictions)
scores[metric_name].append(score)
print("{} = {}".format(metric_name, score))
Our run leaves us with a defaultdict of scores, with one list for each metric. Now we need to compute the average and standard deviation of each list to give us the overall average and standard deviation of each metric:
Note that we need to compute both the average and standard deviation of each metric from our run. The average will tell us the approximate performance level, and the standard deviation will tell us how much a model's performance varies. Less variance is desirable. We'll use this information in tuning our model.
In [9]:
#
# Evaluate average and STD of each metric and print a table
#
import numpy as np
score_averages = defaultdict(float)
# Compute the table data
average_stds = [] # ha
for metric_name in metric_names:
metric_scores = scores[metric_name]
average_accuracy = sum(metric_scores) / len(metric_scores)
score_averages[metric_name] = average_accuracy
std_accuracy = np.std(metric_scores)
average_stds.append((metric_name, average_accuracy, std_accuracy))
# Print the table
print("\nExperiment Log")
print("--------------")
print(tabulate(average_stds, headers=["Metric", "Average", "STD"]))
The standard deviations indicate that we might not even need to perform k-fold cross-validation, but an inspection of the underlying scores says otherwise:
In [10]:
scores
Out[10]:
There is actually significant variation between runs, and this could obscure a small improvement (or degradation) in prediction quality.
Adding more cross-validation iterations would reduce this variation, but the iterations take time, and this discourages experimentation. A middle ground should be found.
Now that we have our baseline metrics, we can repeat this code as we improve the model and see the effect in terms of the four metrics available to us. So it seems we are done and can start playing with our model and features. However, we will quickly run into a problem: unless we write it down each time, we will lose track of the score from the previous run, which is printed on screen amid many log messages for each run. And this is tedious. So, we need to automate this process.
What we need to do is load a score log from disk, evaluate the current score in terms of the previous one, and store a new entry to the log back to disk for the next run to access. The following code achieves this aim.
First we use pickle to load any existing score log. If this is not present, we initialize a new log, which is simply an empty Python list. Next we prepare the new log entry—a simple Python dict containing the average score for each of four metrics. Then we subtract the previous run’s score to determine the change in this run. This is the information we use to evaluate whether our change worked or not (along with any changes in feature importances, which we will address as well).
Finally, we append the new score entry to the log and store it back to disk:
In [11]:
#
# Persist the score to a score log that exists between runs
#
import pickle
# Load the score log or initialize an empty one
try:
score_log_filename = "../models/score_log.pickle"
score_log = pickle.load(open(score_log_filename, "rb"))
if not isinstance(score_log, list):
score_log = []
except IOError:
score_log = []
# Compute the existing score log entry
score_log_entry = {metric_name: score_averages[metric_name] for metric_name in metric_names}
# Compute and display the change in score for each metric
try:
last_log = score_log[-1]
except (IndexError, TypeError, AttributeError):
last_log = score_log_entry
experiment_report = []
for metric_name in metric_names:
run_delta = score_log_entry[metric_name] - last_log[metric_name]
experiment_report.append((metric_name, run_delta))
print("\nExperiment Report")
print("-----------------")
print(tabulate(experiment_report, headers=["Metric", "Score"]))
# Append the existing average scores to the log
score_log.append(score_log_entry)
# Persist the log for next run
pickle.dump(score_log, open(score_log_filename, "wb"))
Now when we run our script, we will get a report that shows the change between this run and the last run. We can use this, along with our feature importances, to direct our efforts at improving the model. For instance, an example test run shows the model accuracy increase by .003:
Experiment Report
-----------------
Metric Score
----------------- -----------
accuracy 0.00300548
weightedPrecision -0.00592227
weightedRecall 0.00300548
f1 -0.0105553
Jump back to the code for the model, the code under the section Implementing a More Rigorous Experiment. Re-run all the code between there and here, the last three code blocks. See how the score changed slightly? You will use these changes to guide you as you change the model!
We can use the list of columns given to our final VectorAssembler along with RandomForestClassificationModel.featureImportances to derive the importance of each named feature. This is extremely valuable because, as with our prediction quality scores, we can look at changes in feature importances for all features between runs. If a newly introduced feature turns out to be important, it is usually worth adding to the model, so long as it doesn’t hurt quality.
We begin by altering our experiment loop to record feature importances for each run. Check out the abbreviated content from ch09/improved_spark_mllib_model.py:
In [12]:
#
# Cross validate, train and evaluate classifier: loop split_count times for 4 metrics
#
from collections import defaultdict
scores = defaultdict(list)
feature_importances = defaultdict(list)
metric_names = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
split_count = 3
for i in range(1, split_count + 1):
print(f"\nRun {i} out of {split_count} of test/train splits in cross validation...")
# Test/train split
training_data, test_data = final_vectorized_features.randomSplit([0.8, 0.2])
# Instantiate and fit random forest classifier on all the data
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(
featuresCol="Features_vec",
labelCol="ArrDelayBucket",
predictionCol="Prediction",
maxBins=4657,
)
model = rfc.fit(training_data)
# Save the new model over the old one
model_output_path = "../models/spark_random_forest_classifier.flight_delays.baseline.bin"
model.write().overwrite().save(model_output_path)
# Evaluate model using test data
predictions = model.transform(test_data)
# Evaluate this split's results for each metric
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
for metric_name in metric_names:
evaluator = MulticlassClassificationEvaluator(
labelCol="ArrDelayBucket",
predictionCol="Prediction",
metricName=metric_name
)
score = evaluator.evaluate(predictions)
scores[metric_name].append(score)
print(f"{metric_name} = {score}")
#
# Collect feature importances
#
feature_names = vector_assembler.getInputCols()
feature_importance_list = model.featureImportances
for feature_name, feature_importance in zip(feature_names, feature_importance_list):
feature_importances[feature_name].append(feature_importance)
Next, we need to compute the average of the importance for each feature. Note that we use a defaultdict(float) to ensure that accessing empty keys returns zero. This will be important when comparing entries in the log with different sets of features. In order to print the feature importances, we need to sort them first, by descending order of importance:
In [13]:
#
# Analyze and report feature importance changes
#
# Compute averages for each feature
feature_importance_entry = defaultdict(float)
for feature_name, value_list in feature_importances.items():
average_importance = sum(value_list) / len(value_list)
feature_importance_entry[feature_name] = average_importance
# Sort the feature importances in descending order and print
import operator
sorted_feature_importances = sorted(
feature_importance_entry.items(),
key=operator.itemgetter(1),
reverse=True
)
print("\nFeature Importances")
print("-------------------")
print(tabulate(sorted_feature_importances, headers=['Name', 'Importance']))
Next we need to perform the same housekeeping as we did for the model score log: load the model, create an entry for this experiment, load the last experiment and compute the change for each feature between that experiment and the current one, and then print a report on these deltas.
First we load the last feature log. If it isn’t available because it doesn’t exist, we initialize the last_feature_log with zeros for each feature, so that a newly introduced feature will show a positive delta equal to its full importance:
In [14]:
#
# Compare this run's feature importances with the previous run's
#
# Load the feature importance log or initialize an empty one
try:
feature_log_filename = "../models/feature_log.pickle"
feature_log = pickle.load(open(feature_log_filename, "rb"))
if not isinstance(feature_log, list):
feature_log = []
except IOError:
feature_log = []
# Compute and display the change in score for each feature
try:
last_feature_log = feature_log[-1]
except (IndexError, TypeError, AttributeError):
last_feature_log = defaultdict(float)
for feature_name, importance in feature_importance_entry.items():
last_feature_log[feature_name] = importance
Next we compute the change between the last run and the current one:
In [15]:
# Compute the deltas
feature_deltas = {}
for feature_name in feature_importances.keys():
run_delta = feature_importance_entry[feature_name] - last_feature_log[feature_name]
feature_deltas[feature_name] = run_delta
In order to display them, we need to sort the feature importance changes in descending order, to show the biggest change first:
In [16]:
# Sort feature deltas, biggest change first
import operator
sorted_feature_deltas = sorted(
feature_deltas.items(),
key=operator.itemgetter(1),
reverse=True
)
Then we display the sorted feature deltas:
In [17]:
# Display sorted feature deltas
print("\nFeature Importance Delta Report")
print("-------------------------------")
print(tabulate(sorted_feature_deltas, headers=["Feature", "Delta"]))
Finally, as with the score log, we append our entry to the log and save it for the next run:
In [18]:
# Append the existing average deltas to the log
feature_log.append(feature_importance_entry)
# Persist the log for next run
pickle.dump(feature_log, open(feature_log_filename, "wb"))
We’ll use the raw feature importances as well as the changes in feature importance to guide our creation or alteration of features as we improve the model.
Now that we have the ability to understand the effect of changes between experimental runs, we can detect changes that improve our model. We can start adding features to test their effect on the model’s prediction quality, and pursue related features that help improve quality! Without this setup, we would be hard put to make positive changes. With it, we are only bounded by our creativity in our efforts to improve the model.
In examining our feature importances, it looks like the date/time fields have some impact. What if we extracted the hour of day as an integer from the scheduled departure and arrival datetimes? This would inform the model about morning versus afternoon versus red-eye flights, which surely affects on-time performance, as there is more traffic in the morning than overnight.
Check out ch09/explore_delays.py. Let’s start by exploring the premise of this feature, that lateness varies by the time of day of the flight:
In [19]:
features.registerTempTable("features")
features.show(5)
In [20]:
spark.sql("""
SELECT
HOUR(CRSDepTime) + 1 AS Hour,
AVG(ArrDelay),
STD(ArrDelay)
FROM features
GROUP BY HOUR(CRSDepTime)
ORDER BY HOUR(CRSDepTime)
""").show(24)
In [21]:
spark.sql("""
SELECT
HOUR(CRSArrTime) + 1 AS Hour,
AVG(ArrDelay),
STD(ArrDelay)
FROM features
GROUP BY HOUR(CRSArrTime)
ORDER BY HOUR(CRSArrTime)
""").show(24)
In [22]:
from pyspark.sql.functions import hour
features = features.withColumn('CRSDepHourOfDay', hour(features.CRSDepTime))
features = features.withColumn('CRSArrHourOfDay', hour(features.CRSArrTime))
departure_cov = features.stat.cov('CRSDepHourOfDay', 'ArrDelay')
arrival_cov = features.stat.cov('CRSArrHourOfDay', 'ArrDelay')
print("Departure delay covariance: {:,}".format(departure_cov))
print("Arrival delay covariance: {:,}".format(arrival_cov))
In [23]:
features.select(
"CRSDepTime",
"CRSDepHourOfDay",
"CRSArrTime",
"CRSArrHourOfDay"
).show()
In [24]:
features.show(5)
So we’re back at the beginning, and still have to add Route, bucketize the data, encode our string and numeric fields, and then combine them into a single vector. Let’s get started!
In [25]:
#
# Add a Route variable to replace FlightNum
#
from pyspark.sql.functions import lit, concat
features_with_route = features.withColumn(
'Route',
concat(
features.Origin,
lit('-'),
features.Dest
)
)
features_with_route.show(6)
In [26]:
#
# Use pyspark.ml.feature.Bucketizer to bucketize ArrDelay into early, on-time, slightly late, very late (0, 1, 2, 3)
#
from pyspark.ml.feature import Bucketizer
# Setup the Bucketizer
splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]
arrival_bucketizer = Bucketizer(
splits=splits,
inputCol="ArrDelay",
outputCol="ArrDelayBucket"
)
# Save the model
arrival_bucketizer_path = "../models/arrival_bucketizer_2.0.bin"
arrival_bucketizer.write().overwrite().save(arrival_bucketizer_path)
# Apply the model
ml_bucketized_features = arrival_bucketizer.transform(features_with_route)
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show(5)
In [27]:
#
# Extract features using tools in pyspark.ml.feature
#
from pyspark.ml.feature import StringIndexer, VectorAssembler
# Turn category fields into indexes
for column in ["Carrier", "DayOfMonth", "DayOfWeek", "DayOfYear",
"Origin", "Dest", "Route"]:
string_indexer = StringIndexer(
inputCol=column,
outputCol=column + "_index"
)
string_indexer_model = string_indexer.fit(ml_bucketized_features)
ml_bucketized_features = string_indexer_model.transform(ml_bucketized_features)
# Save the pipeline model
string_indexer_output_path = "../models/string_indexer_model_3.0.{}.bin".format(
column
)
string_indexer_model.write().overwrite().save(string_indexer_output_path)
ml_bucketized_features.show(5)
In [28]:
# Combine continuous, numeric fields with indexes of nominal ones
# ...into one feature vector
numeric_columns = [
"DepDelay", "Distance",
"CRSDepHourOfDay", "CRSArrHourOfDay"
]
index_columns = ["Carrier_index", "DayOfMonth_index",
                 "DayOfWeek_index", "DayOfYear_index",
                 "Origin_index", "Dest_index", "Route_index"]
vector_assembler = VectorAssembler(
inputCols=numeric_columns + index_columns,
outputCol="Features_vec"
)
final_vectorized_features = vector_assembler.transform(ml_bucketized_features)
# Save the numeric vector assembler
vector_assembler_path = "../models/numeric_vector_assembler_3.0.bin"
vector_assembler.write().overwrite().save(vector_assembler_path)
# Drop the index columns
for column in index_columns:
final_vectorized_features = final_vectorized_features.drop(column)
# Inspect the finalized features
final_vectorized_features.show(5)
In [29]:
#
# Cross validate, train and evaluate classifier: loop split_count times for 4 metrics
#
from collections import defaultdict
scores = defaultdict(list)
feature_importances = defaultdict(list)
metric_names = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
split_count = 3
for i in range(1, split_count + 1):
print(f"\nRun {i} out of {split_count} of test/train splits in cross validation...")
# Test/train split
training_data, test_data = final_vectorized_features.randomSplit([0.8, 0.2])
# Instantiate and fit random forest classifier on all the data
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(
featuresCol="Features_vec",
labelCol="ArrDelayBucket",
predictionCol="Prediction",
maxBins=4657,
maxMemoryInMB=1024
)
model = rfc.fit(training_data)
# Save the new model over the old one
model_output_path = "../models/spark_random_forest_classifier.flight_delays.baseline.bin"
model.write().overwrite().save(model_output_path)
# Evaluate model using test data
predictions = model.transform(test_data)
# Evaluate this split's results for each metric
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
for metric_name in metric_names:
evaluator = MulticlassClassificationEvaluator(
labelCol="ArrDelayBucket",
predictionCol="Prediction",
metricName=metric_name
)
score = evaluator.evaluate(predictions)
scores[metric_name].append(score)
print(f"{metric_name} = {score}")
#
# Collect feature importances
#
feature_names = vector_assembler.getInputCols()
feature_importance_list = model.featureImportances
for feature_name, feature_importance in zip(feature_names, feature_importance_list):
feature_importances[feature_name].append(feature_importance)
In [30]:
#
# Evaluate average and STD of each metric and print a table
#
import numpy as np
score_averages = defaultdict(float)
# Compute the table data
average_stds = [] # ha
for metric_name in metric_names:
metric_scores = scores[metric_name]
average_accuracy = sum(metric_scores) / len(metric_scores)
score_averages[metric_name] = average_accuracy
std_accuracy = np.std(metric_scores)
average_stds.append((metric_name, average_accuracy, std_accuracy))
# Print the table
print("\nExperiment Log")
print("--------------")
print(tabulate(average_stds, headers=["Metric", "Average", "STD"]))
In [31]:
#
# Persist the score to a score log that exists between runs
#
import pickle
# Load the score log or initialize an empty one
try:
score_log_filename = "../models/score_log.pickle"
score_log = pickle.load(open(score_log_filename, "rb"))
if not isinstance(score_log, list):
score_log = []
except IOError:
score_log = []
# Compute the existing score log entry
score_log_entry = {metric_name: score_averages[metric_name] for metric_name in metric_names}
# Compute and display the change in score for each metric
try:
last_log = score_log[-1]
except (IndexError, TypeError, AttributeError):
last_log = score_log_entry
experiment_report = []
for metric_name in metric_names:
run_delta = score_log_entry[metric_name] - last_log[metric_name]
experiment_report.append((metric_name, run_delta))
print("\nExperiment Report")
print("-----------------")
print(tabulate(experiment_report, headers=["Metric", "Score"]))
# Append the existing average scores to the log
score_log.append(score_log_entry)
# Persist the log for next run
pickle.dump(score_log, open(score_log_filename, "wb"))
In [32]:
#
# Analyze and report feature importance changes
#
# Compute averages for each feature
feature_importance_entry = defaultdict(float)
for feature_name, value_list in feature_importances.items():
average_importance = sum(value_list) / len(value_list)
feature_importance_entry[feature_name] = average_importance
# Sort the feature importances in descending order and print
import operator
sorted_feature_importances = sorted(
feature_importance_entry.items(),
key=operator.itemgetter(1),
reverse=True
)
print("\nFeature Importances")
print("-------------------")
print(tabulate(sorted_feature_importances, headers=['Name', 'Importance']))
In [33]:
#
# Compare this run's feature importances with the previous run's
#
# Load the feature importance log or initialize an empty one
try:
feature_log_filename = "../models/feature_log.pickle"
feature_log = pickle.load(open(feature_log_filename, "rb"))
if not isinstance(feature_log, list):
feature_log = []
except IOError:
feature_log = []
# Compute and display the change in score for each feature
try:
last_feature_log = feature_log[-1]
except (IndexError, TypeError, AttributeError):
last_feature_log = defaultdict(float)
for feature_name, importance in feature_importance_entry.items():
last_feature_log[feature_name] = importance
# Compute the deltas
feature_deltas = {}
for feature_name in feature_importances.keys():
run_delta = feature_importance_entry[feature_name] - last_feature_log[feature_name]
feature_deltas[feature_name] = run_delta
# Sort feature deltas, biggest change first
import operator
sorted_feature_deltas = sorted(
feature_deltas.items(),
key=operator.itemgetter(1),
reverse=True
)
# Display sorted feature deltas
print("\nFeature Importance Delta Report")
print("-------------------------------")
print(tabulate(sorted_feature_deltas, headers=["Feature", "Delta"]))
# Append the existing average deltas to the log
feature_log.append(feature_importance_entry)
# Persist the log for next run
pickle.dump(feature_log, open(feature_log_filename, "wb"))
Interpreting the output, it looks like the combined effect of these fields is to impact feature importance by about 1%, but the effect on accuracy is insignificant. We’ll leave the fields in, although they don’t help much. Without resorting to advanced time series analysis, it seems we’ve milked all we can from date/time-based features.
Recall from Investigating Airplanes (Entities) that we incorporated data on airplane manufacturers into our data model. For instance, we analyzed the distribution of manufacturers in the American commercial fleet. In this section, we’re going to join in airplane data and see what impact this has on the model’s accuracy.
I wonder whether properties of the aircraft (called the “metal” of the flight) influence delays? For instance, bigger aircraft fly higher and can go over weather, while smaller aircraft may be less able to do so. I can’t honestly think of a reason why the engine manufacturer, airplane manufacturer, or manufacture year would have an impact on the model, but since we’re importing one field, we may as well try them all! Note that we can simply drop any features that don’t rank as very significant. The beauty of our experimental model with decision trees is that it doesn’t cost extra to try extra fields. Sometimes you can simply let the model decide what matters.
Note that when dealing with team members and with other teams who need an accounting of your time in order to coordinate with you, a description of the experiments you are running will help keep the teams in sync. For instance, “We are attempting to incorporate a new dataset which we scraped from the FAA website into our flight delay predictive model” would make a good experimental description during an agile sprint.
To add airplane features to our model, we need to create a new feature extraction script, ch09/extract_features_with_airplanes.py. We can do this by copying and altering ch09/extract_features.py.
First we add TailNum to the fields we select from our training data. Because this column also appears in our airplane dataset, we need to name it differently or we won’t easily be able to access the column after the join. We’ll name it FeatureTailNum:
In [34]:
# Load the on-time parquet file
input_path = "../data/january_performance.parquet"
on_time_dataframe = spark.read.parquet(input_path)
on_time_dataframe.registerTempTable("on_time_performance")
# Select a few features of interest
simple_on_time_features = spark.sql("""
SELECT
FlightNum,
FlightDate,
DayOfWeek,
DayofMonth AS DayOfMonth,
CONCAT(Month, '-', DayofMonth) AS DayOfYear,
Carrier,
Origin,
Dest,
Distance,
DepDelay,
ArrDelay,
CRSDepTime,
CRSArrTime,
CONCAT(Origin, '-', Dest) AS Route,
TailNum AS FeatureTailNum
FROM on_time_performance
WHERE FlightDate < '2015-02-01'
""")
simple_on_time_features = simple_on_time_features.sample(False, 0.1)
simple_on_time_features.select(
"FlightNum",
"FlightDate",
"FeatureTailNum"
).show(10)
In [35]:
# Filter nulls, they can't help us
filled_on_time_features = simple_on_time_features.where(simple_on_time_features.ArrDelay.isNotNull())
filled_on_time_features = filled_on_time_features.where(filled_on_time_features.DepDelay.isNotNull())
filled_on_time_features.show(5)
In [36]:
# We need to turn the schedule times into actual timestamps, not strings or numbers
def convert_hours(hours_minutes):
hours = hours_minutes[:-2]
minutes = hours_minutes[-2:]
if hours == '24':
hours = '23'
minutes = '59'
time_string = "{}:{}:00Z".format(hours, minutes)
return time_string
def compose_datetime(iso_date, time_string):
return "{} {}".format(iso_date, time_string)
def create_iso_string(iso_date, hours_minutes):
time_string = convert_hours(hours_minutes)
full_datetime = compose_datetime(iso_date, time_string)
return full_datetime
def create_datetime(iso_string):
return iso8601.parse_date(iso_string)
def convert_datetime(iso_date, hours_minutes):
iso_string = create_iso_string(iso_date, hours_minutes)
dt = create_datetime(iso_string)
return dt
def day_of_year(iso_date_string):
dt = iso8601.parse_date(iso_date_string)
doy = dt.timetuple().tm_yday
return doy
def alter_feature_datetimes(row):
flight_date = iso8601.parse_date(row['FlightDate'])
scheduled_dep_time = convert_datetime(row['FlightDate'], row['CRSDepTime'])
scheduled_arr_time = convert_datetime(row['FlightDate'], row['CRSArrTime'])
# Handle overnight flights
if scheduled_arr_time < scheduled_dep_time:
scheduled_arr_time += datetime.timedelta(days=1)
doy = day_of_year(row['FlightDate'])
return {
'FlightNum': row['FlightNum'],
'FlightDate': flight_date,
'DayOfWeek': int(row['DayOfWeek']),
'DayOfMonth': int(row['DayOfMonth']),
'DayOfYear': doy,
'Carrier': row['Carrier'],
'Origin': row['Origin'],
'Dest': row['Dest'],
'Distance': row['Distance'],
'DepDelay': row['DepDelay'],
'ArrDelay': row['ArrDelay'],
'CRSDepTime': scheduled_dep_time,
'CRSArrTime': scheduled_arr_time,
'Route': row['Route'],
'FeatureTailNum': row['FeatureTailNum'],
}
In [37]:
timestamp_features = filled_on_time_features.rdd.map(alter_feature_datetimes)
timestamp_df = timestamp_features.toDF()
timestamp_df.show(5)
Next, we load the airplane data and left join it to our features dataset. Note that null is a problematic value for our StringIndexer, but we don’t want to discard empty values or rows either, because whether a variable is present or not is something our decision tree model can use to learn. We use DataFrame.selectExpr to COALESCE our null values to the string 'Empty'. This will get its own index from StringIndexer and things will work out well. Also note that we rename FeatureTailNum back to TailNum for the final output:
In [38]:
# Load airplanes and left join on tail numbers
airplanes_path = "../data/airplanes.jsonl"
airplanes = spark.read.json(airplanes_path)
# Left outer join ensures all feature records remain, with nulls where airplane records are not available
features_with_airplanes = timestamp_df.join(
airplanes,
on=timestamp_df.FeatureTailNum == airplanes.TailNum,
how="left_outer"
)
# Fill in null values with 'Empty' using COALESCE
features_with_airplanes = features_with_airplanes.selectExpr(
"FlightNum",
"FlightDate",
"DayOfWeek",
"DayOfMonth",
"DayOfYear",
"Carrier",
"Origin",
"Dest",
"Distance",
"DepDelay",
"ArrDelay",
"CRSDepTime",
"CRSArrTime",
"Route",
"FeatureTailNum AS TailNum",
"COALESCE(EngineManufacturer, 'Empty') AS EngineManufacturer",
"COALESCE(EngineModel, 'Empty') AS EngineModel",
"COALESCE(Manufacturer, 'Empty') AS Manufacturer",
"COALESCE(ManufacturerYear, 'Empty') AS ManufacturerYear",
"COALESCE(Model, 'Empty') AS Model",
"COALESCE(OwnerState, 'Empty') AS OwnerState"
)
features_with_airplanes.show(5)
In [39]:
# Explicitly sort the data and keep it sorted throughout. Leave nothing to chance.
sorted_features = features_with_airplanes.sort(
timestamp_df.DayOfYear,
timestamp_df.Carrier,
timestamp_df.Origin,
timestamp_df.Dest,
timestamp_df.FlightNum,
timestamp_df.CRSDepTime,
timestamp_df.CRSArrTime,
)
In [40]:
# Store as a single json file
output_path = "../data/simple_flight_delay_features_airplanes.json"
sorted_features.repartition(1).write.mode("overwrite").json(output_path)
Now we’re ready to incorporate the features into our model.
Now we need to create a new script that incorporates our new airplane features into our classifier model. Check out ch09/spark_model_with_airplanes.py, which we copied from ch09/improved_spark_mllib_model.py and altered.
First we need to load the training data with the additional fields, including Route (which is now calculated in ch09/extract_features_with_airplanes.py):
In [41]:
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DateType, TimestampType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.functions import udf
schema = StructType([
StructField("ArrDelay", DoubleType(), True),
StructField("CRSArrTime", TimestampType(), True),
StructField("CRSDepTime", TimestampType(), True),
StructField("Carrier", StringType(), True),
StructField("DayOfMonth", IntegerType(), True),
StructField("DayOfWeek", IntegerType(), True),
StructField("DayOfYear", IntegerType(), True),
StructField("DepDelay", DoubleType(), True),
StructField("Dest", StringType(), True),
StructField("Distance", DoubleType(), True),
StructField("FlightDate", DateType(), True),
StructField("FlightNum", StringType(), True),
StructField("Origin", StringType(), True),
StructField("Route", StringType(), True),
StructField("TailNum", StringType(), True),
StructField("EngineManufacturer", StringType(), True),
StructField("EngineModel", StringType(), True),
StructField("Manufacturer", StringType(), True),
StructField("ManufacturerYear", StringType(), True),
StructField("OwnerState", StringType(), True),
])
input_path = "../data/simple_flight_delay_features_airplanes.json"
features = spark.read.json(input_path, schema=schema)
features.show(5)
In [42]:
#
# Add the hour of day of scheduled arrival/departure
#
from pyspark.sql.functions import hour
features_with_hour = features.withColumn(
"CRSDepHourOfDay",
hour(features.CRSDepTime)
)
features_with_hour = features_with_hour.withColumn(
"CRSArrHourOfDay",
hour(features.CRSArrTime)
)
features_with_hour.select("CRSDepTime", "CRSDepHourOfDay", "CRSArrTime", "CRSArrHourOfDay").show(5)
Because we left joined our new features in, we need to know how many of the resulting training records have null values for their fields. Null values will crash the StringIndexer for a field, so we’ve explicitly altered our feature extraction code to remove them. There should be no nulls, so we’ll print a table with a warning if they are present:
In [43]:
#
# Check for nulls in features before using Spark ML
#
null_counts = [(column, features_with_hour.where(features_with_hour[column].isNull()).count()) for column in features_with_hour.columns]
cols_with_nulls = filter(lambda x: x[1] > 0, null_counts)
print("\nNull Value Report")
print("-----------------")
print(tabulate(cols_with_nulls, headers=["Column", "Nulls"]))
There should be no nulls present!
Next we need to bucketize our data as per normal.
In [44]:
#
# Use pyspark.ml.feature.Bucketizer to bucketize ArrDelay into early, on-time, slightly late, very late (0, 1, 2, 3)
#
from pyspark.ml.feature import Bucketizer
# Setup the Bucketizer
splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]
arrival_bucketizer = Bucketizer(
splits=splits,
inputCol="ArrDelay",
outputCol="ArrDelayBucket"
)
# Save the model
arrival_bucketizer_path = "../models/arrival_bucketizer_2.0.bin"
arrival_bucketizer.write().overwrite().save(arrival_bucketizer_path)
# Apply the model
ml_bucketized_features = arrival_bucketizer.transform(features_with_hour)
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show(5)
We’ve now added the hour of day fields and bucketized ArrDelay into ArrDelayBucket, as usual. Next we need to index all our string columns, including our new airplane features. Note that we are also making another change: we are moving the DayOfMonth, DayOfWeek, and DayOfYear fields, along with CRSDepHourOfDay and CRSArrHourOfDay, into the numeric fields to be vectorized directly. Thinking about it, string-indexing these already-numeric fields only adds a layer of abstraction, encoding numbers into numbers again.
In [45]:
#
# Extract features using tools in pyspark.ml.feature
#
from pyspark.ml.feature import StringIndexer, VectorAssembler
# Turn category fields into indexes
string_columns = ["Carrier", "Origin", "Dest", "Route",
"TailNum", "EngineManufacturer", "EngineModel",
"Manufacturer", "ManufacturerYear", "OwnerState"]
for column in string_columns:
string_indexer = StringIndexer(
inputCol=column,
outputCol=column + "_index"
)
string_indexer_model = string_indexer.fit(ml_bucketized_features)
ml_bucketized_features = string_indexer_model.transform(ml_bucketized_features)
# Save the pipeline model
string_indexer_output_path = "../models/string_indexer_model_4.0.{}.bin".format(
column
)
string_indexer_model.write().overwrite().save(string_indexer_output_path)
ml_bucketized_features.show(5)
Next, we need to create a new VectorAssembler to combine our features into one feature vector, the column Features_vec. As before, an index field name is the field name with _index appended. This time around, we use a list comprehension to compute the index columns:
In [46]:
# Combine continuous, numeric fields with indexes of nominal ones
# ...into one feature vector
numeric_columns = [
"DepDelay",
"Distance",
"DayOfYear",
"DayOfMonth",
"DayOfWeek",
"CRSDepHourOfDay",
"CRSArrHourOfDay"
]
index_columns = [column + "_index" for column in string_columns]
index_columns
Out[46]:
In [47]:
vector_assembler = VectorAssembler(
inputCols=numeric_columns + index_columns,
outputCol="Features_vec"
)
final_vectorized_features = vector_assembler.transform(ml_bucketized_features)
# Save the numeric vector assembler
vector_assembler_path = "../models/numeric_vector_assembler_5.0.bin"
vector_assembler.write().overwrite().save(vector_assembler_path)
# Drop the index columns
for column in index_columns:
final_vectorized_features = final_vectorized_features.drop(column)
# Inspect the finalized features
final_vectorized_features.show(5)
The rest of the code is identical to ch09/improved_spark_mllib_model.py:
In [48]:
#
# Cross validate, train and evaluate classifier: loop split_count times for 4 metrics
#
from collections import defaultdict
scores = defaultdict(list)
feature_importances = defaultdict(list)
metric_names = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
split_count = 3
for i in range(1, split_count + 1):
print(f"\nRun {i} out of {split_count} of test/train splits in cross validation...")
# Test/train split
training_data, test_data = final_vectorized_features.randomSplit([0.8, 0.2])
# Instantiate and fit random forest classifier on all the data
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(
featuresCol="Features_vec",
labelCol="ArrDelayBucket",
predictionCol="Prediction",
maxBins=4896,
maxMemoryInMB=1024
)
model = rfc.fit(training_data)
# Save the new model over the old one
model_output_path = "../models/spark_random_forest_classifier.flight_delays.baseline.bin"
model.write().overwrite().save(model_output_path)
# Evaluate model using test data
predictions = model.transform(test_data)
# Evaluate this split's results for each metric
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
for metric_name in metric_names:
evaluator = MulticlassClassificationEvaluator(
labelCol="ArrDelayBucket",
predictionCol="Prediction",
metricName=metric_name
)
score = evaluator.evaluate(predictions)
scores[metric_name].append(score)
print("{} = {}".format(metric_name, score))
#
# Collect feature importances
#
feature_names = vector_assembler.getInputCols()
feature_importance_list = model.featureImportances
for feature_name, feature_importance in zip(feature_names, feature_importance_list):
feature_importances[feature_name].append(feature_importance)
In [49]:
#
# Evaluate average and STD of each metric and print a table
#
import numpy as np
score_averages = defaultdict(float)
# Compute the table data
average_stds = [] # ha
for metric_name in metric_names:
metric_scores = scores[metric_name]
average_accuracy = sum(metric_scores) / len(metric_scores)
score_averages[metric_name] = average_accuracy
std_accuracy = np.std(metric_scores)
average_stds.append((metric_name, average_accuracy, std_accuracy))
# Print the table
print("\nExperiment Log")
print("--------------")
print(tabulate(average_stds, headers=["Metric", "Average", "STD"]))
In [50]:
#
# Persist the score to a score log that exists between runs
#
import pickle
# Load the score log or initialize an empty one
try:
score_log_filename = "../models/score_log.pickle"
score_log = pickle.load(open(score_log_filename, "rb"))
if not isinstance(score_log, list):
score_log = []
except IOError:
score_log = []
# Compute the existing score log entry
score_log_entry = {
metric_name: score_averages[metric_name] for metric_name in metric_names
}
# Compute and display the change in score for each metric
try:
last_log = score_log[-1]
except (IndexError, TypeError, AttributeError):
last_log = score_log_entry
experiment_report = []
for metric_name in metric_names:
run_delta = score_log_entry[metric_name] - last_log[metric_name]
experiment_report.append((metric_name, run_delta))
print("\nExperiment Report")
print("-----------------")
print(tabulate(experiment_report, headers=["Metric", "Score"]))
# Append the existing average scores to the log
score_log.append(score_log_entry)
# Persist the log for next run
pickle.dump(score_log, open(score_log_filename, "wb"))
In [51]:
#
# Analyze and report feature importance changes
#
# Compute averages for each feature
feature_importance_entry = defaultdict(float)
for feature_name, value_list in feature_importances.items():
average_importance = sum(value_list) / len(value_list)
feature_importance_entry[feature_name] = average_importance
# Sort the feature importances in descending order and print
import operator
sorted_feature_importances = sorted(
feature_importance_entry.items(),
key=operator.itemgetter(1),
reverse=True
)
print("\nFeature Importances")
print("-------------------")
print(tabulate(sorted_feature_importances, headers=['Name', 'Importance']))
In [52]:
#
# Compare this run's feature importances with the previous run's
#
# Load the feature importance log or initialize an empty one
try:
feature_log_filename = "../models/feature_log.pickle"
feature_log = pickle.load(open(feature_log_filename, "rb"))
if not isinstance(feature_log, list):
feature_log = []
except IOError:
feature_log = []
# Compute and display the change in score for each feature
try:
last_feature_log = feature_log[-1]
except (IndexError, TypeError, AttributeError):
last_feature_log = defaultdict(float)
for feature_name, importance in feature_importance_entry.items():
last_feature_log[feature_name] = importance
# Compute the deltas
feature_deltas = {}
for feature_name in feature_importances.keys():
run_delta = feature_importance_entry[feature_name] - last_feature_log[feature_name]
feature_deltas[feature_name] = run_delta
# Sort feature deltas, biggest change first
import operator
sorted_feature_deltas = sorted(
feature_deltas.items(),
key=operator.itemgetter(1),
reverse=True
)
# Display sorted feature deltas
print("\nFeature Importance Delta Report")
print("-------------------------------")
print(tabulate(sorted_feature_deltas, headers=["Feature", "Delta"]))
# Append the existing average deltas to the log
feature_log.append(feature_importance_entry)
# Persist the log for next run
pickle.dump(feature_log, open(feature_log_filename, "wb"))
Note that on the first go-round, our model failed to train because we needed to increase the maxBins parameter to 4896 to accommodate the high cardinality of our new fields. After that, the code runs without incident.
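Rather than guessing, one way to derive the value maxBins needs is to measure the cardinality of the categorical columns being indexed. This is a hedged sketch, assuming the ml_bucketized_features DataFrame and string_columns list defined above:
# A sketch, not part of the original pipeline: derive the minimum maxBins value
# from the cardinality of the indexed categorical columns defined above
from pyspark.sql.functions import countDistinct
categorical_cardinalities = [
    ml_bucketized_features.select(countDistinct(column)).first()[0]
    for column in string_columns
]
# maxBins must be at least the number of categories in the largest categorical feature
required_max_bins = max(categorical_cardinalities)
print("maxBins must be at least {}".format(required_max_bins))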
It looks like our efforts were mostly for naught—they actually hurt the quality of the model (although by so little that it comes out about even)! The single exception is that adding TailNum helps in terms of feature importance by 0.05 (your precise results may vary). Apparently some airplanes are more prone to delay than others, but this isn’t down to the measured properties of the airplane so much... or at least those properties are mostly encoded by the identity of the airplane itself.
The tail end of the feature importance scores looks like this (for me):
EngineModel_index 0.00221025
OwnerState_index 0.00181267
ManufacturerYear_index 0.00156983
Manufacturer_index 0.000969526
EngineManufacturer_index 0.000708076
DayOfWeek 0.00032268
It is better to have a simpler model, as simpler models tend not to overfit, meaning they are less likely to learn patterns that apply to the training data only by chance and then fail on test data or in the real world. They are also easier to deploy and maintain. If something doesn’t help, remove it!
Let’s remove EngineModel, OwnerState, ManufacturerYear, Manufacturer, EngineManufacturer, and DayOfWeek and run the model again, to see what this does to performance.
In [53]:
#
# Use pyspark.ml.feature.Bucketizer to bucketize ArrDelay into early, on-time, slightly late, very late (0, 1, 2, 3)
#
from pyspark.ml.feature import Bucketizer
# Setup the Bucketizer
splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]
arrival_bucketizer = Bucketizer(
splits=splits,
inputCol="ArrDelay",
outputCol="ArrDelayBucket"
)
# Save the model
arrival_bucketizer_path = "../models/arrival_bucketizer_2.0.bin"
arrival_bucketizer.write().overwrite().save(arrival_bucketizer_path)
# Apply the model
ml_bucketized_features = arrival_bucketizer.transform(features_with_hour)
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show(5)
In [65]:
#
# Extract features using tools in pyspark.ml.feature
#
from pyspark.ml.feature import StringIndexer, VectorAssembler
# Turn category fields into indexes
string_columns = ["Carrier", "Origin", "Dest", "Route", "TailNum"]
for column in string_columns:
string_indexer = StringIndexer(
inputCol=column,
outputCol=column + "_index"
)
string_indexer_model = string_indexer.fit(ml_bucketized_features)
ml_bucketized_features = string_indexer_model.transform(ml_bucketized_features)
# Save the pipeline model
string_indexer_output_path = "../models/string_indexer_model_4.0.{}.bin".format(
column
)
string_indexer_model.write().overwrite().save(string_indexer_output_path)
ml_bucketized_features.show(5)
In [55]:
# Combine continuous, numeric fields with indexes of nominal ones
# ...into one feature vector
numeric_columns = [
"DepDelay",
"Distance",
"DayOfYear",
"DayOfMonth",
"CRSDepHourOfDay",
"CRSArrHourOfDay"
]
index_columns = [column + "_index" for column in string_columns]
index_columns
Out[55]:
In [56]:
vector_assembler = VectorAssembler(
inputCols=numeric_columns + index_columns,
outputCol="Features_vec"
)
final_vectorized_features = vector_assembler.transform(ml_bucketized_features)
# Save the numeric vector assembler
vector_assembler_path = "../models/numeric_vector_assembler_5.0.bin"
vector_assembler.write().overwrite().save(vector_assembler_path)
# Drop the index columns
for column in index_columns:
final_vectorized_features = final_vectorized_features.drop(column)
# Inspect the finalized features
final_vectorized_features.show(5)
In [57]:
#
# Cross validate, train and evaluate classifier: loop 3 times for 4 metrics
#
from collections import defaultdict
scores = defaultdict(list)
feature_importances = defaultdict(list)
metric_names = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
split_count = 3
for i in range(1, split_count + 1):
print(f"\nRun {i} out of {split_count} of test/train splits in cross validation...")
# Test/train split
training_data, test_data = final_vectorized_features.randomSplit([0.8, 0.2])
# Instantiate and fit random forest classifier on the training data
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(
featuresCol="Features_vec",
labelCol="ArrDelayBucket",
predictionCol="Prediction",
maxBins=4896,
maxMemoryInMB=1024
)
model = rfc.fit(training_data)
# Save the new model over the old one
model_output_path = "../models/spark_random_forest_classifier.flight_delays.baseline.bin"
model.write().overwrite().save(model_output_path)
# Evaluate model using test data
predictions = model.transform(test_data)
# Evaluate this split's results for each metric
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
for metric_name in metric_names:
evaluator = MulticlassClassificationEvaluator(
labelCol="ArrDelayBucket",
predictionCol="Prediction",
metricName=metric_name
)
score = evaluator.evaluate(predictions)
scores[metric_name].append(score)
print("{} = {}".format(metric_name, score))
#
# Collect feature importances
#
feature_names = vector_assembler.getInputCols()
feature_importance_list = model.featureImportances
for feature_name, feature_importance in zip(feature_names, feature_importance_list):
feature_importances[feature_name].append(feature_importance)
In [58]:
#
# Evaluate average and STD of each metric and print a table
#
import numpy as np
score_averages = defaultdict(float)
# Compute the table data
average_stds = [] # ha
for metric_name in metric_names:
metric_scores = scores[metric_name]
average_accuracy = sum(metric_scores) / len(metric_scores)
score_averages[metric_name] = average_accuracy
std_accuracy = np.std(metric_scores)
average_stds.append((metric_name, average_accuracy, std_accuracy))
# Print the table
print("\nExperiment Log")
print("--------------")
print(tabulate(average_stds, headers=["Metric", "Average", "STD"]))
In [59]:
#
# Persist the score to a score log that exists between runs
#
import pickle
# Load the score log or initialize an empty one
try:
score_log_filename = "../models/score_log.pickle"
score_log = pickle.load(open(score_log_filename, "rb"))
if not isinstance(score_log, list):
score_log = []
except IOError:
score_log = []
# Compute the existing score log entry
score_log_entry = {
metric_name: score_averages[metric_name] for metric_name in metric_names
}
# Compute and display the change in score for each metric
try:
last_log = score_log[-1]
except (IndexError, TypeError, AttributeError):
last_log = score_log_entry
experiment_report = []
for metric_name in metric_names:
run_delta = score_log_entry[metric_name] - last_log[metric_name]
experiment_report.append((metric_name, run_delta))
print("\nExperiment Report")
print("-----------------")
print(tabulate(experiment_report, headers=["Metric", "Delta"]))
# Append the existing average scores to the log
score_log.append(score_log_entry)
# Persist the log for next run
pickle.dump(score_log, open(score_log_filename, "wb"))
In [60]:
#
# Analyze and report feature importance changes
#
# Compute averages for each feature
feature_importance_entry = defaultdict(float)
for feature_name, value_list in feature_importances.items():
average_importance = sum(value_list) / len(value_list)
feature_importance_entry[feature_name] = average_importance
# Sort the feature importances in descending order and print
import operator
sorted_feature_importances = sorted(
feature_importance_entry.items(),
key=operator.itemgetter(1),
reverse=True
)
print("\nFeature Importances")
print("-------------------")
print(tabulate(sorted_feature_importances, headers=['Name', 'Importance']))
In [61]:
#
# Compare this run's feature importances with the previous run's
#
# Load the feature importance log or initialize an empty one
try:
feature_log_filename = "../models/feature_log.pickle"
feature_log = pickle.load(open(feature_log_filename, "rb"))
if not isinstance(feature_log, list):
feature_log = []
except IOError:
feature_log = []
# Compute and display the change in score for each feature
try:
last_feature_log = feature_log[-1]
except (IndexError, TypeError, AttributeError):
last_feature_log = defaultdict(float)
for feature_name, importance in feature_importance_entry.items():
last_feature_log[feature_name] = importance
# Compute the deltas
feature_deltas = {}
for feature_name in feature_importances.keys():
run_delta = feature_importance_entry[feature_name] - last_feature_log[feature_name]
feature_deltas[feature_name] = run_delta
# Sort feature deltas, biggest change first
import operator
sorted_feature_deltas = sorted(
feature_deltas.items(),
key=operator.itemgetter(1),
reverse=True
)
# Display sorted feature deltas
print("\nFeature Importance Delta Report")
print("-------------------------------")
print(tabulate(sorted_feature_deltas, headers=["Feature", "Delta"]))
# Append the existing average deltas to the log
feature_log.append(feature_importance_entry)
# Persist the log for next run
pickle.dump(feature_log, open(feature_log_filename, "wb"))
This impacts the score positively (for me), but not significantly: an improvement of just 0.00031884 in accuracy. However, at this point all of our features contribute meaningfully to the model's prediction quality, which is where we want to be:
Feature Importances
-------------------
Name Importance
--------------- ------------
DepDelay 0.775767
TailNum_index 0.0541045
Route_index 0.0401366
Origin_index 0.0290746
DayOfMonth 0.0287668
DayOfYear 0.0268546
Distance 0.0165887
Dest_index 0.0126576
CRSDepHourOfDay 0.00680116
Carrier_index 0.00542581
CRSArrHourOfDay 0.00382289
Remember: when it comes to predictive models, simpler is better. If a feature doesn't sizably influence prediction accuracy, remove it. The model's quality will increase, it will perform faster in production, and you will have an easier time understanding the impact of additional features on the model. A simpler model is also less susceptible to overfitting.
One thing we haven't considered yet is the flight time. We should be able to subtract the takeoff time from the landing time and get the duration of the flight. Since Distance carries real importance (about 1.65% for me) and the hour of day matters, it seems like flight time might eke out a bit more prediction quality. Let's try!
In the book, we computed this field by converting our ISO datetimes into unix times (seconds since 1970) and subtracting the takeoff time from the landing time, which gave us the flight time in seconds. However, it turns out there is a field called AirTime, which is the number of minutes in the air. Let's try adding AirTime and see how things work.
It is easy to make the mistake of incorporating a feature into your model that you can't reliably retrieve in real time when making a prediction. Remember: if you can't get it in real time when you make a prediction, you can't incorporate it into the model... not if your model is going to work in the real world. This is a big difference between data science in Kaggle competitions and data science in practice.
AirTime would be available at runtime if we computed an expected AirTime by subtracting the scheduled departure time CRSDepTime from the scheduled arrival time CRSArrTime, after converting both to unix time. We would then divide by 60 to get the expected AirTime in minutes. Would that work all right? One must reason about features... and in this case it seems like a valid feature; a similar feature, Distance, has a 1.65% relative feature importance (for me).
Check out ch09/extract_features_with_flight_time.py, which we copied from ch09/extract_features_with_airplanes.py. We only need to change one line, our selectExpr, to add the date math for our FlightTime field.
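If you're curious what that date math looks like without opening the file, here is a rough sketch using unix_timestamp. It assumes a DataFrame whose CRSDepTime and CRSArrTime columns are already real timestamps (like the timestamp_df we build below); the exact expression in the script may differ:
from pyspark.sql.functions import unix_timestamp, col

# Expected flight time in minutes, from the scheduled departure/arrival timestamps
features_with_flight_time = timestamp_df.withColumn(
    "FlightTime",
    (unix_timestamp(col("CRSArrTime")) - unix_timestamp(col("CRSDepTime"))) / 60.0
)
features_with_flight_time.select("CRSDepTime", "CRSArrTime", "FlightTime").show(5)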
Let's add AirTime to our model and see whether it helps or not. We'll have to go back and load the AirTime column, then proceed with our experiment. We're going to put all the code in one block this time, to give you a code block you can use to add your own features to the model later.
We simply add the AirTime column to our initial SELECT statement in Spark SQL. Note that sometimes AirTime is not present, resulting in null values. To address this, we use Spark SQL's COALESCE function to impute (fill in) a value of 0.0 in place of null. This way our model can treat the missing values as a signal in their own right, alongside the values that are present.
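If you prefer the DataFrame API to SQL for this, fillna does the same imputation. A minimal equivalent sketch, not the code we run below:
# Equivalent null handling with the DataFrame API instead of COALESCE in SQL
features_imputed = features.fillna({"AirTime": 0.0})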
In [62]:
# Load the on-time parquet file
input_path = "../data/january_performance.parquet"
on_time_dataframe = spark.read.parquet(input_path)
on_time_dataframe.registerTempTable("on_time_performance")
# Limiting to January (FlightDate < '2015-02-01') and taking a 10% sample keeps this small for a training course.
# Feel free to use all the data if you have the RAM and the time!
features = spark.sql("""
SELECT
FlightNum,
FlightDate,
DayOfWeek,
DayofMonth AS DayOfMonth,
CONCAT(Month, '-', DayofMonth) AS DayOfYear,
Carrier,
Origin,
Dest,
Distance,
DepDelay,
ArrDelay,
CRSDepTime,
CRSArrTime,
CONCAT(Origin, '-', Dest) AS Route,
TailNum,
COALESCE(AirTime, 0.0) AS AirTime
FROM on_time_performance
WHERE FlightDate < '2015-02-01'
""")
features = features.sample(False, 0.1)
# Filter nulls, they can't help us
features = features.filter(
(features.ArrDelay.isNotNull())
&
(features.DepDelay.isNotNull())
)
features.show(10)
#
# Add the hour of day of scheduled arrival/departure
#
from pyspark.sql.functions import hour
features_with_hour = features.withColumn(
"CRSDepHourOfDay",
hour(features.CRSDepTime)
)
features_with_hour = features_with_hour.withColumn(
"CRSArrHourOfDay",
hour(features.CRSArrTime)
)
features_with_hour.select("CRSDepTime", "CRSDepHourOfDay", "CRSArrTime", "CRSArrHourOfDay").show(5)
# We need to turn CRSDepTime and CRSArrTime into real timestamps, not strings or numbers
import datetime
import iso8601
def convert_hours(hours_minutes):
hours = hours_minutes[:-2]
minutes = hours_minutes[-2:]
if hours == '24':
hours = '23'
minutes = '59'
time_string = "{}:{}:00Z".format(hours, minutes)
return time_string
def compose_datetime(iso_date, time_string):
return "{} {}".format(iso_date, time_string)
def create_iso_string(iso_date, hours_minutes):
time_string = convert_hours(hours_minutes)
full_datetime = compose_datetime(iso_date, time_string)
return full_datetime
def create_datetime(iso_string):
return iso8601.parse_date(iso_string)
def convert_datetime(iso_date, hours_minutes):
iso_string = create_iso_string(iso_date, hours_minutes)
dt = create_datetime(iso_string)
return dt
def day_of_year(iso_date_string):
dt = iso8601.parse_date(iso_date_string)
doy = dt.timetuple().tm_yday
return doy
def alter_feature_datetimes(row):
flight_date = iso8601.parse_date(row['FlightDate'])
scheduled_dep_time = convert_datetime(row['FlightDate'], row['CRSDepTime'])
scheduled_arr_time = convert_datetime(row['FlightDate'], row['CRSArrTime'])
# Handle overnight flights
if scheduled_arr_time < scheduled_dep_time:
scheduled_arr_time += datetime.timedelta(days=1)
doy = day_of_year(row['FlightDate'])
return {
'FlightNum': row['FlightNum'],
'FlightDate': flight_date,
'DayOfWeek': int(row['DayOfWeek']),
'DayOfMonth': int(row['DayOfMonth']),
'DayOfYear': doy,
'Carrier': row['Carrier'],
'Origin': row['Origin'],
'Dest': row['Dest'],
'Distance': row['Distance'],
'DepDelay': row['DepDelay'],
'ArrDelay': row['ArrDelay'],
'CRSDepTime': scheduled_dep_time,
'CRSArrTime': scheduled_arr_time,
'Route': row['Route'],
'TailNum': row['TailNum'],
'AirTime': row['AirTime']
}
timestamp_features = features_with_hour.rdd.map(alter_feature_datetimes)
timestamp_df = timestamp_features.toDF()
# Explicitly sort the data and keep it sorted throughout. Leave nothing to chance.
sorted_features = timestamp_df.sort(
timestamp_df.DayOfYear,
timestamp_df.Carrier,
timestamp_df.Origin,
timestamp_df.Dest,
timestamp_df.FlightNum,
timestamp_df.CRSDepTime,
timestamp_df.CRSArrTime,
)
sorted_features.show(10)
# Store as a single json file
output_path = "../data/simple_flight_delay_features_flight_times.json"
sorted_features.repartition(1).write.mode("overwrite").json(output_path)
print("Features with AirTime prepared!")
Note that we still store our data to disk and then load it back explicitly. This ensures its provenance and formatting are exactly what we expect and have not been inferred... inference that could change later and throw off our model. Model input must be as accurate and precise as possible!
Here we just add AirTime to the columns we load, then we add it to the list of numeric_columns. The rest is the same as before.
In [63]:
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DateType, TimestampType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.functions import udf
schema = StructType([
StructField("ArrDelay", DoubleType(), True),
StructField("CRSArrTime", TimestampType(), True),
StructField("CRSDepTime", TimestampType(), True),
StructField("Carrier", StringType(), True),
StructField("DayOfMonth", IntegerType(), True),
StructField("DayOfWeek", IntegerType(), True),
StructField("DayOfYear", IntegerType(), True),
StructField("DepDelay", DoubleType(), True),
StructField("Dest", StringType(), True),
StructField("Distance", DoubleType(), True),
StructField("FlightDate", DateType(), True),
StructField("FlightNum", StringType(), True),
StructField("Origin", StringType(), True),
StructField("Route", StringType(), True),
StructField("TailNum", StringType(), True),
StructField("AirTime", FloatType(), True)
])
input_path = "../data/simple_flight_delay_features_flight_times.json"
features = spark.read.json(input_path, schema=schema)
features.show(5)
#
# Add the hour of day of scheduled arrival/departure
#
from pyspark.sql.functions import hour
features_with_hour = features.withColumn(
"CRSDepHourOfDay",
hour(features.CRSDepTime)
)
features_with_hour = features_with_hour.withColumn(
"CRSArrHourOfDay",
hour(features.CRSArrTime)
)
features_with_hour.select("CRSDepTime", "CRSDepHourOfDay", "CRSArrTime", "CRSArrHourOfDay").show(5)
#
# Check for nulls in features before using Spark ML
#
null_counts = [(column, features_with_hour.where(features_with_hour[column].isNull()).count()) for column in features_with_hour.columns]
cols_with_nulls = filter(lambda x: x[1] > 0, null_counts)
print("\nNull Value Report")
print("-----------------")
print(tabulate(cols_with_nulls, headers=["Column", "Nulls"]))
#
# Use pyspark.ml.feature.Bucketizer to bucketize ArrDelay into early, on-time, slightly late, very late (0, 1, 2, 3)
#
from pyspark.ml.feature import Bucketizer
# Setup the Bucketizer
splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]
arrival_bucketizer = Bucketizer(
splits=splits,
inputCol="ArrDelay",
outputCol="ArrDelayBucket"
)
# Save the model
arrival_bucketizer_path = "../models/arrival_bucketizer_2.0.bin"
arrival_bucketizer.write().overwrite().save(arrival_bucketizer_path)
# Apply the model
ml_bucketized_features = arrival_bucketizer.transform(features_with_hour)
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show(5)
ml_bucketized_features.show(5)
#
# Extract features with tools from pyspark.ml.feature
#
from pyspark.ml.feature import StringIndexer, VectorAssembler
# Turn category fields into indexes
string_columns = ["Carrier", "Origin", "Dest", "Route", "TailNum"]
for column in string_columns:
string_indexer = StringIndexer(
inputCol=column,
outputCol=column + "_index"
)
string_indexer_model = string_indexer.fit(ml_bucketized_features)
ml_bucketized_features = string_indexer_model.transform(ml_bucketized_features)
# Save the pipeline model
string_indexer_output_path = "../models/string_indexer_model_4.0.{}.bin".format(
column
)
string_indexer_model.write().overwrite().save(string_indexer_output_path)
ml_bucketized_features.show(5)
# Combine continuous, numeric fields with indexes of nominal ones
# ...into one feature vector
numeric_columns = [
"DepDelay",
"Distance",
"DayOfYear",
"DayOfMonth",
"CRSDepHourOfDay",
"CRSArrHourOfDay",
"AirTime"
]
index_columns = [column + "_index" for column in string_columns]
input_columns = numeric_columns + index_columns
vector_assembler = VectorAssembler(
inputCols=input_columns,
outputCol="Features_vec"
)
final_vectorized_features = vector_assembler.transform(ml_bucketized_features)
# Save the numeric vector assembler
vector_assembler_path = "../models/numeric_vector_assembler_5.0.bin"
vector_assembler.write().overwrite().save(vector_assembler_path)
# Drop the index columns
for column in index_columns:
final_vectorized_features = final_vectorized_features.drop(column)
# Inspect the finalized features
final_vectorized_features.show(5)
#
# Cross validate, train and evaluate classifier: loop 3 times for 4 metrics
#
from collections import defaultdict
scores = defaultdict(list)
feature_importances = defaultdict(list)
metric_names = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
split_count = 3
for i in range(1, split_count + 1):
print("\nRun {} out of {} of test/train splits in cross validation...".format(
i,
split_count,
)
)
# Test/train split
training_data, test_data = final_vectorized_features.randomSplit([0.8, 0.2])
# Instantiate and fit random forest classifier on the training data
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(
featuresCol="Features_vec",
labelCol="ArrDelayBucket",
predictionCol="Prediction",
maxBins=4896,
maxMemoryInMB=1024
)
model = rfc.fit(training_data)
# Save the new model over the old one
model_output_path = "../models/spark_random_forest_classifier.flight_delays.baseline.bin"
model.write().overwrite().save(model_output_path)
# Evaluate model using test data
predictions = model.transform(test_data)
# Evaluate this split's results for each metric
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
for metric_name in metric_names:
evaluator = MulticlassClassificationEvaluator(
labelCol="ArrDelayBucket",
predictionCol="Prediction",
metricName=metric_name
)
score = evaluator.evaluate(predictions)
scores[metric_name].append(score)
print("{} = {}".format(metric_name, score))
#
# Collect feature importances
#
feature_names = vector_assembler.getInputCols()
feature_importance_list = model.featureImportances
for feature_name, feature_importance in zip(feature_names, feature_importance_list):
feature_importances[feature_name].append(feature_importance)
In [64]:
#
# Evaluate average and STD of each metric and print a table
#
import numpy as np
score_averages = defaultdict(float)
# Compute the table data
average_stds = [] # ha
for metric_name in metric_names:
metric_scores = scores[metric_name]
average_accuracy = sum(metric_scores) / len(metric_scores)
score_averages[metric_name] = average_accuracy
std_accuracy = np.std(metric_scores)
average_stds.append((metric_name, average_accuracy, std_accuracy))
# Print the table
print("\nExperiment Log")
print("--------------")
print(tabulate(average_stds, headers=["Metric", "Average", "STD"]))
#
# Persist the score to a score log that exists between runs
#
import pickle
# Load the score log or initialize an empty one
try:
score_log_filename = "../models/score_log.pickle"
score_log = pickle.load(open(score_log_filename, "rb"))
if not isinstance(score_log, list):
score_log = []
except IOError:
score_log = []
# Compute the existing score log entry
score_log_entry = {
metric_name: score_averages[metric_name] for metric_name in metric_names
}
# Compute and display the change in score for each metric
try:
last_log = score_log[-1]
except (IndexError, TypeError, AttributeError):
last_log = score_log_entry
experiment_report = []
for metric_name in metric_names:
run_delta = score_log_entry[metric_name] - last_log[metric_name]
experiment_report.append((metric_name, run_delta))
print("\nExperiment Report")
print("-----------------")
print(tabulate(experiment_report, headers=["Metric", "Delta"]))
# Append the existing average scores to the log
score_log.append(score_log_entry)
# Persist the log for next run
pickle.dump(score_log, open(score_log_filename, "wb"))
#
# Analyze and report feature importance changes
#
# Compute averages for each feature
feature_importance_entry = defaultdict(float)
for feature_name, value_list in feature_importances.items():
average_importance = sum(value_list) / len(value_list)
feature_importance_entry[feature_name] = average_importance
# Sort the feature importances in descending order and print
import operator
sorted_feature_importances = sorted(
feature_importance_entry.items(),
key=operator.itemgetter(1),
reverse=True
)
print("\nFeature Importances")
print("-------------------")
print(tabulate(sorted_feature_importances, headers=['Name', 'Importance']))
#
# Compare this run's feature importances with the previous run's
#
# Load the feature importance log or initialize an empty one
try:
feature_log_filename = "../models/feature_log.pickle"
feature_log = pickle.load(open(feature_log_filename, "rb"))
if not isinstance(feature_log, list):
feature_log = []
except IOError:
feature_log = []
# Compute and display the change in score for each feature
try:
last_feature_log = feature_log[-1]
except (IndexError, TypeError, AttributeError):
last_feature_log = defaultdict(float)
for feature_name, importance in feature_importance_entry.items():
last_feature_log[feature_name] = importance
# Compute the deltas
feature_deltas = {}
for feature_name in feature_importances.keys():
run_delta = feature_importance_entry[feature_name] - last_feature_log[feature_name]
feature_deltas[feature_name] = run_delta
# Sort feature deltas, biggest change first
import operator
sorted_feature_deltas = sorted(
feature_deltas.items(),
key=operator.itemgetter(1),
reverse=True
)
# Display sorted feature deltas
print("\nFeature Importance Delta Report")
print("-------------------------------")
print(tabulate(sorted_feature_deltas, headers=["Feature", "Delta"]))
# Append the existing average deltas to the log
feature_log.append(feature_importance_entry)
# Persist the log for next run
pickle.dump(feature_log, open(feature_log_filename, "wb"))
AirTime Performance
This output suggests a significant improvement in performance! weightedPrecision is up by 0.14, and AirTime contributes about half a percent of the feature importance. Also note that the importance of AirTime comes at the expense of Distance and DepDelay, which seems expected: Distance is conceptually similar to AirTime, and DepDelay is the most important feature. Taken together, the performance and feature importance metrics indicate that AirTime is a worthwhile improvement to our model.
At this point, once again it seems that we’ve exhausted the possibilities of the date/time features (at least, without resorting to more sophisticated time series analysis techniques than I know).
In this chapter we covered how to improve on our model using the data we’ve already collected. We can use this approach in combination with our ability to deploy applications to continuously improve our predictive systems.
Look at the list of fields in the complete dataset at https://www.transtats.bts.gov/Fields.asp. Add another field to the model. It can be directly added or it can be derived from multiple values.
Find and add an external feature to this model. Find another dataset which can be joined into this feature set to provide new features.
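For example, weather is a classic external signal for flight delays. Below is a purely hypothetical sketch of joining a daily weather dataset onto our features; the file path, the weather DataFrame, and its column names are all invented for illustration:
# Hypothetical: join daily weather observations by airport and date.
# The path and column names below are made up for illustration only.
weather = spark.read.parquet("../data/daily_weather.parquet")
features_with_weather = features_with_hour.join(
    weather,
    on=[
        features_with_hour.Origin == weather.Airport,
        features_with_hour.FlightDate == weather.ObservationDate,
    ],
    how="left_outer"
)
# Any new numeric columns (e.g. Precipitation, WindSpeed) would then be added
# to numeric_columns before assembling the feature vector.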