Improving Predictions

Now that we have deployed working models predicting flight delays, it is time to “make believe” that our prediction has proven useful based on user feedback, and further that the prediction is valuable enough that prediction quality is important. In this case, it is time to iteratively improve the quality of our prediction. If a prediction is valuable enough, this becomes a full-time job for one or more people.

In this chapter we will tune our Spark ML classifier and also do additional feature engineering to improve prediction quality. In doing so, we will show you how to iteratively improve predictions.

Fixing Our Prediction Problem

At this point we realized that our model was always predicting one class, no matter the input. We began by investigating that in a Jupyter notebook at ch09/Debugging Prediction Problems.ipynb.

The notebook itself is very long, and we tried many things to fix our model. It turned out we had made a mistake. We were using OneHotEncoder on top of the output of StringIndexerModel when we were encoding our nominal/categorical string features. This is how you should encode features for models other than decision trees, but it turns out that for decision tree models, you are supposed to take the string indexes from StringIndexerModel and directly compose them with your continuous/numeric features in a VectorAssembler. Decision trees are able to infer the fact that indexes represent categories. One benefit of directly adding StringIndexes to your feature vectors is that you then get easily interpretable feature importances.

When we discovered this, we had to go back and edit the book so that we didn’t teach something that was wrong, and so this is now what you see. We thought it worthwhile to link to the notebook, though, to show how this really works in the wild: you build broken shit and then fix it.

When to Improve Predictions

Not all predictions should be improved. Often something fast and crude will work well enough as an MVP (minimum viable product). Only predictions that prove useful should be improved. It is possible to sink large volumes of time into improving the quality of a prediction, so it is essential that you connect with users before getting sucked into this task. This is why we’ve included the discussion of improving predictions in its own chapter.

Improving Prediction Performance

There are a few ways to improve an existing predictive model. The first is by tuning the parameters of the statistical model making your prediction. The second is feature engineering.

Tuning model hyperparameters to improve predictive model quality can be done by intuition, or by brute force through something called a grid or random search. We’re going to focus on feature engineering, as hyperparameter tuning is covered elsewhere. A good guide to hyperparameter tuning is available in the Spark documentation on model selection and tuning.

As we move through this chapter, we’ll be using the work we’ve done so far to perform feature engineering. Feature engineering is the most important part of making good predictions. It involves using what you’ve discovered about the data through exploratory data analysis in order to feed your machine learning algorithm better, more consequential data as input.

Experimental Adhesion Method: See What Sticks

There are several ways to decide which features to use, and Saurav Kaushik has written a post on Analytics Vidhya that introduces them well. The method we employ primarily, which we jokingly entitle the Experimental Adhesion Method, is to quickly select all the features that we can simply compute, and try them all using a random forest or gradient boosted decision tree model (note that even if our application requires another type of model, we still use decision trees to guide feature selection). Then we train the model and inspect the model’s feature importances to “see what sticks.” The most important variables are retained, and this forms the basic model we begin with.

Feature engineering is an iterative process. Based on the feature importances, we ponder what new things we might try using the data we have available. We start with the simplest idea, or the one that is easiest to implement. If the feature importances indicate one type of feature is important, and we can’t easily compute new features similar to this one, we think about how we might acquire new data to join to our training data to use as features.

The key is to be logical and systematic in our exploration of the feature space. You should think about how easy a potential feature is to compute, as well as what it would teach you if it turned out to be important. Are there other, similar features that you could try if this candidate worked? Develop hypotheses and test them in the form of new features. Evaluate each new feature in an experiment and reflect on what you’ve learned before engineering the next feature.

Establishing Rigorous Metrics for Experiments

In order to improve our classification model, we need to reliably determine its prediction quality in the first place. To do so, we need to beef up our cross-validation code, and then establish a baseline of quality for the original model. Check out ch09/baseline_spark_mllib_model.py, which we copied from ch09/train_spark_mllib_model.py and altered to improve its cross-validation code.

In order to evaluate the prediction quality of our classifier, we need to use more than one metric. Spark ML’s MulticlassClassificationEvaluator offers four metrics: accuracy, weighted precision, weighted recall, and f1.

Defining Our Classification Metrics

The raw accuracy is just what it sounds like: the number of correct predictions divided by the number of predictions. This is something to check first, but it isn’t adequate alone. Precision is a measure of how useful the result is. Recall describes how complete the results are. The f1 score incorporates both precision and recall to determine overall quality. Taken together, the changes to these metrics between consecutive runs of training our model can give us a clear picture of what is happening with our model in terms of prediction quality. We will use these metrics along with feature importance to guide our feature engineering efforts.

Feature Importance

Model quality metrics aren’t enough to guide the iterative improvements of our model. To understand what is going on with each new run, we need to employ a type of model called a decision tree.

In Spark ML, the best general-purpose multiclass classification model is an implementation of a random forest, the RandomForestClassificationModel, fit by the RandomForestClassifier. Random forests can classify or regress, and they have an important feature that helps us interrogate predictive models through a feature called feature importance.

The importance of a feature is what it sounds like: a measure of how important that feature was in contributing to the accuracy of the model. This information is incredibly useful, as it can serve as a guiding hand to feature engineering. In other words, if you know how important a feature is, you can use this clue to make changes that increase the accuracy of the model, such as removing unimportant features and trying to engineer features similar to those that are most important. Feature engineering is a major theme of Agile Data Science, and it is a big part of why we’ve been doing iterative visualization and exploration (the purpose of which is to shed light on and drive feature engineering).

Note that the state of the art for many classification and regression tasks is a gradient boosted decision tree, but as of version 2.1.0 Spark ML’s implementation—the GBTClassificationModel, which is fit by the GBTClassifier—can only do binary classification.

Getting Ready for Experiments

We need to run through the model's code from chapter 8 before we can setup and run an experiment.


In [1]:
import sys, os, re
import json
import datetime, iso8601
from tabulate import tabulate

# Initialize PySpark
APP_NAME = "Improving Predictions"

# If there is no SparkSession, create the environment
try:
    sc and spark
except NameError as e:
    import findspark
    findspark.init()
    import pyspark
    import pyspark.sql

    sc = pyspark.SparkContext()
    spark = pyspark.sql.SparkSession(sc).builder.appName(APP_NAME).getOrCreate()

print("PySpark initialized...")


PySpark initialized...

In [2]:
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DateType, TimestampType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.functions import udf

schema = StructType([
    StructField("ArrDelay", DoubleType(), True),     # "ArrDelay":5.0
    StructField("CRSArrTime", TimestampType(), True),    # "CRSArrTime":"2015-12-31T03:20:00.000-08:00"
    StructField("CRSDepTime", TimestampType(), True),    # "CRSDepTime":"2015-12-31T03:05:00.000-08:00"
    StructField("Carrier", StringType(), True),     # "Carrier":"WN"
    StructField("DayOfMonth", IntegerType(), True), # "DayOfMonth":31
    StructField("DayOfWeek", IntegerType(), True),  # "DayOfWeek":4
    StructField("DayOfYear", IntegerType(), True),  # "DayOfYear":365
    StructField("DepDelay", DoubleType(), True),     # "DepDelay":14.0
    StructField("Dest", StringType(), True),        # "Dest":"SAN"
    StructField("Distance", DoubleType(), True),     # "Distance":368.0
    StructField("FlightDate", DateType(), True),    # "FlightDate":"2015-12-30T16:00:00.000-08:00"
    StructField("FlightNum", StringType(), True),   # "FlightNum":"6109"
    StructField("Origin", StringType(), True),      # "Origin":"TUS"
])

input_path = "../data/simple_flight_delay_features.jsonl"
features = spark.read.json(input_path, schema=schema)

# Sample 10% to make executable inside the notebook
features = features.sample(False, 0.1)

features.show(3)
features.first()


+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+
|    43.0|2015-01-02 22:55:00|2015-01-02 22:05:00|     WN|         2|        5|        2|    35.0| CRP|   187.0|2015-01-02|     1981|   HOU|
|     6.0|2015-01-02 12:55:00|2015-01-02 12:00:00|     WN|         2|        5|        2|     7.0| DAL|   239.0|2015-01-02|       24|   HOU|
|    18.0|2015-01-02 20:00:00|2015-01-02 19:00:00|     WN|         2|        5|        2|    27.0| DAL|   239.0|2015-01-02|       52|   HOU|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+
only showing top 3 rows

Out[2]:
Row(ArrDelay=43.0, CRSArrTime=datetime.datetime(2015, 1, 2, 22, 55), CRSDepTime=datetime.datetime(2015, 1, 2, 22, 5), Carrier='WN', DayOfMonth=2, DayOfWeek=5, DayOfYear=2, DepDelay=35.0, Dest='CRP', Distance=187.0, FlightDate=datetime.date(2015, 1, 2), FlightNum='1981', Origin='HOU')

In [3]:
#
# Check for nulls in features before using Spark ML
#
# null_counts = [(column, features.where(features[column].isNull()).count()) for column in features.columns]
# cols_with_nulls = filter(lambda x: x[1] > 0, null_counts)
# print("Columns with nulls that need to be filtered: {}".format(
#     str(list(cols_with_nulls))
# ))

In [4]:
#
# Add a Route variable to replace FlightNum
#
from pyspark.sql.functions import lit, concat
features_with_route = features.withColumn(
  'Route',
  concat(
    features.Origin,
    lit('-'),
    features.Dest
  )
)
features_with_route.show(3)


+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+
|    43.0|2015-01-02 22:55:00|2015-01-02 22:05:00|     WN|         2|        5|        2|    35.0| CRP|   187.0|2015-01-02|     1981|   HOU|HOU-CRP|
|     6.0|2015-01-02 12:55:00|2015-01-02 12:00:00|     WN|         2|        5|        2|     7.0| DAL|   239.0|2015-01-02|       24|   HOU|HOU-DAL|
|    18.0|2015-01-02 20:00:00|2015-01-02 19:00:00|     WN|         2|        5|        2|    27.0| DAL|   239.0|2015-01-02|       52|   HOU|HOU-DAL|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+
only showing top 3 rows


In [5]:
#
# Use pysmark.ml.feature.Bucketizer to bucketize ArrDelay into on-time, slightly late, very late (0, 1, 2)
#
from pyspark.ml.feature import Bucketizer

# Setup the Bucketizer
splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]
arrival_bucketizer = Bucketizer(
  splits=splits,
  inputCol="ArrDelay",
  outputCol="ArrDelayBucket"
)

# Save the model
arrival_bucketizer_path = "../models/arrival_bucketizer_2.0.bin"
arrival_bucketizer.write().overwrite().save(arrival_bucketizer_path)

# Apply the model
ml_bucketized_features = arrival_bucketizer.transform(features_with_route)
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show(5)


+--------+--------------+
|ArrDelay|ArrDelayBucket|
+--------+--------------+
|    43.0|           3.0|
|     6.0|           2.0|
|    18.0|           2.0|
|   -13.0|           1.0|
|    -7.0|           1.0|
+--------+--------------+
only showing top 5 rows


In [6]:
#
# Extract features tools in with pyspark.ml.feature
#
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Turn category fields into indexes
for column in ["Carrier", "DayOfMonth", "DayOfWeek", "DayOfYear",
             "Origin", "Dest", "Route"]:
    
    print("Indexing column \"{}\" ...".format(column))
    
    string_indexer = StringIndexer(
      inputCol=column,
      outputCol=column + "_index"
    )

    string_indexer_model = string_indexer.fit(ml_bucketized_features)
    ml_bucketized_features = string_indexer_model.transform(ml_bucketized_features)

    # Drop the original column
    ml_bucketized_features = ml_bucketized_features.drop(column)

    # Save the pipeline model
    string_indexer_output_path = "../models/string_indexer_model_{}.bin".format(
      column
    )
    string_indexer_model.write().overwrite().save(string_indexer_output_path)

print("Indexed all string columns!")


Indexing column "Carrier" ...
Indexing column "DayOfMonth" ...
Indexing column "DayOfWeek" ...
Indexing column "DayOfYear" ...
Indexing column "Origin" ...
Indexing column "Dest" ...
Indexing column "Route" ...
Indexed all string columns!

In [7]:
# Handle continuous, numeric fields by combining them into one feature vector
numeric_columns = ["DepDelay", "Distance"]
index_columns = ["Carrier_index", "DayOfMonth_index",
                 "DayOfWeek_index", "DayOfYear_index", "Origin_index",
                 "Origin_index", "Dest_index", "Route_index"]
vector_assembler = VectorAssembler(
    inputCols=numeric_columns + index_columns,
    outputCol="Features_vec"
)
final_vectorized_features = vector_assembler.transform(ml_bucketized_features)

# Save the numeric vector assembler
vector_assembler_path = "../models/numeric_vector_assembler.bin"
vector_assembler.write().overwrite().save(vector_assembler_path)

# Drop the index columns
for column in index_columns:
    final_vectorized_features = final_vectorized_features.drop(column)

# Inspect the finalized features
final_vectorized_features.show(5)


+--------+-------------------+-------------------+--------+--------+----------+---------+--------------+--------------------+
|ArrDelay|         CRSArrTime|         CRSDepTime|DepDelay|Distance|FlightDate|FlightNum|ArrDelayBucket|        Features_vec|
+--------+-------------------+-------------------+--------+--------+----------+---------+--------------+--------------------+
|    43.0|2015-01-02 22:55:00|2015-01-02 22:05:00|    35.0|   187.0|2015-01-02|     1981|           3.0|[35.0,187.0,0.0,1...|
|     6.0|2015-01-02 12:55:00|2015-01-02 12:00:00|     7.0|   239.0|2015-01-02|       24|           2.0|[7.0,239.0,0.0,1....|
|    18.0|2015-01-02 20:00:00|2015-01-02 19:00:00|    27.0|   239.0|2015-01-02|       52|           2.0|[27.0,239.0,0.0,1...|
|   -13.0|2015-01-02 07:40:00|2015-01-02 06:00:00|    12.0|   883.0|2015-01-02|     1705|           1.0|[12.0,883.0,0.0,1...|
|    -7.0|2015-01-02 11:55:00|2015-01-02 10:20:00|    -3.0|   571.0|2015-01-02|     4207|           1.0|[-3.0,571.0,0.0,1...|
+--------+-------------------+-------------------+--------+--------+----------+---------+--------------+--------------------+
only showing top 5 rows

Implementing A More Rigorous Experiment

In order to be confident in our experiment for each measure, we need to repeat it at least twice to see how it varies. This is the degree to which we cross-validate. In addition, we need to loop and run the measurement code once for each score. Once we’ve collected several scores for each metric, we look at both the average and standard deviation for each score. Taken together, these scores give us a picture of the quality of our classifier.

To begin, we need to iterate and repeat our experiment N times. For each experiment we need to compute a test/train split, then we need to train the model on the training data and apply it to the test data. Then we use MulticlassClassificationEvaluator to get a score, once for each metric. We gather the scores in a list for each metric, which we will evaluate at the end of the experiment:


In [8]:
#
# Cross validate, train and evaluate classifier: loop 5 times for 4 metrics
#

from collections import defaultdict
scores = defaultdict(list)
metric_names = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
split_count = 3

for i in range(1, split_count + 1):
  print("Run {} out of {} of test/train splits in cross validation...".format(
      i,
      split_count,
    )
  )

  # Test/train split
  training_data, test_data = final_vectorized_features.randomSplit([0.8, 0.2])

  # Instantiate and fit random forest classifier on all the data
  from pyspark.ml.classification import RandomForestClassifier
  rfc = RandomForestClassifier(
    featuresCol="Features_vec",
    labelCol="ArrDelayBucket",
    predictionCol="Prediction",
    maxBins=4657,
    maxMemoryInMB=1024
  )
  model = rfc.fit(training_data)

  # Save the new model over the old one
  model_output_path = "../models/spark_random_forest_classifier.flight_delays.baseline.bin"
  model.write().overwrite().save(model_output_path)

  # Evaluate model using test data
  predictions = model.transform(test_data)
  
  # Evaluate this split's results for each metric
  from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  for metric_name in metric_names:
    
    evaluator = MulticlassClassificationEvaluator(
      labelCol="ArrDelayBucket",
      predictionCol="Prediction",
      metricName=metric_name
    )
    score = evaluator.evaluate(predictions)

    scores[metric_name].append(score)
    print("{} = {}".format(metric_name, score))


Run 1 out of 3 of test/train splits in cross validation...
accuracy = 0.5759860155140392
weightedPrecision = 0.6359357493299552
weightedRecall = 0.575986015514039
f1 = 0.5119318485038695
Run 2 out of 3 of test/train splits in cross validation...
accuracy = 0.5807802512673573
weightedPrecision = 0.661402729667335
weightedRecall = 0.5807802512673573
f1 = 0.5169955455480277
Run 3 out of 3 of test/train splits in cross validation...
accuracy = 0.591925331018016
weightedPrecision = 0.6823700454436153
weightedRecall = 0.5919253310180161
f1 = 0.5290098787205513

Processing Run Results

Our run leaves us with a defaultdict of scores, with one list for each metric. Now we need to compute the average and standard deviation of each list to give us the overall average and standard deviation of each metric:

Note that we need to compute both the average and standard deviation of each metric from our run. The average will tell us the approximate performance level, and the standard deviation will tell us how much a model's performance varies. Less variance is desirable. We'll use this information in tuning our model.


In [9]:
#
# Evaluate average and STD of each metric and print a table
#
import numpy as np
score_averages = defaultdict(float)

# Compute the table data
average_stds = [] # ha
for metric_name in metric_names:
  metric_scores = scores[metric_name]
  
  average_accuracy = sum(metric_scores) / len(metric_scores)
  score_averages[metric_name] = average_accuracy
  
  std_accuracy = np.std(metric_scores)
  
  average_stds.append((metric_name, average_accuracy, std_accuracy))

# Print the table
print("\nExperiment Log")
print("--------------")
print(tabulate(average_stds, headers=["Metric", "Average", "STD"]))


Experiment Log
--------------
Metric               Average         STD
-----------------  ---------  ----------
accuracy            0.582897  0.00667715
weightedPrecision   0.659903  0.0189864
weightedRecall      0.582897  0.00667715
f1                  0.519312  0.00716197

The standard deviations indicate that we might not even need to perform k-fold cross-validation, but an inspection of the underlying scores says otherwise:


In [10]:
scores


Out[10]:
defaultdict(list,
            {'accuracy': [0.5759860155140392,
              0.5807802512673573,
              0.591925331018016],
             'weightedPrecision': [0.6359357493299552,
              0.661402729667335,
              0.6823700454436153],
             'weightedRecall': [0.575986015514039,
              0.5807802512673573,
              0.5919253310180161],
             'f1': [0.5119318485038695,
              0.5169955455480277,
              0.5290098787205513]})

There is actually significant variation between runs, and this could obscure a small improvement (or degradation) in prediction quality.

The iterations take time, and this discourages experimentation. A middle ground should be found.

Comparing Experiments to Determine Improvements

Now that we have our baseline metrics, we can repeat this code as we improve the model and see what the effect is in terms of the four metrics available to us. So it seems we are done, that we can start playing with our model and features. However, we will quickly run into a problem. We will lose track of the score from the previous run, printed on the screen above many logs for each run, unless we write it down each time. And this is tedious. So, we need to automate this process.

What we need to do is load a score log from disk, evaluate the current score in terms of the previous one, and store a new entry to the log back to disk for the next run to access. The following code achieves this aim.

First we use pickle to load any existing score log. If this is not present, we initialize a new log, which is simply an empty Python list. Next we prepare the new log entry—a simple Python dict containing the average score for each of four metrics. Then we subtract the previous run’s score to determine the change in this run. This is the information we use to evaluate whether our change worked or not (along with any changes in feature importances, which we will address as well).

Finally, we append the new score entry to the log and store it back to disk:


In [11]:
#
# Persist the score to a sccore log that exists between runs
#
import pickle

# Load the score log or initialize an empty one
try:
  score_log_filename = "../models/score_log.pickle"
  score_log = pickle.load(open(score_log_filename, "rb"))
  if not isinstance(score_log, list):
    score_log = []
except IOError:
  score_log = []

# Compute the existing score log entry
score_log_entry = {metric_name: score_averages[metric_name] for metric_name in metric_names}

# Compute and display the change in score for each metric
try:
  last_log = score_log[-1]
except (IndexError, TypeError, AttributeError):
  last_log = score_log_entry

experiment_report = []
for metric_name in metric_names:
  run_delta = score_log_entry[metric_name] - last_log[metric_name]
  experiment_report.append((metric_name, run_delta))

print("\nExperiment Report")
print("-----------------")
print(tabulate(experiment_report, headers=["Metric", "Score"]))

# Append the existing average scores to the log
score_log.append(score_log_entry)

# Persist the log for next run
pickle.dump(score_log, open(score_log_filename, "wb"))


Experiment Report
-----------------
Metric                   Score
-----------------  -----------
accuracy           -0.00605381
weightedPrecision   0.138394
weightedRecall     -0.00605381
f1                 -0.00615082

Now when we run our script, we will get a report that shows the change between this run and the last run. We can use this, along with our feature importances, to direct our efforts at improving the model. For instance, an example test run shows the model accuracy increase by .003:

Experiment Report
-----------------
Metric                   Score
-----------------  -----------
accuracy            0.00300548
weightedPrecision  -0.00592227
weightedRecall      0.00300548
f1                 -0.0105553

Jump back to the code for the model, the code under the section Implementing a More Rigorous Experiment. Re-run all the code between there and here, the last three code blocks. See how the score changed slightly? You will use these changes to guide you as you change the model!

Inspecting Changes in Feature Importance

We can use the list of columns given to our final VectorAssembler along with RandomForestClassificationModel.featureImportances to derive the importance of each named feature. This is extremely valuable, because like with our prediction quality scores, we can look at changes in feature importances for all features between runs. If a newly introduced feature turns out to be important, it is usually worth adding to the model, so long as it doesn’t hurt quality.

Logging Feature Importances

We begin by altering our experiment loop to record feature importances for each run. Check out the abbreviated content from ch09/improved_spark_mllib_model.py:


In [12]:
#
# Cross validate, train and evaluate classifier: loop 5 times for 4 metrics
#

from collections import defaultdict
scores = defaultdict(list)
feature_importances = defaultdict(list)
metric_names = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
split_count = 3

for i in range(1, split_count + 1):
  print(f"\nRun {i} out of {split_count} of test/train splits in cross validation...")

  # Test/train split
  training_data, test_data = final_vectorized_features.randomSplit([0.8, 0.2])

  # Instantiate and fit random forest classifier on all the data
  from pyspark.ml.classification import RandomForestClassifier
  rfc = RandomForestClassifier(
    featuresCol="Features_vec",
    labelCol="ArrDelayBucket",
    predictionCol="Prediction",
    maxBins=4657,
  )
  model = rfc.fit(training_data)

  # Save the new model over the old one
  model_output_path = "../models/spark_random_forest_classifier.flight_delays.baseline.bin"
  model.write().overwrite().save(model_output_path)

  # Evaluate model using test data
  predictions = model.transform(test_data)

  # Evaluate this split's results for each metric
  from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  for metric_name in metric_names:
    evaluator = MulticlassClassificationEvaluator(
      labelCol="ArrDelayBucket",
      predictionCol="Prediction",
      metricName=metric_name
    )
    score = evaluator.evaluate(predictions)
  
    scores[metric_name].append(score)
    print(f"{metric_name} = {score}")

  #
  # Collect feature importances
  #
  feature_names = vector_assembler.getInputCols()
  feature_importance_list = model.featureImportances
  for feature_name, feature_importance in zip(feature_names, feature_importance_list):
    feature_importances[feature_name].append(feature_importance)


Run 1 out of 3 of test/train splits in cross validation...
accuracy = 0.5848054622304107
weightedPrecision = 0.6558041127536607
weightedRecall = 0.5848054622304107
f1 = 0.5218379487837884

Run 2 out of 3 of test/train splits in cross validation...
accuracy = 0.587204108977222
weightedPrecision = 0.6188744426630224
weightedRecall = 0.587204108977222
f1 = 0.5227055173394767

Run 3 out of 3 of test/train splits in cross validation...
accuracy = 0.5851237866724833
weightedPrecision = 0.6581273095536212
weightedRecall = 0.5851237866724833
f1 = 0.5186316300276008

Inspecting Feature Importances

Next, we need to compute the average of the importance for each feature. Note that we use a defaultdict(float) to ensure that accessing empty keys returns zero. This will be important when comparing entries in the log with different sets of features. In order to print the feature importances, we need to sort them first, by descending order of importance:


In [13]:
#
# Analyze and report feature importance changes
#

# Compute averages for each feature
feature_importance_entry = defaultdict(float)
for feature_name, value_list in feature_importances.items():
  average_importance = sum(value_list) / len(value_list)
  feature_importance_entry[feature_name] = average_importance

# Sort the feature importances in descending order and print
import operator
sorted_feature_importances = sorted(
  feature_importance_entry.items(),
  key=operator.itemgetter(1),
  reverse=True
)

print("\nFeature Importances")
print("-------------------")
print(tabulate(sorted_feature_importances, headers=['Name', 'Importance']))


Feature Importances
-------------------
Name                Importance
----------------  ------------
DepDelay            0.870979
DayOfYear_index     0.0302338
Route_index         0.0193866
DayOfMonth_index    0.0186692
Origin_index        0.0185116
Distance            0.0080768
Dest_index          0.00800071
Carrier_index       0.00515996
DayOfWeek_index     0.00247118

Feature Importance Differences Between Runs

Next we need to perform the same housekeeping as we did for the model score log: load the model, create an entry for this experiment, load the last experiment and compute the change for each feature between that experiment and the current one, and then print a report on these deltas.

First we load the last feature log. If it isn’t available because it doesn’t exist, we initialize the last_feature_log with zeros for each feature, so that new features will have a positive score equal to their amount:


In [14]:
#
# Compare this run's feature importances with the previous run's
#
  
# Load the feature importance log or initialize an empty one
try:
  feature_log_filename = "../models/feature_log.pickle"
  feature_log = pickle.load(open(feature_log_filename, "rb"))
  if not isinstance(feature_log, list):
    feature_log = []
except IOError:
  feature_log = []

# Compute and display the change in score for each feature
try:
  last_feature_log = feature_log[-1]
except (IndexError, TypeError, AttributeError):
  last_feature_log = defaultdict(float)
  for feature_name, importance in feature_importance_entry.items():
    last_feature_log[feature_name] = importance

Next we compute the change between the last run and the current one:


In [15]:
# Compute the deltas
feature_deltas = {}
for feature_name in feature_importances.keys():
  run_delta = feature_importance_entry[feature_name] - last_feature_log[feature_name]
  feature_deltas[feature_name] = run_delta

In order to display them, we need to sort the feature importance changes in descending order, to show the biggest change first:


In [16]:
# Sort feature deltas, biggest change first
import operator
sorted_feature_deltas = sorted(
  feature_deltas.items(),
  key=operator.itemgetter(1),
  reverse=True
)

Then we display the sorted feature deltas:


In [17]:
# Display sorted feature deltas
print("\nFeature Importance Delta Report")
print("-------------------------------")
print(tabulate(sorted_feature_deltas, headers=["Feature", "Delta"]))


Feature Importance Delta Report
-------------------------------
Feature                  Delta
----------------  ------------
DepDelay           0.037808
DayOfYear_index    0.0302338
DayOfMonth_index   0.0186692
Origin_index       0.00611677
DayOfWeek_index    0.00247118
Carrier_index      0.000253099
Dest_index        -0.00162545
Distance          -0.00389137
Route_index       -0.00451193

Finally, as with the score log, we append our entry to the log and save it for the next run:


In [18]:
# Append the existing average deltas to the log
feature_log.append(feature_importance_entry)

# Persist the log for next run
pickle.dump(feature_log, open(feature_log_filename, "wb"))

We’ll use the raw feature importances as well as the changes in feature importance to guide our creation or alteration of features as we improve the model.

Conclusion

Now that we have the ability to understand the effect of changes between experimental runs, we can detect changes that improve our model. We can start adding features to test their effect on the model’s prediction quality, and pursue related features that help improve quality! Without this setup, we would be hard put to make positive changes. With it, we are only bounded by our creativity in our efforts to improve the model.

Time of Day as a Feature

In examining our feature importances, it looks like the date/time fields have some impact. What if we extracted the hour/minute as an integer from the datetime for departure/arrival fields? This would inform the model about morning versus afternoon versus red-eye flights, which surely affects on-time performance, as there is more traffic in the morning than overnight.

Check out ch09/explore_delays.py. Let’s start by exploring the premise of this feature, that lateness varies by the time of day of the flight:


In [19]:
features.registerTempTable("features")
features.show(5)


+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+
|    43.0|2015-01-02 22:55:00|2015-01-02 22:05:00|     WN|         2|        5|        2|    35.0| CRP|   187.0|2015-01-02|     1981|   HOU|
|     6.0|2015-01-02 12:55:00|2015-01-02 12:00:00|     WN|         2|        5|        2|     7.0| DAL|   239.0|2015-01-02|       24|   HOU|
|    18.0|2015-01-02 20:00:00|2015-01-02 19:00:00|     WN|         2|        5|        2|    27.0| DAL|   239.0|2015-01-02|       52|   HOU|
|   -13.0|2015-01-02 07:40:00|2015-01-02 06:00:00|     WN|         2|        5|        2|    12.0| DEN|   883.0|2015-01-02|     1705|   HOU|
|    -7.0|2015-01-02 11:55:00|2015-01-02 10:20:00|     WN|         2|        5|        2|    -3.0| ECP|   571.0|2015-01-02|     4207|   HOU|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+
only showing top 5 rows


In [20]:
spark.sql("""
  SELECT
    HOUR(CRSDepTime) + 1 AS Hour,
    AVG(ArrDelay),
    STD(ArrDelay)
  FROM features
  GROUP BY HOUR(CRSDepTime)
  ORDER BY HOUR(CRSDepTime)
""").show(24)


+----+-------------------+---------------------+
|Hour|      avg(ArrDelay)|stddev_samp(ArrDelay)|
+----+-------------------+---------------------+
|   1| -3.893617021276596|   20.182034674166754|
|   2|  5.756756756756757|   30.962706425459473|
|   3| 13.416666666666666|    64.53393516498733|
|   4|               23.0|   23.065125189341593|
|   5|                0.4|   10.737783756436894|
|   6|-2.7504187604690116|    30.35433397831846|
|   7|-0.4401114206128134|   40.152195601591316|
|   8|-0.9037614368010843|    35.92628360467317|
|   9| 0.8433587786259542|    33.08585785402741|
|  10| 2.0286701208981004|    29.64847389458358|
|  11|   5.95457650273224|    45.82743048946165|
|  12|  4.779426993441491|    36.68864467963427|
|  13|  5.700393278512692|     42.5132939301283|
|  14|  6.029696578437702|    35.20577118602809|
|  15|  7.701895043731779|    38.17410348000706|
|  16|  7.808480565371025|    37.93224875212213|
|  17|  8.223712835387962|   39.280152086102596|
|  18|    8.9209726443769|    41.85564041446721|
|  19| 10.703585657370517|    40.95852079310993|
|  20| 11.647607934655776|   46.025372518666515|
|  21|  9.797132235793946|   42.199556940941086|
|  22| 10.248484848484848|   42.130564387689425|
|  23|  5.594294770206022|   30.682815092521444|
|  24|  4.632183908045977|   36.934489284051836|
+----+-------------------+---------------------+


In [21]:
spark.sql("""
  SELECT
    HOUR(CRSArrTime) + 1 AS Hour,
    AVG(ArrDelay),
    STD(ArrDelay)
  FROM features
  GROUP BY HOUR(CRSArrTime)
  ORDER BY HOUR(CRSArrTime)
""").show(24)


+----+--------------------+---------------------+
|Hour|       avg(ArrDelay)|stddev_samp(ArrDelay)|
+----+--------------------+---------------------+
|   1|            12.96875|    44.08554264759498|
|   2|   8.268292682926829|    39.98876366571234|
|   3|                11.5|    41.98743198201629|
|   4|               -1.75|   22.704124575223613|
|   5|   5.344827586206897|    37.53199885016851|
|   6|  1.8566037735849057|     34.6423584069808|
|   7|-0.01263157894736842|   31.149900414010926|
|   8|  1.0947054436987322|    48.90292678425451|
|   9| -0.2966728280961183|    34.74757754418654|
|  10| -1.0765265662172878|   29.805160166316067|
|  11|  1.0594289508632138|    35.27153464904483|
|  12|  1.4428259286234524|    39.81548299617712|
|  13|   4.062521008403361|    34.01633558228581|
|  14|   4.841894353369764|   39.586194945073466|
|  15|   5.102511880515954|    37.92038412105885|
|  16|   5.191109555477774|   35.272496914442854|
|  17|   6.571292775665399|    35.76201031340733|
|  18|   8.639368895567243|    43.13049596430492|
|  19|   8.666666666666666|    39.81014321100816|
|  20|   9.608771929824561|    38.50663894431138|
|  21|  10.441700960219478|    46.51739411355749|
|  22|   9.639167595689335|   45.578718342016785|
|  23|   9.668339416058394|   39.350731791672594|
|  24|   10.72784412319296|   40.562252462679226|
+----+--------------------+---------------------+


In [22]:
from pyspark.sql.functions import hour

features = features.withColumn('CRSDepHourOfDay', hour(features.CRSDepTime))
features = features.withColumn('CRSArrHourOfDay', hour(features.CRSArrTime))

departure_cov = features.stat.cov('CRSDepHourOfDay', 'ArrDelay')
arrival_cov = features.stat.cov('CRSArrHourOfDay', 'ArrDelay')

print("Departure delay covariance: {:,}".format(departure_cov))
print("Arrival delay covariance:   {:,}".format(arrival_cov))


Departure delay covariance: 16.66980794454114
Arrival delay covariance:   15.940080898824531

In [23]:
features.select(
    "CRSDepTime", 
    "CRSDepHourOfDay", 
    "CRSArrTime", 
    "CRSArrHourOfDay"
).show()


+-------------------+---------------+-------------------+---------------+
|         CRSDepTime|CRSDepHourOfDay|         CRSArrTime|CRSArrHourOfDay|
+-------------------+---------------+-------------------+---------------+
|2015-01-02 22:05:00|             22|2015-01-02 22:55:00|             22|
|2015-01-02 12:00:00|             12|2015-01-02 12:55:00|             12|
|2015-01-02 19:00:00|             19|2015-01-02 20:00:00|             20|
|2015-01-02 06:00:00|              6|2015-01-02 07:40:00|              7|
|2015-01-02 10:20:00|             10|2015-01-02 11:55:00|             11|
|2015-01-02 14:10:00|             14|2015-01-02 15:10:00|             15|
|2015-01-02 10:20:00|             10|2015-01-02 13:40:00|             13|
|2015-01-02 17:00:00|             17|2015-01-02 18:45:00|             18|
|2015-01-02 07:35:00|              7|2015-01-02 10:45:00|             10|
|2015-01-02 05:50:00|              5|2015-01-02 08:25:00|              8|
|2015-01-02 09:25:00|              9|2015-01-02 10:25:00|             10|
|2015-01-02 08:50:00|              8|2015-01-02 11:10:00|             11|
|2015-01-02 20:20:00|             20|2015-01-02 23:45:00|             23|
|2015-01-02 19:30:00|             19|2015-01-02 21:20:00|             21|
|2015-01-02 06:30:00|              6|2015-01-02 07:30:00|              7|
|2015-01-02 12:00:00|             12|2015-01-02 12:45:00|             12|
|2015-01-02 20:20:00|             20|2015-01-02 21:15:00|             21|
|2015-01-02 13:55:00|             13|2015-01-02 17:00:00|             17|
|2015-01-02 08:00:00|              8|2015-01-02 10:45:00|             10|
|2015-01-02 11:45:00|             11|2015-01-02 16:45:00|             16|
+-------------------+---------------+-------------------+---------------+
only showing top 20 rows

Encoding Our New Features

Now we must repeat the feature encoding process such that it includes these new features. Lets take a look at what our features look like at this moment:


In [24]:
features.show(5)


+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------------+---------------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|CRSDepHourOfDay|CRSArrHourOfDay|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------------+---------------+
|    43.0|2015-01-02 22:55:00|2015-01-02 22:05:00|     WN|         2|        5|        2|    35.0| CRP|   187.0|2015-01-02|     1981|   HOU|             22|             22|
|     6.0|2015-01-02 12:55:00|2015-01-02 12:00:00|     WN|         2|        5|        2|     7.0| DAL|   239.0|2015-01-02|       24|   HOU|             12|             12|
|    18.0|2015-01-02 20:00:00|2015-01-02 19:00:00|     WN|         2|        5|        2|    27.0| DAL|   239.0|2015-01-02|       52|   HOU|             19|             20|
|   -13.0|2015-01-02 07:40:00|2015-01-02 06:00:00|     WN|         2|        5|        2|    12.0| DEN|   883.0|2015-01-02|     1705|   HOU|              6|              7|
|    -7.0|2015-01-02 11:55:00|2015-01-02 10:20:00|     WN|         2|        5|        2|    -3.0| ECP|   571.0|2015-01-02|     4207|   HOU|             10|             11|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------------+---------------+
only showing top 5 rows

So we're back at the beginning, and still have to add Route, bucketize the data, encode our string and numeric fields and then combine them into a single vector. Lets get started!


In [25]:
#
# Add a Route variable to replace FlightNum
#
from pyspark.sql.functions import lit, concat
features_with_route = features.withColumn(
  'Route',
  concat(
    features.Origin,
    lit('-'),
    features.Dest
  )
)
features_with_route.show(6)


+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------------+---------------+-------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|CRSDepHourOfDay|CRSArrHourOfDay|  Route|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------------+---------------+-------+
|    43.0|2015-01-02 22:55:00|2015-01-02 22:05:00|     WN|         2|        5|        2|    35.0| CRP|   187.0|2015-01-02|     1981|   HOU|             22|             22|HOU-CRP|
|     6.0|2015-01-02 12:55:00|2015-01-02 12:00:00|     WN|         2|        5|        2|     7.0| DAL|   239.0|2015-01-02|       24|   HOU|             12|             12|HOU-DAL|
|    18.0|2015-01-02 20:00:00|2015-01-02 19:00:00|     WN|         2|        5|        2|    27.0| DAL|   239.0|2015-01-02|       52|   HOU|             19|             20|HOU-DAL|
|   -13.0|2015-01-02 07:40:00|2015-01-02 06:00:00|     WN|         2|        5|        2|    12.0| DEN|   883.0|2015-01-02|     1705|   HOU|              6|              7|HOU-DEN|
|    -7.0|2015-01-02 11:55:00|2015-01-02 10:20:00|     WN|         2|        5|        2|    -3.0| ECP|   571.0|2015-01-02|     4207|   HOU|             10|             11|HOU-ECP|
|   -14.0|2015-01-02 15:10:00|2015-01-02 14:10:00|     WN|         2|        5|        2|     0.0| ELP|   677.0|2015-01-02|     1258|   HOU|             14|             15|HOU-ELP|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------------+---------------+-------+
only showing top 6 rows


In [26]:
#
# Use pysmark.ml.feature.Bucketizer to bucketize ArrDelay into on-time, slightly late, very late (0, 1, 2)
#
from pyspark.ml.feature import Bucketizer

# Setup the Bucketizer
splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]
arrival_bucketizer = Bucketizer(
  splits=splits,
  inputCol="ArrDelay",
  outputCol="ArrDelayBucket"
)

# Save the model
arrival_bucketizer_path = "../models/arrival_bucketizer_2.0.bin"
arrival_bucketizer.write().overwrite().save(arrival_bucketizer_path)

# Apply the model
ml_bucketized_features = arrival_bucketizer.transform(features_with_route)
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show(5)


+--------+--------------+
|ArrDelay|ArrDelayBucket|
+--------+--------------+
|    43.0|           3.0|
|     6.0|           2.0|
|    18.0|           2.0|
|   -13.0|           1.0|
|    -7.0|           1.0|
+--------+--------------+
only showing top 5 rows


In [27]:
#
# Extract features tools in with pyspark.ml.feature
#
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Turn category fields into indexes
for column in ["Carrier", "DayOfMonth", "DayOfWeek", "DayOfYear",
               "Origin", "Dest", "Route"]:
  string_indexer = StringIndexer(
    inputCol=column,
    outputCol=column + "_index"
  )

  string_indexer_model = string_indexer.fit(ml_bucketized_features)
  ml_bucketized_features = string_indexer_model.transform(ml_bucketized_features)

  # Save the pipeline model
  string_indexer_output_path = "../models/string_indexer_model_3.0.{}.bin".format(
    column
  )
  string_indexer_model.write().overwrite().save(string_indexer_output_path)

ml_bucketized_features.show(5)


+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------------+---------------+-------+--------------+-------------+----------------+---------------+---------------+------------+----------+-----------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|CRSDepHourOfDay|CRSArrHourOfDay|  Route|ArrDelayBucket|Carrier_index|DayOfMonth_index|DayOfWeek_index|DayOfYear_index|Origin_index|Dest_index|Route_index|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------------+---------------+-------+--------------+-------------+----------------+---------------+---------------+------------+----------+-----------+
|    43.0|2015-01-02 22:55:00|2015-01-02 22:05:00|     WN|         2|        5|        2|    35.0| CRP|   187.0|2015-01-02|     1981|   HOU|             22|             22|HOU-CRP|           3.0|          0.0|             1.0|            0.0|            1.0|        28.0|     102.0|      938.0|
|     6.0|2015-01-02 12:55:00|2015-01-02 12:00:00|     WN|         2|        5|        2|     7.0| DAL|   239.0|2015-01-02|       24|   HOU|             12|             12|HOU-DAL|           2.0|          0.0|             1.0|            0.0|            1.0|        28.0|      27.0|       55.0|
|    18.0|2015-01-02 20:00:00|2015-01-02 19:00:00|     WN|         2|        5|        2|    27.0| DAL|   239.0|2015-01-02|       52|   HOU|             19|             20|HOU-DAL|           2.0|          0.0|             1.0|            0.0|            1.0|        28.0|      27.0|       55.0|
|   -13.0|2015-01-02 07:40:00|2015-01-02 06:00:00|     WN|         2|        5|        2|    12.0| DEN|   883.0|2015-01-02|     1705|   HOU|              6|              7|HOU-DEN|           1.0|          0.0|             1.0|            0.0|            1.0|        28.0|       3.0|     1089.0|
|    -7.0|2015-01-02 11:55:00|2015-01-02 10:20:00|     WN|         2|        5|        2|    -3.0| ECP|   571.0|2015-01-02|     4207|   HOU|             10|             11|HOU-ECP|           1.0|          0.0|             1.0|            0.0|            1.0|        28.0|     151.0|     2673.0|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------------+---------------+-------+--------------+-------------+----------------+---------------+---------------+------------+----------+-----------+
only showing top 5 rows


In [28]:
# Combine continuous, numeric fields with indexes of nominal ones
# ...into one feature vector
numeric_columns = [
  "DepDelay", "Distance",
  "CRSDepHourOfDay", "CRSArrHourOfDay"
]
index_columns = ["Carrier_index", "DayOfMonth_index",
                 "DayOfWeek_index", "DayOfYear_index", "Origin_index",
                 "Origin_index", "Dest_index", "Route_index"]
vector_assembler = VectorAssembler(
  inputCols=numeric_columns + index_columns,
  outputCol="Features_vec"
)
final_vectorized_features = vector_assembler.transform(ml_bucketized_features)

# Save the numeric vector assembler
vector_assembler_path = "../models/numeric_vector_assembler_3.0.bin"
vector_assembler.write().overwrite().save(vector_assembler_path)

# Drop the index columns
for column in index_columns:
  final_vectorized_features = final_vectorized_features.drop(column)

# Inspect the finalized features
final_vectorized_features.show(5)


+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------------+---------------+-------+--------------+--------------------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|CRSDepHourOfDay|CRSArrHourOfDay|  Route|ArrDelayBucket|        Features_vec|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------------+---------------+-------+--------------+--------------------+
|    43.0|2015-01-02 22:55:00|2015-01-02 22:05:00|     WN|         2|        5|        2|    35.0| CRP|   187.0|2015-01-02|     1981|   HOU|             22|             22|HOU-CRP|           3.0|[35.0,187.0,22.0,...|
|     6.0|2015-01-02 12:55:00|2015-01-02 12:00:00|     WN|         2|        5|        2|     7.0| DAL|   239.0|2015-01-02|       24|   HOU|             12|             12|HOU-DAL|           2.0|[7.0,239.0,12.0,1...|
|    18.0|2015-01-02 20:00:00|2015-01-02 19:00:00|     WN|         2|        5|        2|    27.0| DAL|   239.0|2015-01-02|       52|   HOU|             19|             20|HOU-DAL|           2.0|[27.0,239.0,19.0,...|
|   -13.0|2015-01-02 07:40:00|2015-01-02 06:00:00|     WN|         2|        5|        2|    12.0| DEN|   883.0|2015-01-02|     1705|   HOU|              6|              7|HOU-DEN|           1.0|[12.0,883.0,6.0,7...|
|    -7.0|2015-01-02 11:55:00|2015-01-02 10:20:00|     WN|         2|        5|        2|    -3.0| ECP|   571.0|2015-01-02|     4207|   HOU|             10|             11|HOU-ECP|           1.0|[-3.0,571.0,10.0,...|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------------+---------------+-------+--------------+--------------------+
only showing top 5 rows

Training with Our New Features

Now we will train the model again, noting the performance and feature importances as we did before. This allows us to see the change in performance owing to the introduction of these new fields, CRSDepHourOfDay and CRSArrHourOfDay.


In [29]:
#
# Cross validate, train and evaluate classifier: loop 5 times for 4 metrics
#

from collections import defaultdict
scores = defaultdict(list)
feature_importances = defaultdict(list)
metric_names = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
split_count = 3

for i in range(1, split_count + 1):
  print(f"\nRun {i} out of {split_count} of test/train splits in cross validation...")

  # Test/train split
  training_data, test_data = final_vectorized_features.randomSplit([0.8, 0.2])

  # Instantiate and fit random forest classifier on all the data
  from pyspark.ml.classification import RandomForestClassifier
  rfc = RandomForestClassifier(
    featuresCol="Features_vec",
    labelCol="ArrDelayBucket",
    predictionCol="Prediction",
    maxBins=4657,
    maxMemoryInMB=1024
  )
  model = rfc.fit(training_data)

  # Save the new model over the old one
  model_output_path = "../models/spark_random_forest_classifier.flight_delays.baseline.bin"
  model.write().overwrite().save(model_output_path)

  # Evaluate model using test data
  predictions = model.transform(test_data)

  # Evaluate this split's results for each metric
  from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  for metric_name in metric_names:
    evaluator = MulticlassClassificationEvaluator(
      labelCol="ArrDelayBucket",
      predictionCol="Prediction",
      metricName=metric_name
    )
    score = evaluator.evaluate(predictions)
  
    scores[metric_name].append(score)
    print(f"{metric_name} = {score}")

  #
  # Collect feature importances
  #
  feature_names = vector_assembler.getInputCols()
  feature_importance_list = model.featureImportances
  for feature_name, feature_importance in zip(feature_names, feature_importance_list):
    feature_importances[feature_name].append(feature_importance)


Run 1 out of 3 of test/train splits in cross validation...
accuracy = 0.5866520787746171
weightedPrecision = 0.6990370807802881
weightedRecall = 0.586652078774617
f1 = 0.5217345660799317

Run 2 out of 3 of test/train splits in cross validation...
accuracy = 0.5902031063321386
weightedPrecision = 0.6507855345185848
weightedRecall = 0.5902031063321386
f1 = 0.5258478731116644

Run 3 out of 3 of test/train splits in cross validation...
accuracy = 0.5827243974261097
weightedPrecision = 0.6341957653456587
weightedRecall = 0.5827243974261097
f1 = 0.5207586903655927

In [30]:
#
# Evaluate average and STD of each metric and print a table
#
import numpy as np
score_averages = defaultdict(float)

# Compute the table data
average_stds = [] # ha
for metric_name in metric_names:
  metric_scores = scores[metric_name]
  
  average_accuracy = sum(metric_scores) / len(metric_scores)
  score_averages[metric_name] = average_accuracy
  
  std_accuracy = np.std(metric_scores)
  
  average_stds.append((metric_name, average_accuracy, std_accuracy))

# Print the table
print("\nExperiment Log")
print("--------------")
print(tabulate(average_stds, headers=["Metric", "Average", "STD"]))


Experiment Log
--------------
Metric               Average         STD
-----------------  ---------  ----------
accuracy            0.586527  0.00305446
weightedPrecision   0.661339  0.0275032
weightedRecall      0.586527  0.00305446
f1                  0.52278   0.00220533

In [31]:
#
# Persist the score to a sccore log that exists between runs
#
import pickle

# Load the score log or initialize an empty one
try:
  score_log_filename = "../models/score_log.pickle"
  score_log = pickle.load(open(score_log_filename, "rb"))
  if not isinstance(score_log, list):
    score_log = []
except IOError:
  score_log = []

# Compute the existing score log entry
score_log_entry = {metric_name: score_averages[metric_name] for metric_name in metric_names}

# Compute and display the change in score for each metric
try:
  last_log = score_log[-1]
except (IndexError, TypeError, AttributeError):
  last_log = score_log_entry

experiment_report = []
for metric_name in metric_names:
  run_delta = score_log_entry[metric_name] - last_log[metric_name]
  experiment_report.append((metric_name, run_delta))

print("\nExperiment Report")
print("-----------------")
print(tabulate(experiment_report, headers=["Metric", "Score"]))

# Append the existing average scores to the log
score_log.append(score_log_entry)

# Persist the log for next run
pickle.dump(score_log, open(score_log_filename, "wb"))


Experiment Report
-----------------
Metric                  Score
-----------------  ----------
accuracy           0.00362933
weightedPrecision  0.00143662
weightedRecall     0.00362933
f1                 0.00346795

In [32]:
#
# Analyze and report feature importance changes
#

# Compute averages for each feature
feature_importance_entry = defaultdict(float)
for feature_name, value_list in feature_importances.items():
  average_importance = sum(value_list) / len(value_list)
  feature_importance_entry[feature_name] = average_importance

# Sort the feature importances in descending order and print
import operator
sorted_feature_importances = sorted(
  feature_importance_entry.items(),
  key=operator.itemgetter(1),
  reverse=True
)

print("\nFeature Importances")
print("-------------------")
print(tabulate(sorted_feature_importances, headers=['Name', 'Importance']))


Feature Importances
-------------------
Name                Importance
----------------  ------------
DepDelay            0.863795
DayOfMonth_index    0.0270982
DayOfYear_index     0.0246649
Route_index         0.0177811
Origin_index        0.0173232
Distance            0.0122518
Dest_index          0.00733773
Carrier_index       0.00651014
CRSDepHourOfDay     0.00259898
CRSArrHourOfDay     0.00176026
DayOfWeek_index     0.00155594

In [33]:
#
# Compare this run's feature importances with the previous run's
#
  
# Load the feature importance log or initialize an empty one
try:
  feature_log_filename = "../models/feature_log.pickle"
  feature_log = pickle.load(open(feature_log_filename, "rb"))
  if not isinstance(feature_log, list):
    feature_log = []
except IOError:
  feature_log = []

# Compute and display the change in score for each feature
try:
  last_feature_log = feature_log[-1]
except (IndexError, TypeError, AttributeError):
  last_feature_log = defaultdict(float)
  for feature_name, importance in feature_importance_entry.items():
    last_feature_log[feature_name] = importance

# Compute the deltas
feature_deltas = {}
for feature_name in feature_importances.keys():
  run_delta = feature_importance_entry[feature_name] - last_feature_log[feature_name]
  feature_deltas[feature_name] = run_delta

# Sort feature deltas, biggest change first
import operator
sorted_feature_deltas = sorted(
  feature_deltas.items(),
  key=operator.itemgetter(1),
  reverse=True
)

# Display sorted feature deltas
print("\nFeature Importance Delta Report")
print("-------------------------------")
print(tabulate(sorted_feature_deltas, headers=["Feature", "Delta"]))

# Append the existing average deltas to the log
feature_log.append(feature_importance_entry)

# Persist the log for next run
pickle.dump(feature_log, open(feature_log_filename, "wb"))


Feature Importance Delta Report
-------------------------------
Feature                  Delta
----------------  ------------
DayOfMonth_index   0.00842893
Distance           0.00417504
CRSDepHourOfDay    0.00259898
CRSArrHourOfDay    0.00176026
Carrier_index      0.00135018
Dest_index        -0.000662988
DayOfWeek_index   -0.000915246
Origin_index      -0.00118843
Route_index       -0.0016055
DayOfYear_index   -0.00556882
DepDelay          -0.00718399

Interpreting Our Results

Interpreting the output, it looks like the combined effect of these fields is to impact feature importance by about 1%, but the effect on accuracy is insignificant. We’ll leave the fields in, although they don’t help much. Without resorting to advanced time series analysis, it seems we’ve milked all we can from date/time-based features.

Incorporating Airplane Data

Recall from Investigating Airplanes (Entities) that we incorporated data on airplane manufacturers into our data model. For instance, we analyzed the distribution of manufacturers in the American commercial fleet. In this section, we’re going to join in airline data and see what impact this has on the model’s accuracy.

I wonder whether properties of the aircraft (called the “metal” of the flight) influence delays? For instance, bigger aircraft fly higher and can go over weather, while smaller aircraft may be less able to do so. I can’t honestly think of a reason why the engine manufacturer, airplane manufacturer, or manufacture year would have an impact on the model, but since we’re importing one field, we may as well try them all! Note that we can simply drop any features that don’t rank as very significant. The beauty of our experimental model with decision trees is that it doesn’t cost extra to try extra fields. Sometimes you can simply let the model decide what matters.

Note that when dealing with team members and with other teams who need an accounting of your time in order to coordinate with you, a description of the experiments you are running will help keep the teams in sync. For instance, “We are attempting to incorporate a new dataset which we scraped from the FAA website into our flight delay predictive model” would make a good experimental description during an agile sprint.

Extracting Airplane Features

To add airplane features to our model, we need to create a new feature extraction script, ch09/extract_features_with_airplanes.py. We can do this by copying and altering ch09/extract_features.py.

First we add TailNum to the fields we select from our training data. Because this column also appears in our airplane dataset, we need to name it differently or we won’t easily be able to access the column after the join. We’ll name it FeatureTailNum:


In [34]:
# Load the on-time parquet file
input_path = "../data/january_performance.parquet"
on_time_dataframe = spark.read.parquet(input_path)
on_time_dataframe.registerTempTable("on_time_performance")

# Select a few features of interest
simple_on_time_features = spark.sql("""
SELECT
  FlightNum,
  FlightDate,
  DayOfWeek,
  DayofMonth AS DayOfMonth,
  CONCAT(Month, '-',  DayofMonth) AS DayOfYear,
  Carrier,
  Origin,
  Dest,
  Distance,
  DepDelay,
  ArrDelay,
  CRSDepTime,
  CRSArrTime,
  CONCAT(Origin, '-', Dest) AS Route,
  TailNum AS FeatureTailNum
FROM on_time_performance
WHERE FlightDate < '2015-02-01'
""")

simple_on_time_features = simple_on_time_features.sample(False, 0.1)

simple_on_time_features.select(
  "FlightNum",
  "FlightDate",
  "FeatureTailNum"
).show(10)


+---------+----------+--------------+
|FlightNum|FlightDate|FeatureTailNum|
+---------+----------+--------------+
|      937|2015-01-01|        N009AA|
|     1310|2015-01-01|        N011AA|
|     2227|2015-01-01|        N016AA|
|      903|2015-01-01|        N017AA|
|       17|2015-01-01|        N019AA|
|      324|2015-01-01|        N202AA|
|     2413|2015-01-01|        N357AA|
|       28|2015-01-01|        N358AA|
|     1621|2015-01-01|        N367AA|
|     1210|2015-01-01|        N397AA|
+---------+----------+--------------+
only showing top 10 rows


In [35]:
# Filter nulls, they can't help us
filled_on_time_features = simple_on_time_features.where(simple_on_time_features.ArrDelay.isNotNull())
filled_on_time_features = filled_on_time_features.where(filled_on_time_features.DepDelay.isNotNull())
filled_on_time_features.show(5)


+---------+----------+---------+----------+---------+-------+------+----+--------+--------+--------+----------+----------+-------+--------------+
|FlightNum|FlightDate|DayOfWeek|DayOfMonth|DayOfYear|Carrier|Origin|Dest|Distance|DepDelay|ArrDelay|CRSDepTime|CRSArrTime|  Route|FeatureTailNum|
+---------+----------+---------+----------+---------+-------+------+----+--------+--------+--------+----------+----------+-------+--------------+
|      937|2015-01-01|        4|         1|      1-1|     AA|   EGE| LAX|   748.0|    10.0|   -12.0|      1805|      1920|EGE-LAX|        N009AA|
|     1310|2015-01-01|        4|         1|      1-1|     AA|   DFW| CLE|  1021.0|    47.0|    42.0|      0855|      1225|DFW-CLE|        N011AA|
|     2227|2015-01-01|        4|         1|      1-1|     AA|   MIA| IAH|   964.0|    42.0|    49.0|      1640|      1842|MIA-IAH|        N016AA|
|      903|2015-01-01|        4|         1|      1-1|     AA|   LAX| EGE|   748.0|    -7.0|    16.0|      0830|      1145|LAX-EGE|        N017AA|
|       17|2015-01-01|        4|         1|      1-1|     AA|   ATL| MIA|   594.0|    -4.0|   -10.0|      0700|      0852|ATL-MIA|        N019AA|
+---------+----------+---------+----------+---------+-------+------+----+--------+--------+--------+----------+----------+-------+--------------+
only showing top 5 rows


In [36]:
# We need to turn timestamps into timestamps, and not strings or numbers
def convert_hours(hours_minutes):
  hours = hours_minutes[:-2]
  minutes = hours_minutes[-2:]
  
  if hours == '24':
    hours = '23'
    minutes = '59'
  
  time_string = "{}:{}:00Z".format(hours, minutes)
  return time_string

def compose_datetime(iso_date, time_string):
  return "{} {}".format(iso_date, time_string)

def create_iso_string(iso_date, hours_minutes):
  time_string = convert_hours(hours_minutes)
  full_datetime = compose_datetime(iso_date, time_string)
  return full_datetime

def create_datetime(iso_string):
  return iso8601.parse_date(iso_string)

def convert_datetime(iso_date, hours_minutes):
  iso_string = create_iso_string(iso_date, hours_minutes)
  dt = create_datetime(iso_string)
  return dt

def day_of_year(iso_date_string):
  dt = iso8601.parse_date(iso_date_string)
  doy = dt.timetuple().tm_yday
  return doy

def alter_feature_datetimes(row):
  
  flight_date = iso8601.parse_date(row['FlightDate'])
  scheduled_dep_time = convert_datetime(row['FlightDate'], row['CRSDepTime'])
  scheduled_arr_time = convert_datetime(row['FlightDate'], row['CRSArrTime'])
  
  # Handle overnight flights
  if scheduled_arr_time < scheduled_dep_time:
    scheduled_arr_time += datetime.timedelta(days=1)
  
  doy = day_of_year(row['FlightDate'])
  
  return {
    'FlightNum': row['FlightNum'],
    'FlightDate': flight_date,
    'DayOfWeek': int(row['DayOfWeek']),
    'DayOfMonth': int(row['DayOfMonth']),
    'DayOfYear': doy,
    'Carrier': row['Carrier'],
    'Origin': row['Origin'],
    'Dest': row['Dest'],
    'Distance': row['Distance'],
    'DepDelay': row['DepDelay'],
    'ArrDelay': row['ArrDelay'],
    'CRSDepTime': scheduled_dep_time,
    'CRSArrTime': scheduled_arr_time,
    'Route': row['Route'],
    'FeatureTailNum': row['FeatureTailNum'],
  }

In [37]:
timestamp_features = filled_on_time_features.rdd.map(alter_feature_datetimes)
timestamp_df = timestamp_features.toDF()
timestamp_df.show(5)


/home/vagrant/spark/python/pyspark/sql/session.py:366: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
  warnings.warn("Using RDD of dict to inferSchema is deprecated. "
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+--------------+-------------------+---------+------+-------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FeatureTailNum|         FlightDate|FlightNum|Origin|  Route|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+--------------+-------------------+---------+------+-------+
|   -12.0|2015-01-01 19:20:00|2015-01-01 18:05:00|     AA|         1|        4|        1|    10.0| LAX|   748.0|        N009AA|2015-01-01 00:00:00|      937|   EGE|EGE-LAX|
|    42.0|2015-01-01 12:25:00|2015-01-01 08:55:00|     AA|         1|        4|        1|    47.0| CLE|  1021.0|        N011AA|2015-01-01 00:00:00|     1310|   DFW|DFW-CLE|
|    49.0|2015-01-01 18:42:00|2015-01-01 16:40:00|     AA|         1|        4|        1|    42.0| IAH|   964.0|        N016AA|2015-01-01 00:00:00|     2227|   MIA|MIA-IAH|
|    16.0|2015-01-01 11:45:00|2015-01-01 08:30:00|     AA|         1|        4|        1|    -7.0| EGE|   748.0|        N017AA|2015-01-01 00:00:00|      903|   LAX|LAX-EGE|
|   -10.0|2015-01-01 08:52:00|2015-01-01 07:00:00|     AA|         1|        4|        1|    -4.0| MIA|   594.0|        N019AA|2015-01-01 00:00:00|       17|   ATL|ATL-MIA|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+--------------+-------------------+---------+------+-------+
only showing top 5 rows

Joining Airplane Data

Next, we load the airplane data and left join it to our features dataset. Note that null is a problematic value for our StringIndexer. But we don’t want to discard empty values or rows either, because whether a variable is present or not is something our decision tree model can use to learn. We use DataFrame.selectExpr to COALESCE our null values to the string 'Empty'. This will get its own index from StringIndexer and things will work out well. Also note that we rename FeatureTailNum back to TailNum for the final output:


In [38]:
# Load airplanes and left join on tail numbers
airplanes_path = "../data/airplanes.jsonl"
airplanes = spark.read.json(airplanes_path)

# Left outer join ensures all feature records remain, with nulls where airplane records are not available
features_with_airplanes = timestamp_df.join(
  airplanes,
  on=timestamp_df.FeatureTailNum == airplanes.TailNum,
  how="left_outer"
)

# Fill in the nulls 'Empty' with COALESCE
features_with_airplanes = features_with_airplanes.selectExpr(
  "FlightNum",
  "FlightDate",
  "DayOfWeek",
  "DayOfMonth",
  "DayOfYear",
  "Carrier",
  "Origin",
  "Dest",
  "Distance",
  "DepDelay",
  "ArrDelay",
  "CRSDepTime",
  "CRSArrTime",
  "Route",
  "FeatureTailNum AS TailNum",
  "COALESCE(EngineManufacturer, 'Empty') AS EngineManufacturer",
  "COALESCE(EngineModel, 'Empty') AS EngineModel",
  "COALESCE(Manufacturer, 'Empty') AS Manufacturer",
  "COALESCE(ManufacturerYear, 'Empty') AS ManufacturerYear",
  "COALESCE(Model, 'Empty') AS Model",
  "COALESCE(OwnerState, 'Empty') AS OwnerState"
)
features_with_airplanes.show(5)


+---------+-------------------+---------+----------+---------+-------+------+----+--------+--------+--------+-------------------+-------------------+-------+-------+------------------+-----------+------------+----------------+-----+----------+
|FlightNum|         FlightDate|DayOfWeek|DayOfMonth|DayOfYear|Carrier|Origin|Dest|Distance|DepDelay|ArrDelay|         CRSDepTime|         CRSArrTime|  Route|TailNum|EngineManufacturer|EngineModel|Manufacturer|ManufacturerYear|Model|OwnerState|
+---------+-------------------+---------+----------+---------+-------+------+----+--------+--------+--------+-------------------+-------------------+-------+-------+------------------+-----------+------------+----------------+-----+----------+
|      937|2015-01-01 00:00:00|        4|         1|        1|     AA|   EGE| LAX|   748.0|    10.0|   -12.0|2015-01-01 18:05:00|2015-01-01 19:20:00|EGE-LAX| N009AA|             Empty|      Empty|       Empty|           Empty|Empty|     Empty|
|     1310|2015-01-01 00:00:00|        4|         1|        1|     AA|   DFW| CLE|  1021.0|    47.0|    42.0|2015-01-01 08:55:00|2015-01-01 12:25:00|DFW-CLE| N011AA|             Empty|      Empty|       Empty|           Empty|Empty|     Empty|
|     2227|2015-01-01 00:00:00|        4|         1|        1|     AA|   MIA| IAH|   964.0|    42.0|    49.0|2015-01-01 16:40:00|2015-01-01 18:42:00|MIA-IAH| N016AA|             Empty|      Empty|       Empty|           Empty|Empty|     Empty|
|      903|2015-01-01 00:00:00|        4|         1|        1|     AA|   LAX| EGE|   748.0|    -7.0|    16.0|2015-01-01 08:30:00|2015-01-01 11:45:00|LAX-EGE| N017AA|             Empty|      Empty|       Empty|           Empty|Empty|     Empty|
|       17|2015-01-01 00:00:00|        4|         1|        1|     AA|   ATL| MIA|   594.0|    -4.0|   -10.0|2015-01-01 07:00:00|2015-01-01 08:52:00|ATL-MIA| N019AA|             Empty|      Empty|       Empty|           Empty|Empty|     Empty|
+---------+-------------------+---------+----------+---------+-------+------+----+--------+--------+--------+-------------------+-------------------+-------+-------+------------------+-----------+------------+----------------+-----+----------+
only showing top 5 rows


In [39]:
# Explicitly sort the data and keep it sorted throughout. Leave nothing to chance.
sorted_features = features_with_airplanes.sort(
  timestamp_df.DayOfYear,
  timestamp_df.Carrier,
  timestamp_df.Origin,
  timestamp_df.Dest,
  timestamp_df.FlightNum,
  timestamp_df.CRSDepTime,
  timestamp_df.CRSArrTime,
)

Storing Our Features

Finally, we store the final output to a new path. We’ll have to remember to alter our model training script to point at this new path:


In [40]:
# Store as a single json file
output_path = "../data/simple_flight_delay_features_airplanes.json"
sorted_features.repartition(1).write.mode("overwrite").json(output_path)

Now we’re ready to incorporate the features into our model.

Incorporating Airplane Features into Our Classifier Model

Now we need to create a new script that incorporates our new airplane features into our classifier model. Check out ch09/spark_model_with_airplanes.py, which we copied from ch09/improved_spark_mllib_model.py and altered.

First we need to load the training data with the additional fields, including Route (which is now calculated in ch09/extract_features_with_airplanes.py):


In [41]:
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DateType, TimestampType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.functions import udf

schema = StructType([
  StructField("ArrDelay", DoubleType(), True),
  StructField("CRSArrTime", TimestampType(), True),
  StructField("CRSDepTime", TimestampType(), True),
  StructField("Carrier", StringType(), True),
  StructField("DayOfMonth", IntegerType(), True),
  StructField("DayOfWeek", IntegerType(), True),
  StructField("DayOfYear", IntegerType(), True),
  StructField("DepDelay", DoubleType(), True),
  StructField("Dest", StringType(), True),
  StructField("Distance", DoubleType(), True),
  StructField("FlightDate", DateType(), True),
  StructField("FlightNum", StringType(), True),
  StructField("Origin", StringType(), True),
  StructField("Route", StringType(), True),
  StructField("TailNum", StringType(), True),
  StructField("EngineManufacturer", StringType(), True),
  StructField("EngineModel", StringType(), True),
  StructField("Manufacturer", StringType(), True),
  StructField("ManufacturerYear", StringType(), True),
  StructField("OwnerState", StringType(), True),
])

input_path = "../data/simple_flight_delay_features_airplanes.json"
features = spark.read.json(input_path, schema=schema)
features.show(5)


+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+------------------+-----------+-----------------+----------------+----------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|TailNum|EngineManufacturer|EngineModel|     Manufacturer|ManufacturerYear|OwnerState|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+------------------+-----------+-----------------+----------------+----------+
|   -10.0|2015-01-01 08:52:00|2015-01-01 07:00:00|     AA|         1|        4|        1|    -4.0| MIA|   594.0|2015-01-01|       17|   ATL|ATL-MIA| N019AA|             Empty|      Empty|            Empty|           Empty|     Empty|
|    -3.0|2015-01-01 23:02:00|2015-01-01 21:10:00|     AA|         1|        4|        1|    -7.0| MIA|   594.0|2015-01-01|      349|   ATL|ATL-MIA| N3KVAA|             Empty|      Empty|            Empty|           Empty|     Empty|
|    16.0|2015-01-01 15:30:00|2015-01-01 14:25:00|     AA|         1|        4|        1|    24.0| DFW|   190.0|2015-01-01|     2297|   AUS|AUS-DFW| N483AA|             Empty|      Empty|            Empty|           Empty|     Empty|
|    25.0|2015-01-01 21:10:00|2015-01-01 19:45:00|     AA|         1|        4|        1|    28.0| LAX|  1242.0|2015-01-01|     1167|   AUS|AUS-LAX| N4XVAA|             Empty|      Empty|            Empty|           Empty|     Empty|
|    -6.0|2015-01-01 16:55:00|2015-01-01 14:20:00|     AA|         1|        4|        1|    13.0| ORD|   978.0|2015-01-01|     1201|   AUS|AUS-ORD| N566AA|             P & W|JT8D SERIES|MCDONNELL DOUGLAS|            1987|     TEXAS|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+------------------+-----------+-----------------+----------------+----------+
only showing top 5 rows


In [42]:
#
# Add the hour of day of scheduled arrival/departure
#
from pyspark.sql.functions import hour
features_with_hour = features.withColumn(
  "CRSDepHourOfDay",
  hour(features.CRSDepTime)
)
features_with_hour = features_with_hour.withColumn(
  "CRSArrHourOfDay",
  hour(features.CRSArrTime)
)
features_with_hour.select("CRSDepTime", "CRSDepHourOfDay", "CRSArrTime", "CRSArrHourOfDay").show(5)


+-------------------+---------------+-------------------+---------------+
|         CRSDepTime|CRSDepHourOfDay|         CRSArrTime|CRSArrHourOfDay|
+-------------------+---------------+-------------------+---------------+
|2015-01-01 07:00:00|              7|2015-01-01 08:52:00|              8|
|2015-01-01 21:10:00|             21|2015-01-01 23:02:00|             23|
|2015-01-01 14:25:00|             14|2015-01-01 15:30:00|             15|
|2015-01-01 19:45:00|             19|2015-01-01 21:10:00|             21|
|2015-01-01 14:20:00|             14|2015-01-01 16:55:00|             16|
+-------------------+---------------+-------------------+---------------+
only showing top 5 rows

Because we left joined our new features in, we need to know how many of the resulting training records have null values for their fields. Null values will crash the StringIndexer for a field, so we’ve explicitly altered our feature extraction code to remove them. There should be no nulls, so we’ll print a table with a warning if they are present:


In [43]:
#
# Check for nulls in features before using Spark ML
#
null_counts = [(column, features_with_hour.where(features_with_hour[column].isNull()).count()) for column in features_with_hour.columns]
cols_with_nulls = filter(lambda x: x[1] > 0, null_counts)
print("\nNull Value Report")
print("-----------------")
print(tabulate(cols_with_nulls, headers=["Column", "Nulls"]))


Null Value Report
-----------------
Column    Nulls
--------  -------

There should be no nulls present!

Next we need to bucketize our data as per normal.


In [44]:
#
# Use pysmark.ml.feature.Bucketizer to bucketize ArrDelay into on-time, slightly late, very late (0, 1, 2)
#
from pyspark.ml.feature import Bucketizer

# Setup the Bucketizer
splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]
arrival_bucketizer = Bucketizer(
  splits=splits,
  inputCol="ArrDelay",
  outputCol="ArrDelayBucket"
)

# Save the model
arrival_bucketizer_path = "../models/arrival_bucketizer_2.0.bin"
arrival_bucketizer.write().overwrite().save(arrival_bucketizer_path)

# Apply the model
ml_bucketized_features = arrival_bucketizer.transform(features_with_hour)
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show(5)


+--------+--------------+
|ArrDelay|ArrDelayBucket|
+--------+--------------+
|   -10.0|           1.0|
|    -3.0|           1.0|
|    16.0|           2.0|
|    25.0|           2.0|
|    -6.0|           1.0|
+--------+--------------+
only showing top 5 rows

Next we add the hour of day fields as normal, and we bucketize the ArrDelay field to get the ArrDelayBucket. Then we need to index all our string columns, including our new airplane features.

Note that we are also making another change. We are moving the DayOfMonth, DayOfWeek and DayOfYear fields, along with CRSDepHourOfDay and CRSArrHourOfDay into numeric fields directly to be vectorized. This is because in thinking about it... indexing these numeric fields only adds a layer of abstraction, encoding them into numbers again.


In [45]:
#
# Extract features tools in with pyspark.ml.feature
#
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Turn category fields into indexes
string_columns = ["Carrier", "Origin", "Dest", "Route",
                  "TailNum", "EngineManufacturer", "EngineModel",
                  "Manufacturer", "ManufacturerYear", "OwnerState"]
for column in string_columns:
  string_indexer = StringIndexer(
    inputCol=column,
    outputCol=column + "_index"
  )
  
  string_indexer_model = string_indexer.fit(ml_bucketized_features)
  ml_bucketized_features = string_indexer_model.transform(ml_bucketized_features)
  
  # Save the pipeline model
  string_indexer_output_path = "../models/string_indexer_model_4.0.{}.bin".format(
    column
  )
  string_indexer_model.write().overwrite().save(string_indexer_output_path)

ml_bucketized_features.show(5)


+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+------------------+-----------+-----------------+----------------+----------+---------------+---------------+--------------+-------------+------------+----------+-----------+-------------+------------------------+-----------------+------------------+----------------------+----------------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|TailNum|EngineManufacturer|EngineModel|     Manufacturer|ManufacturerYear|OwnerState|CRSDepHourOfDay|CRSArrHourOfDay|ArrDelayBucket|Carrier_index|Origin_index|Dest_index|Route_index|TailNum_index|EngineManufacturer_index|EngineModel_index|Manufacturer_index|ManufacturerYear_index|OwnerState_index|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+------------------+-----------+-----------------+----------------+----------+---------------+---------------+--------------+-------------+------------+----------+-----------+-------------+------------------------+-----------------+------------------+----------------------+----------------+
|   -10.0|2015-01-01 08:52:00|2015-01-01 07:00:00|     AA|         1|        4|        1|    -4.0| MIA|   594.0|2015-01-01|       17|   ATL|ATL-MIA| N019AA|             Empty|      Empty|            Empty|           Empty|     Empty|              7|              8|           1.0|          4.0|         0.0|      22.0|      112.0|       1333.0|                     1.0|              1.0|               1.0|                   0.0|             2.0|
|    -3.0|2015-01-01 23:02:00|2015-01-01 21:10:00|     AA|         1|        4|        1|    -7.0| MIA|   594.0|2015-01-01|      349|   ATL|ATL-MIA| N3KVAA|             Empty|      Empty|            Empty|           Empty|     Empty|             21|             23|           1.0|          4.0|         0.0|      22.0|      112.0|       3032.0|                     1.0|              1.0|               1.0|                   0.0|             2.0|
|    16.0|2015-01-01 15:30:00|2015-01-01 14:25:00|     AA|         1|        4|        1|    24.0| DFW|   190.0|2015-01-01|     2297|   AUS|AUS-DFW| N483AA|             Empty|      Empty|            Empty|           Empty|     Empty|             14|             15|           2.0|          4.0|        38.0|       1.0|      246.0|       1273.0|                     1.0|              1.0|               1.0|                   0.0|             2.0|
|    25.0|2015-01-01 21:10:00|2015-01-01 19:45:00|     AA|         1|        4|        1|    28.0| LAX|  1242.0|2015-01-01|     1167|   AUS|AUS-LAX| N4XVAA|             Empty|      Empty|            Empty|           Empty|     Empty|             19|             21|           2.0|          4.0|        38.0|       4.0|      559.0|       2431.0|                     1.0|              1.0|               1.0|                   0.0|             2.0|
|    -6.0|2015-01-01 16:55:00|2015-01-01 14:20:00|     AA|         1|        4|        1|    13.0| ORD|   978.0|2015-01-01|     1201|   AUS|AUS-ORD| N566AA|             P & W|JT8D SERIES|MCDONNELL DOUGLAS|            1987|     TEXAS|             14|             16|           1.0|          4.0|        38.0|       2.0|     1292.0|       1358.0|                     5.0|              4.0|               6.0|                  29.0|             1.0|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+------------------+-----------+-----------------+----------------+----------+---------------+---------------+--------------+-------------+------------+----------+-----------+-------------+------------------------+-----------------+------------------+----------------------+----------------+
only showing top 5 rows

Next, we need to create a new VectorAssembler to combine our features into one feature vector, the column Features_vec. As before, an index field name is the field name with _index appended. This time around, we use a list comprehension to compute the index columns:


In [46]:
# Combine continuous, numeric fields with indexes of nominal ones
# ...into one feature vector
numeric_columns = [
  "DepDelay",
  "Distance",
  "DayOfYear",
  "DayOfMonth",
  "DayOfWeek",
  "CRSDepHourOfDay",
  "CRSArrHourOfDay"
]
index_columns = [column + "_index" for column in string_columns]
index_columns


Out[46]:
['Carrier_index',
 'Origin_index',
 'Dest_index',
 'Route_index',
 'TailNum_index',
 'EngineManufacturer_index',
 'EngineModel_index',
 'Manufacturer_index',
 'ManufacturerYear_index',
 'OwnerState_index']

In [47]:
vector_assembler = VectorAssembler(
  inputCols=numeric_columns + index_columns,
  outputCol="Features_vec"
)
final_vectorized_features = vector_assembler.transform(ml_bucketized_features)

# Save the numeric vector assembler
vector_assembler_path = "../models/numeric_vector_assembler_5.0.bin"
vector_assembler.write().overwrite().save(vector_assembler_path)

# Drop the index columns
for column in index_columns:
  final_vectorized_features = final_vectorized_features.drop(column)

# Inspect the finalized features
final_vectorized_features.show(5)


+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+------------------+-----------+-----------------+----------------+----------+---------------+---------------+--------------+--------------------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|TailNum|EngineManufacturer|EngineModel|     Manufacturer|ManufacturerYear|OwnerState|CRSDepHourOfDay|CRSArrHourOfDay|ArrDelayBucket|        Features_vec|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+------------------+-----------+-----------------+----------------+----------+---------------+---------------+--------------+--------------------+
|   -10.0|2015-01-01 08:52:00|2015-01-01 07:00:00|     AA|         1|        4|        1|    -4.0| MIA|   594.0|2015-01-01|       17|   ATL|ATL-MIA| N019AA|             Empty|      Empty|            Empty|           Empty|     Empty|              7|              8|           1.0|[-4.0,594.0,1.0,1...|
|    -3.0|2015-01-01 23:02:00|2015-01-01 21:10:00|     AA|         1|        4|        1|    -7.0| MIA|   594.0|2015-01-01|      349|   ATL|ATL-MIA| N3KVAA|             Empty|      Empty|            Empty|           Empty|     Empty|             21|             23|           1.0|[-7.0,594.0,1.0,1...|
|    16.0|2015-01-01 15:30:00|2015-01-01 14:25:00|     AA|         1|        4|        1|    24.0| DFW|   190.0|2015-01-01|     2297|   AUS|AUS-DFW| N483AA|             Empty|      Empty|            Empty|           Empty|     Empty|             14|             15|           2.0|[24.0,190.0,1.0,1...|
|    25.0|2015-01-01 21:10:00|2015-01-01 19:45:00|     AA|         1|        4|        1|    28.0| LAX|  1242.0|2015-01-01|     1167|   AUS|AUS-LAX| N4XVAA|             Empty|      Empty|            Empty|           Empty|     Empty|             19|             21|           2.0|[28.0,1242.0,1.0,...|
|    -6.0|2015-01-01 16:55:00|2015-01-01 14:20:00|     AA|         1|        4|        1|    13.0| ORD|   978.0|2015-01-01|     1201|   AUS|AUS-ORD| N566AA|             P & W|JT8D SERIES|MCDONNELL DOUGLAS|            1987|     TEXAS|             14|             16|           1.0|[13.0,978.0,1.0,1...|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+------------------+-----------+-----------------+----------------+----------+---------------+---------------+--------------+--------------------+
only showing top 5 rows

The rest of the code is identical to ch09/improved_spark_mllib_model.py:


In [48]:
#
# Cross validate, train and evaluate classifier: loop 5 times for 4 metrics
#

from collections import defaultdict
scores = defaultdict(list)
feature_importances = defaultdict(list)
metric_names = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
split_count = 3

for i in range(1, split_count + 1):
  print(f"\nRun {i} out of {split_count} of test/train splits in cross validation...")
  
  # Test/train split
  training_data, test_data = final_vectorized_features.randomSplit([0.8, 0.2])
  
  # Instantiate and fit random forest classifier on all the data
  from pyspark.ml.classification import RandomForestClassifier
  rfc = RandomForestClassifier(
    featuresCol="Features_vec",
    labelCol="ArrDelayBucket",
    predictionCol="Prediction",
    maxBins=4896,
    maxMemoryInMB=1024
  )
  model = rfc.fit(training_data)
  
  # Save the new model over the old one
  model_output_path = "../models/spark_random_forest_classifier.flight_delays.baseline.bin"
  model.write().overwrite().save(model_output_path)
  
  # Evaluate model using test data
  predictions = model.transform(test_data)
  
  # Evaluate this split's results for each metric
  from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  for metric_name in metric_names:
    evaluator = MulticlassClassificationEvaluator(
      labelCol="ArrDelayBucket",
      predictionCol="Prediction",
      metricName=metric_name
    )
    score = evaluator.evaluate(predictions)
    
    scores[metric_name].append(score)
    print("{} = {}".format(metric_name, score))
  
  #
  # Collect feature importances
  #
  feature_names = vector_assembler.getInputCols()
  feature_importance_list = model.featureImportances
  for feature_name, feature_importance in zip(feature_names, feature_importance_list):
    feature_importances[feature_name].append(feature_importance)


Run 1 out of 3 of test/train splits in cross validation...
accuracy = 0.5799608355091384
weightedPrecision = 0.5128851827971685
weightedRecall = 0.5799608355091384
f1 = 0.5129849127510245

Run 2 out of 3 of test/train splits in cross validation...
accuracy = 0.5870511425462459
weightedPrecision = 0.5216138118522686
weightedRecall = 0.5870511425462459
f1 = 0.520345356453493

Run 3 out of 3 of test/train splits in cross validation...
accuracy = 0.5797181237612861
weightedPrecision = 0.512201626392988
weightedRecall = 0.5797181237612861
f1 = 0.5142372027931253

In [49]:
#
# Evaluate average and STD of each metric and print a table
#
import numpy as np
score_averages = defaultdict(float)

# Compute the table data
average_stds = []  # ha
for metric_name in metric_names:
  metric_scores = scores[metric_name]
  
  average_accuracy = sum(metric_scores) / len(metric_scores)
  score_averages[metric_name] = average_accuracy
  
  std_accuracy = np.std(metric_scores)
  
  average_stds.append((metric_name, average_accuracy, std_accuracy))

# Print the table
print("\nExperiment Log")
print("--------------")
print(tabulate(average_stds, headers=["Metric", "Average", "STD"]))


Experiment Log
--------------
Metric               Average         STD
-----------------  ---------  ----------
accuracy            0.582243  0.00340105
weightedPrecision   0.515567  0.00428493
weightedRecall      0.582243  0.00340105
f1                  0.515856  0.00321548

In [50]:
#
# Persist the score to a sccore log that exists between runs
#
import pickle

# Load the score log or initialize an empty one
try:
  score_log_filename = "../models/score_log.pickle"
  score_log = pickle.load(open(score_log_filename, "rb"))
  if not isinstance(score_log, list):
    score_log = []
except IOError:
  score_log = []

# Compute the existing score log entry
score_log_entry = {
  metric_name: score_averages[metric_name] for metric_name in metric_names
}

# Compute and display the change in score for each metric
try:
  last_log = score_log[-1]
except (IndexError, TypeError, AttributeError):
  last_log = score_log_entry

experiment_report = []
for metric_name in metric_names:
  run_delta = score_log_entry[metric_name] - last_log[metric_name]
  experiment_report.append((metric_name, run_delta))

print("\nExperiment Report")
print("-----------------")
print(tabulate(experiment_report, headers=["Metric", "Score"]))

# Append the existing average scores to the log
score_log.append(score_log_entry)

# Persist the log for next run
pickle.dump(score_log, open(score_log_filename, "wb"))


Experiment Report
-----------------
Metric                   Score
-----------------  -----------
accuracy           -0.00428316
weightedPrecision  -0.145773
weightedRecall     -0.00428316
f1                 -0.00692455

In [51]:
#
# Analyze and report feature importance changes
#

# Compute averages for each feature
feature_importance_entry = defaultdict(float)
for feature_name, value_list in feature_importances.items():
  average_importance = sum(value_list) / len(value_list)
  feature_importance_entry[feature_name] = average_importance

# Sort the feature importances in descending order and print
import operator
sorted_feature_importances = sorted(
  feature_importance_entry.items(),
  key=operator.itemgetter(1),
  reverse=True
)

print("\nFeature Importances")
print("-------------------")
print(tabulate(sorted_feature_importances, headers=['Name', 'Importance']))


Feature Importances
-------------------
Name                        Importance
------------------------  ------------
DepDelay                    0.818825
DayOfYear                   0.0389761
DayOfMonth                  0.0287786
Origin_index                0.0245126
TailNum_index               0.0232409
Route_index                 0.0183321
Distance                    0.0124914
Carrier_index               0.00840604
Dest_index                  0.00840502
CRSDepHourOfDay             0.00524377
CRSArrHourOfDay             0.00495677
EngineModel_index           0.00227696
Manufacturer_index          0.00150245
ManufacturerYear_index      0.00136106
OwnerState_index            0.00127585
EngineManufacturer_index    0.00111694
DayOfWeek                   0.00029805

In [52]:
#
# Compare this run's feature importances with the previous run's
#

# Load the feature importance log or initialize an empty one
try:
  feature_log_filename = "../models/feature_log.pickle"
  feature_log = pickle.load(open(feature_log_filename, "rb"))
  if not isinstance(feature_log, list):
    feature_log = []
except IOError:
  feature_log = []

# Compute and display the change in score for each feature
try:
  last_feature_log = feature_log[-1]
except (IndexError, TypeError, AttributeError):
  last_feature_log = defaultdict(float)
  for feature_name, importance in feature_importance_entry.items():
    last_feature_log[feature_name] = importance

# Compute the deltas
feature_deltas = {}
for feature_name in feature_importances.keys():
  run_delta = feature_importance_entry[feature_name] - last_feature_log[feature_name]
  feature_deltas[feature_name] = run_delta

# Sort feature deltas, biggest change first
import operator
sorted_feature_deltas = sorted(
  feature_deltas.items(),
  key=operator.itemgetter(1),
  reverse=True
)

# Display sorted feature deltas
print("\nFeature Importance Delta Report")
print("-------------------------------")
print(tabulate(sorted_feature_deltas, headers=["Feature", "Delta"]))

# Append the existing average deltas to the log
feature_log.append(feature_importance_entry)

# Persist the log for next run
pickle.dump(feature_log, open(feature_log_filename, "wb"))


Feature Importance Delta Report
-------------------------------
Feature                          Delta
------------------------  ------------
DayOfYear                  0.0389761
DayOfMonth                 0.0287786
TailNum_index              0.0232409
Origin_index               0.00718938
CRSArrHourOfDay            0.00319651
CRSDepHourOfDay            0.00264478
EngineModel_index          0.00227696
Carrier_index              0.0018959
Manufacturer_index         0.00150245
ManufacturerYear_index     0.00136106
OwnerState_index           0.00127585
EngineManufacturer_index   0.00111694
Dest_index                 0.00106729
Route_index                0.000551038
DayOfWeek                  0.00029805
Distance                   0.000239563
DepDelay                  -0.0449692

Note that on the first go around, our model failed because we needed to increase the maxBins parameter to 4896 to accommodate our new fields. After that, the code runs without incident.

It looks like our efforts were mostly for naught—they actually hurt the quality of the model (although so little that it comes out about even)! The single exception is that adding the TailNum helps in terms of feature importance by 0.05 (your precise results may vary). Apparently some airplanes are more prone to delay than others, but this isn’t down to the properties of the airplane much... or at least those properties of the airplane are encoded mostly by the identity of the airplane itself.

Keep It Simple Stupid: KISS Your Models

The tail end of the scores for feature importance look like this (for me):

EngineModel_index          0.00221025
OwnerState_index           0.00181267
ManufacturerYear_index     0.00156983
Manufacturer_index         0.000969526
EngineManufacturer_index   0.000708076
DayOfWeek                  0.00032268

It is better to have a simpler model as they tend not to overfit, that is to work well on training data by extracting patterns that only apply to it by chance but not to work well on test data or in the real world. They are also easier to deploy and maintain. If something doesn't help - remove it!

Model Simplification Experiment

Lets remove EngineModel, OwnerState, ManufacturerYear, Manufacturer, EngineManufacturer and DayOfWeek and run the model again, to see what this does to performance.


In [53]:
#
# Use pysmark.ml.feature.Bucketizer to bucketize ArrDelay into on-time, slightly late, very late (0, 1, 2)
#
from pyspark.ml.feature import Bucketizer

# Setup the Bucketizer
splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]
arrival_bucketizer = Bucketizer(
  splits=splits,
  inputCol="ArrDelay",
  outputCol="ArrDelayBucket"
)

# Save the model
arrival_bucketizer_path = "../models/arrival_bucketizer_2.0.bin"
arrival_bucketizer.write().overwrite().save(arrival_bucketizer_path)

# Apply the model
ml_bucketized_features = arrival_bucketizer.transform(features_with_hour)
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show(5)


+--------+--------------+
|ArrDelay|ArrDelayBucket|
+--------+--------------+
|   -10.0|           1.0|
|    -3.0|           1.0|
|    16.0|           2.0|
|    25.0|           2.0|
|    -6.0|           1.0|
+--------+--------------+
only showing top 5 rows


In [65]:
#
# Extract features tools in with pyspark.ml.feature
#
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Turn category fields into indexes
string_columns = ["Carrier", "Origin", "Dest", "Route", "TailNum"]
for column in string_columns:
  string_indexer = StringIndexer(
    inputCol=column,
    outputCol=column + "_index"
  )
  
  string_indexer_model = string_indexer.fit(ml_bucketized_features)
  ml_bucketized_features = string_indexer_model.transform(ml_bucketized_features)
  
  # Save the pipeline model
  string_indexer_output_path = "../models/string_indexer_model_4.0.{}.bin".format(
    column
  )
  string_indexer_model.write().overwrite().save(string_indexer_output_path)

ml_bucketized_features.show(5)


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
~/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

~/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o7367.fit.
: java.lang.IllegalArgumentException: requirement failed: Output column Carrier_index already exists.
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.ml.feature.StringIndexerBase$class.validateAndTransformSchema(StringIndexer.scala:91)
	at org.apache.spark.ml.feature.StringIndexer.validateAndTransformSchema(StringIndexer.scala:109)
	at org.apache.spark.ml.feature.StringIndexer.transformSchema(StringIndexer.scala:152)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:135)
	at sun.reflect.GeneratedMethodAccessor126.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

IllegalArgumentException                  Traceback (most recent call last)
<ipython-input-65-0155a341776d> in <module>
     12   )
     13 
---> 14   string_indexer_model = string_indexer.fit(ml_bucketized_features)
     15   ml_bucketized_features = string_indexer_model.transform(ml_bucketized_features)
     16 

~/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

~/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
    293 
    294     def _fit(self, dataset):
--> 295         java_model = self._fit_java(dataset)
    296         model = self._create_model(java_model)
    297         return self._copyValues(model)

~/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    290         """
    291         self._transfer_params_to_java()
--> 292         return self._java_obj.fit(dataset._jdf)
    293 
    294     def _fit(self, dataset):

~/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

~/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     77                 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
     78             if s.startswith('java.lang.IllegalArgumentException: '):
---> 79                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
     80             raise
     81     return deco

IllegalArgumentException: 'requirement failed: Output column Carrier_index already exists.'

In [55]:
# Combine continuous, numeric fields with indexes of nominal ones
# ...into one feature vector
numeric_columns = [
  "DepDelay",
  "Distance",
  "DayOfYear",
  "DayOfMonth",
  "CRSDepHourOfDay",
  "CRSArrHourOfDay"
]
index_columns = [column + "_index" for column in string_columns]
index_columns


Out[55]:
['Carrier_index', 'Origin_index', 'Dest_index', 'Route_index', 'TailNum_index']

In [56]:
vector_assembler = VectorAssembler(
  inputCols=numeric_columns + index_columns,
  outputCol="Features_vec"
)
final_vectorized_features = vector_assembler.transform(ml_bucketized_features)

# Save the numeric vector assembler
vector_assembler_path = "../models/numeric_vector_assembler_5.0.bin"
vector_assembler.write().overwrite().save(vector_assembler_path)

# Drop the index columns
for column in index_columns:
  final_vectorized_features = final_vectorized_features.drop(column)

# Inspect the finalized features
final_vectorized_features.show(5)


+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+------------------+-----------+-----------------+----------------+----------+---------------+---------------+--------------+--------------------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|TailNum|EngineManufacturer|EngineModel|     Manufacturer|ManufacturerYear|OwnerState|CRSDepHourOfDay|CRSArrHourOfDay|ArrDelayBucket|        Features_vec|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+------------------+-----------+-----------------+----------------+----------+---------------+---------------+--------------+--------------------+
|   -10.0|2015-01-01 08:52:00|2015-01-01 07:00:00|     AA|         1|        4|        1|    -4.0| MIA|   594.0|2015-01-01|       17|   ATL|ATL-MIA| N019AA|             Empty|      Empty|            Empty|           Empty|     Empty|              7|              8|           1.0|[-4.0,594.0,1.0,1...|
|    -3.0|2015-01-01 23:02:00|2015-01-01 21:10:00|     AA|         1|        4|        1|    -7.0| MIA|   594.0|2015-01-01|      349|   ATL|ATL-MIA| N3KVAA|             Empty|      Empty|            Empty|           Empty|     Empty|             21|             23|           1.0|[-7.0,594.0,1.0,1...|
|    16.0|2015-01-01 15:30:00|2015-01-01 14:25:00|     AA|         1|        4|        1|    24.0| DFW|   190.0|2015-01-01|     2297|   AUS|AUS-DFW| N483AA|             Empty|      Empty|            Empty|           Empty|     Empty|             14|             15|           2.0|[24.0,190.0,1.0,1...|
|    25.0|2015-01-01 21:10:00|2015-01-01 19:45:00|     AA|         1|        4|        1|    28.0| LAX|  1242.0|2015-01-01|     1167|   AUS|AUS-LAX| N4XVAA|             Empty|      Empty|            Empty|           Empty|     Empty|             19|             21|           2.0|[28.0,1242.0,1.0,...|
|    -6.0|2015-01-01 16:55:00|2015-01-01 14:20:00|     AA|         1|        4|        1|    13.0| ORD|   978.0|2015-01-01|     1201|   AUS|AUS-ORD| N566AA|             P & W|JT8D SERIES|MCDONNELL DOUGLAS|            1987|     TEXAS|             14|             16|           1.0|[13.0,978.0,1.0,1...|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+------------------+-----------+-----------------+----------------+----------+---------------+---------------+--------------+--------------------+
only showing top 5 rows


In [57]:
#
# Cross validate, train and evaluate classifier: loop 5 times for 4 metrics
#

from collections import defaultdict
scores = defaultdict(list)
feature_importances = defaultdict(list)
metric_names = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
split_count = 3

for i in range(1, split_count + 1):
  print(f"\nRun {i} out of {split_count} of test/train splits in cross validation...")
  
  # Test/train split
  training_data, test_data = final_vectorized_features.randomSplit([0.8, 0.2])
  
  # Instantiate and fit random forest classifier on all the data
  from pyspark.ml.classification import RandomForestClassifier
  rfc = RandomForestClassifier(
    featuresCol="Features_vec",
    labelCol="ArrDelayBucket",
    predictionCol="Prediction",
    maxBins=4896,
    maxMemoryInMB=1024
  )
  model = rfc.fit(training_data)
  
  # Save the new model over the old one
  model_output_path = "../models/spark_random_forest_classifier.flight_delays.baseline.bin"
  model.write().overwrite().save(model_output_path)
  
  # Evaluate model using test data
  predictions = model.transform(test_data)
  
  # Evaluate this split's results for each metric
  from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  for metric_name in metric_names:
    evaluator = MulticlassClassificationEvaluator(
      labelCol="ArrDelayBucket",
      predictionCol="Prediction",
      metricName=metric_name
    )
    score = evaluator.evaluate(predictions)
    
    scores[metric_name].append(score)
    print("{} = {}".format(metric_name, score))
  
  #
  # Collect feature importances
  #
  feature_names = vector_assembler.getInputCols()
  feature_importance_list = model.featureImportances
  for feature_name, feature_importance in zip(feature_names, feature_importance_list):
    feature_importances[feature_name].append(feature_importance)


Run 1 out of 3 of test/train splits in cross validation...
accuracy = 0.5889755011135858
weightedPrecision = 0.7012102804705427
weightedRecall = 0.5889755011135858
f1 = 0.5232180887199622

Run 2 out of 3 of test/train splits in cross validation...
accuracy = 0.579766107678729
weightedPrecision = 0.633427591129964
weightedRecall = 0.579766107678729
f1 = 0.5149846630682994

Run 3 out of 3 of test/train splits in cross validation...
accuracy = 0.5836094543848023
weightedPrecision = 0.51552540471056
weightedRecall = 0.5836094543848023
f1 = 0.5160721718042296

In [58]:
#
# Evaluate average and STD of each metric and print a table
#
import numpy as np
score_averages = defaultdict(float)

# Compute the table data
average_stds = []  # ha
for metric_name in metric_names:
  metric_scores = scores[metric_name]
  
  average_accuracy = sum(metric_scores) / len(metric_scores)
  score_averages[metric_name] = average_accuracy
  
  std_accuracy = np.std(metric_scores)
  
  average_stds.append((metric_name, average_accuracy, std_accuracy))

# Print the table
print("\nExperiment Log")
print("--------------")
print(tabulate(average_stds, headers=["Metric", "Average", "STD"]))


Experiment Log
--------------
Metric               Average         STD
-----------------  ---------  ----------
accuracy            0.584117  0.00377681
weightedPrecision   0.616721  0.0767205
weightedRecall      0.584117  0.00377681
f1                  0.518092  0.00365203

In [59]:
#
# Persist the score to a sccore log that exists between runs
#
import pickle

# Load the score log or initialize an empty one
try:
  score_log_filename = "../models/score_log.pickle"
  score_log = pickle.load(open(score_log_filename, "rb"))
  if not isinstance(score_log, list):
    score_log = []
except IOError:
  score_log = []

# Compute the existing score log entry
score_log_entry = {
  metric_name: score_averages[metric_name] for metric_name in metric_names
}

# Compute and display the change in score for each metric
try:
  last_log = score_log[-1]
except (IndexError, TypeError, AttributeError):
  last_log = score_log_entry

experiment_report = []
for metric_name in metric_names:
  run_delta = score_log_entry[metric_name] - last_log[metric_name]
  experiment_report.append((metric_name, run_delta))

print("\nExperiment Report")
print("-----------------")
print(tabulate(experiment_report, headers=["Metric", "Score"]))

# Append the existing average scores to the log
score_log.append(score_log_entry)

# Persist the log for next run
pickle.dump(score_log, open(score_log_filename, "wb"))


Experiment Report
-----------------
Metric                  Score
-----------------  ----------
accuracy           0.00187365
weightedPrecision  0.101154
weightedRecall     0.00187365
f1                 0.00223582

In [60]:
#
# Analyze and report feature importance changes
#

# Compute averages for each feature
feature_importance_entry = defaultdict(float)
for feature_name, value_list in feature_importances.items():
  average_importance = sum(value_list) / len(value_list)
  feature_importance_entry[feature_name] = average_importance

# Sort the feature importances in descending order and print
import operator
sorted_feature_importances = sorted(
  feature_importance_entry.items(),
  key=operator.itemgetter(1),
  reverse=True
)

print("\nFeature Importances")
print("-------------------")
print(tabulate(sorted_feature_importances, headers=['Name', 'Importance']))


Feature Importances
-------------------
Name               Importance
---------------  ------------
DepDelay           0.848534
DayOfYear          0.0332143
TailNum_index      0.0279824
DayOfMonth         0.0208377
Origin_index       0.0207811
Route_index        0.0155598
Distance           0.0132175
Dest_index         0.00951608
Carrier_index      0.00496423
CRSDepHourOfDay    0.00380001
CRSArrHourOfDay    0.00159252

In [61]:
#
# Compare this run's feature importances with the previous run's
#

# Load the feature importance log or initialize an empty one
try:
  feature_log_filename = "../models/feature_log.pickle"
  feature_log = pickle.load(open(feature_log_filename, "rb"))
  if not isinstance(feature_log, list):
    feature_log = []
except IOError:
  feature_log = []

# Compute and display the change in score for each feature
try:
  last_feature_log = feature_log[-1]
except (IndexError, TypeError, AttributeError):
  last_feature_log = defaultdict(float)
  for feature_name, importance in feature_importance_entry.items():
    last_feature_log[feature_name] = importance

# Compute the deltas
feature_deltas = {}
for feature_name in feature_importances.keys():
  run_delta = feature_importance_entry[feature_name] - last_feature_log[feature_name]
  feature_deltas[feature_name] = run_delta

# Sort feature deltas, biggest change first
import operator
sorted_feature_deltas = sorted(
  feature_deltas.items(),
  key=operator.itemgetter(1),
  reverse=True
)

# Display sorted feature deltas
print("\nFeature Importance Delta Report")
print("-------------------------------")
print(tabulate(sorted_feature_deltas, headers=["Feature", "Delta"]))

# Append the existing average deltas to the log
feature_log.append(feature_importance_entry)

# Persist the log for next run
pickle.dump(feature_log, open(feature_log_filename, "wb"))


Feature Importance Delta Report
-------------------------------
Feature                 Delta
---------------  ------------
DepDelay          0.0297091
TailNum_index     0.00474146
Dest_index        0.00111106
Distance          0.000726107
CRSDepHourOfDay  -0.00144376
Route_index      -0.00277233
CRSArrHourOfDay  -0.00336425
Carrier_index    -0.00344181
Origin_index     -0.0037315
DayOfYear        -0.0057618
DayOfMonth       -0.00794093

Interpreting Our Results

This impacts the score in a positive way (for me), but not in a significant way: an improvement of 0.00031884 in accuracy. However, at this point all our features are contributing significantly to the model’s prediction quality, which is where we want to be:

Feature Importances
-------------------
Name               Importance
---------------  ------------
DepDelay           0.775767
TailNum_index      0.0541045
Route_index        0.0401366
Origin_index       0.0290746
DayOfMonth         0.0287668
DayOfYear          0.0268546
Distance           0.0165887
Dest_index         0.0126576
CRSDepHourOfDay    0.00680116
Carrier_index      0.00542581
CRSArrHourOfDay    0.00382289

Remember: when it comes to predictive models, simpler is better. If a feature doesn’t sizably influence prediction accuracy, remove it. The model’s quality will increase, it will perform faster in production, and you will have an easier time understanding the impact of additional features on the model. A simpler model will be less susceptible to bias.

Incorporating Flight Time

One thing we haven’t considered yet is the flight time. We should be able to subtract the takeoff time from the landing time and get the duration of the flight. Since distance is a top-3 feature, and the hour of day matters, it seems like flight time might eke out a bit more prediction quality. Let’s try!

In the book, we computed this field by converting our ISO datetimes into unix times (seconds since 1970) and subtracted the takeoff time from the landing time. This gave us flight time in seconds. However, it turns out there is a field called AirTime which is the minutes in the air. Lets try adding AirTime and see how things work.

Availability of Feature at Runtime

It is easy to make the mistake of incorporating a feature into your model that you can't reliabily retrieve in realtime to make a prediction. Remember - if you can't get it in realtime when you make a prediction, you can't incorporate it into the model... not if your model is going to work in the real world. This is a big difference between data science in Kaffle competitions and data science in practice.

AirTime would be available at runtime, if we compute an expected AirTime by subtracting the scheduled arrival time CRSArrTime from the schedule departure CRSDepTime, after converting both to unix time. We would then have to divide by 60 to get the expected AirTime. Would that work alright? One must reason about features... and in this case it seems like a valid feature, and a similar feature, Distance has a 1.65% relative feature importance (for me).

Check out ch09/extract_features_with_flight_time.py, which we copied from ch09/extract_features_with_airplanes.py. We only need to change one line, our selectExpr, to add the date math for our FlightTime field:

Lets add AirTime to our model and see whether it helps or not. We'll have to go back and load the AirTime column, then proceed with our experiment. We're going to put all the code in one block this time, to give you a code block you can use to add your own features to the model later.

We simply add the AirTime column to our initial SELECT statement to Spark SQL. Note that sometimes AirTime is not present, resulting in null values. To address this, we will use Spark SQL's COALESCE function to impute (fill in) a 0.0 value in place of null. This way our model can use this missing data as evidence of timeliness, along with the values which are present.


In [62]:
# Load the on-time parquet file
input_path = "../data/january_performance.parquet"
on_time_dataframe = spark.read.parquet(input_path)
on_time_dataframe.registerTempTable("on_time_performance")

# Filtering on FlightDate > 2/1 and a 10% sample are for a training course. 
# Feel free to use all the data if you have the RAM and the time!
features = spark.sql("""
SELECT
    FlightNum,
    FlightDate,
    DayOfWeek,
    DayofMonth AS DayOfMonth,
    CONCAT(Month, '-',  DayofMonth) AS DayOfYear,
    Carrier,
    Origin,
    Dest,
    Distance,
    DepDelay,
    ArrDelay,
    CRSDepTime,
    CRSArrTime,
    CONCAT(Origin, '-', Dest) AS Route,
    TailNum,
    COALESCE(AirTime, 0.0) AS AirTime
FROM on_time_performance
WHERE FlightDate < '2015-02-01'
""")
features = features.sample(False, 0.1) 

# Filter nulls, they can't help us
features = features.filter(
    (features.ArrDelay.isNotNull())
    &
    (features.DepDelay.isNotNull())
)
features.show(10)

#
# Add the hour of day of scheduled arrival/departure
#
from pyspark.sql.functions import hour
features_with_hour = features.withColumn(
  "CRSDepHourOfDay",
  hour(features.CRSDepTime)
)
features_with_hour = features_with_hour.withColumn(
  "CRSArrHourOfDay",
  hour(features.CRSArrTime)
)
features_with_hour.select("CRSDepTime", "CRSDepHourOfDay", "CRSArrTime", "CRSArrHourOfDay").show(5)

# We need to turn timestamps into timestamps, and not strings or numbers
def convert_hours(hours_minutes):
  hours = hours_minutes[:-2]
  minutes = hours_minutes[-2:]
  
  if hours == '24':
    hours = '23'
    minutes = '59'
  
  time_string = "{}:{}:00Z".format(hours, minutes)
  return time_string

def compose_datetime(iso_date, time_string):
  return "{} {}".format(iso_date, time_string)

def create_iso_string(iso_date, hours_minutes):
  time_string = convert_hours(hours_minutes)
  full_datetime = compose_datetime(iso_date, time_string)
  return full_datetime

def create_datetime(iso_string):
  return iso8601.parse_date(iso_string)

def convert_datetime(iso_date, hours_minutes):
  iso_string = create_iso_string(iso_date, hours_minutes)
  dt = create_datetime(iso_string)
  return dt

def day_of_year(iso_date_string):
  dt = iso8601.parse_date(iso_date_string)
  doy = dt.timetuple().tm_yday
  return doy

def alter_feature_datetimes(row):
  
  flight_date = iso8601.parse_date(row['FlightDate'])
  scheduled_dep_time = convert_datetime(row['FlightDate'], row['CRSDepTime'])
  scheduled_arr_time = convert_datetime(row['FlightDate'], row['CRSArrTime'])
  
  # Handle overnight flights
  if scheduled_arr_time < scheduled_dep_time:
    scheduled_arr_time += datetime.timedelta(days=1)
  
  doy = day_of_year(row['FlightDate'])
  
  return {
    'FlightNum': row['FlightNum'],
    'FlightDate': flight_date,
    'DayOfWeek': int(row['DayOfWeek']),
    'DayOfMonth': int(row['DayOfMonth']),
    'DayOfYear': doy,
    'Carrier': row['Carrier'],
    'Origin': row['Origin'],
    'Dest': row['Dest'],
    'Distance': row['Distance'],
    'DepDelay': row['DepDelay'],
    'ArrDelay': row['ArrDelay'],
    'CRSDepTime': scheduled_dep_time,
    'CRSArrTime': scheduled_arr_time,
    'Route': row['Route'],
    'TailNum': row['TailNum'],
    'AirTime': row['AirTime']
  }

timestamp_features = features_with_hour.rdd.map(alter_feature_datetimes)
timestamp_df = timestamp_features.toDF()

# Explicitly sort the data and keep it sorted throughout. Leave nothing to chance.
sorted_features = timestamp_df.sort(
  timestamp_df.DayOfYear,
  timestamp_df.Carrier,
  timestamp_df.Origin,
  timestamp_df.Dest,
  timestamp_df.FlightNum,
  timestamp_df.CRSDepTime,
  timestamp_df.CRSArrTime,
)

sorted_features.show(10)

# Store as a single json file
output_path = "../data/simple_flight_delay_features_flight_times.json"
sorted_features.repartition(1).write.mode("overwrite").json(output_path)

print("Features with AirTime prepared!")


+---------+----------+---------+----------+---------+-------+------+----+--------+--------+--------+----------+----------+-------+-------+-------+
|FlightNum|FlightDate|DayOfWeek|DayOfMonth|DayOfYear|Carrier|Origin|Dest|Distance|DepDelay|ArrDelay|CRSDepTime|CRSArrTime|  Route|TailNum|AirTime|
+---------+----------+---------+----------+---------+-------+------+----+--------+--------+--------+----------+----------+-------+-------+-------+
|     2349|2015-01-01|        4|         1|      1-1|     AA|   ORD| DFW|   802.0|     0.0|    26.0|      1845|      2115|ORD-DFW| N002AA|  129.0|
|     1422|2015-01-01|        4|         1|      1-1|     AA|   DFW| HDN|   769.0|    78.0|    78.0|      0800|      0925|DFW-HDN| N003AA|  111.0|
|      356|2015-01-01|        4|         1|      1-1|     AA|   ATL| DFW|   731.0|    -5.0|     1.0|      1640|      1805|ATL-DFW| N006AA|  122.0|
|     2360|2015-01-01|        4|         1|      1-1|     AA|   GUC| DFW|   678.0|   199.0|   222.0|      1450|      1750|GUC-DFW| N012AA|   95.0|
|      903|2015-01-01|        4|         1|      1-1|     AA|   LAX| EGE|   748.0|    -7.0|    16.0|      0830|      1145|LAX-EGE| N017AA|  112.0|
|     1162|2015-01-01|        4|         1|      1-1|     AA|   DFW| JAC|  1047.0|    45.0|    74.0|      1740|      1935|DFW-JAC| N018AA|  148.0|
|     1137|2015-01-01|        4|         1|      1-1|     AA|   IAD| MIA|   921.0|    -2.0|     7.0|      1721|      2010|IAD-MIA| N022AA|  133.0|
|     1697|2015-01-01|        4|         1|      1-1|     AA|   MIA| STL|  1068.0|   101.0|    97.0|      1400|      1555|MIA-STL| N023AA|  150.0|
|     2330|2015-01-01|        4|         1|      1-1|     AA|   MIA| IAD|   921.0|    82.0|    72.0|      2125|      2354|MIA-IAD| N023AA|  118.0|
|     2321|2015-01-01|        4|         1|      1-1|     AA|   ORD| DFW|   802.0|     3.0|    19.0|      1220|      1455|ORD-DFW| N200AA|  135.0|
+---------+----------+---------+----------+---------+-------+------+----+--------+--------+--------+----------+----------+-------+-------+-------+
only showing top 10 rows

+----------+---------------+----------+---------------+
|CRSDepTime|CRSDepHourOfDay|CRSArrTime|CRSArrHourOfDay|
+----------+---------------+----------+---------------+
|      1845|              0|      2115|              0|
|      0800|              0|      0925|              0|
|      1640|              0|      1805|              0|
|      1450|              0|      1750|              0|
|      0830|              0|      1145|              0|
+----------+---------------+----------+---------------+
only showing top 5 rows

+-------+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+-------------------+---------+------+-------+-------+
|AirTime|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|         FlightDate|FlightNum|Origin|  Route|TailNum|
+-------+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+-------------------+---------+------+-------+-------+
|  122.0|     1.0|2015-01-01 18:05:00|2015-01-01 16:40:00|     AA|         1|        4|        1|    -5.0| DFW|   731.0|2015-01-01 00:00:00|      356|   ATL|ATL-DFW| N006AA|
|   86.0|    -3.0|2015-01-01 23:02:00|2015-01-01 21:10:00|     AA|         1|        4|        1|    -7.0| MIA|   594.0|2015-01-01 00:00:00|      349|   ATL|ATL-MIA| N3KVAA|
|   36.0|   -16.0|2015-01-01 13:05:00|2015-01-01 12:00:00|     AA|         1|        4|        1|    -7.0| DFW|   190.0|2015-01-01 00:00:00|     2230|   AUS|AUS-DFW| N574AA|
|  164.0|    20.0|2015-01-01 10:06:00|2015-01-01 06:40:00|     AA|         1|        4|        1|    -6.0| MIA|  1194.0|2015-01-01 00:00:00|     2476|   BDL|BDL-MIA| N3LEAA|
|  112.0|    26.0|2015-01-01 19:20:00|2015-01-01 17:10:00|     AA|         1|        4|        1|    -4.0| DFW|   597.0|2015-01-01 00:00:00|     1224|   BHM|BHM-DFW| N4WMAA|
|  255.0|    -8.0|2015-01-01 08:55:00|2015-01-01 06:10:00|     AA|         1|        4|        1|    -3.0| LAX|  1797.0|2015-01-01 00:00:00|     1361|   BNA|BNA-LAX| N3KYAA|
|  264.0|    10.0|2015-01-01 19:40:00|2015-01-01 16:05:00|     AA|         1|        4|        1|    -6.0| DFW|  1562.0|2015-01-01 00:00:00|     1448|   BOS|BOS-DFW| N3EGAA|
|  381.0|     7.0|2015-01-01 12:35:00|2015-01-01 09:00:00|     AA|         1|        4|        1|    -3.0| LAX|  2611.0|2015-01-01 00:00:00|       25|   BOS|BOS-LAX| N3AVAA|
|  131.0|    -9.0|2015-01-01 20:12:00|2015-01-01 17:26:00|     AA|         1|        4|        1|   -11.0| MIA|   946.0|2015-01-01 00:00:00|      357|   BWI|BWI-MIA| N3HAAA|
|  152.0|   -14.0|2015-01-01 09:10:00|2015-01-01 07:05:00|     AA|         1|        4|        1|    -7.0| DFW|   926.0|2015-01-01 00:00:00|     2434|   CMH|CMH-DFW| N575AA|
+-------+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+-------------------+---------+------+-------+-------+
only showing top 10 rows

Features with AirTime prepared!

Training the Model

Note that we still store our data to disk and then load it explicitly. This ensures its provenance and formatting are exactly as we expect, and have not been inferred... inference that could change later and throw off our model. Model imput must be as accurate and precise as possible!

Here we just add AirTime to the columns we load, then we add it to the list of numeric_columns. The rest is the same as before.


In [63]:
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DateType, TimestampType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.functions import udf

schema = StructType([
  StructField("ArrDelay", DoubleType(), True),
  StructField("CRSArrTime", TimestampType(), True),
  StructField("CRSDepTime", TimestampType(), True),
  StructField("Carrier", StringType(), True),
  StructField("DayOfMonth", IntegerType(), True),
  StructField("DayOfWeek", IntegerType(), True),
  StructField("DayOfYear", IntegerType(), True),
  StructField("DepDelay", DoubleType(), True),
  StructField("Dest", StringType(), True),
  StructField("Distance", DoubleType(), True),
  StructField("FlightDate", DateType(), True),
  StructField("FlightNum", StringType(), True),
  StructField("Origin", StringType(), True),
  StructField("Route", StringType(), True),
  StructField("TailNum", StringType(), True),
  StructField("AirTime", FloatType(), True)
])



input_path = "../data/simple_flight_delay_features_flight_times.json"
features = spark.read.json(input_path, schema=schema)
features.show(5)

#
# Add the hour of day of scheduled arrival/departure
#
from pyspark.sql.functions import hour
features_with_hour = features.withColumn(
  "CRSDepHourOfDay",
  hour(features.CRSDepTime)
)
features_with_hour = features_with_hour.withColumn(
  "CRSArrHourOfDay",
  hour(features.CRSArrTime)
)
features_with_hour.select("CRSDepTime", "CRSDepHourOfDay", "CRSArrTime", "CRSArrHourOfDay").show(5)

#
# Check for nulls in features before using Spark ML
#
null_counts = [(column, features_with_hour.where(features_with_hour[column].isNull()).count()) for column in features_with_hour.columns]
cols_with_nulls = filter(lambda x: x[1] > 0, null_counts)
print("\nNull Value Report")
print("-----------------")
print(tabulate(cols_with_nulls, headers=["Column", "Nulls"]))

#
# Use pysmark.ml.feature.Bucketizer to bucketize ArrDelay into on-time, slightly late, very late (0, 1, 2)
#
from pyspark.ml.feature import Bucketizer

# Setup the Bucketizer
splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]
arrival_bucketizer = Bucketizer(
  splits=splits,
  inputCol="ArrDelay",
  outputCol="ArrDelayBucket"
)

# Save the model
arrival_bucketizer_path = "../models/arrival_bucketizer_2.0.bin"
arrival_bucketizer.write().overwrite().save(arrival_bucketizer_path)

# Apply the model
ml_bucketized_features = arrival_bucketizer.transform(features_with_hour)
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show(5)

ml_bucketized_features.show(5)

#
# Extract features tools in with pyspark.ml.feature
#
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Turn category fields into indexes
string_columns = ["Carrier", "Origin", "Dest", "Route", "TailNum"]
for column in string_columns:
  string_indexer = StringIndexer(
    inputCol=column,
    outputCol=column + "_index"
  )
  
  string_indexer_model = string_indexer.fit(ml_bucketized_features)
  ml_bucketized_features = string_indexer_model.transform(ml_bucketized_features)
  
  # Save the pipeline model
  string_indexer_output_path = "../models/string_indexer_model_4.0.{}.bin".format(
    column
  )
  string_indexer_model.write().overwrite().save(string_indexer_output_path)

ml_bucketized_features.show(5)

# Combine continuous, numeric fields with indexes of nominal ones
# ...into one feature vector
numeric_columns = [
  "DepDelay",
  "Distance",
  "DayOfYear",
  "DayOfMonth",
  "CRSDepHourOfDay",
  "CRSArrHourOfDay",
  "AirTime"
]
index_columns = [column + "_index" for column in string_columns]
input_columns = numeric_columns + index_columns

vector_assembler = VectorAssembler(
  inputCols=input_columns,
  outputCol="Features_vec"
)
final_vectorized_features = vector_assembler.transform(ml_bucketized_features)

# Save the numeric vector assembler
vector_assembler_path = "../models/numeric_vector_assembler_5.0.bin"
vector_assembler.write().overwrite().save(vector_assembler_path)

# Drop the index columns
for column in index_columns:
  final_vectorized_features = final_vectorized_features.drop(column)

# Inspect the finalized features
final_vectorized_features.show(5)

#
# Cross validate, train and evaluate classifier: loop 5 times for 4 metrics
#

from collections import defaultdict
scores = defaultdict(list)
feature_importances = defaultdict(list)
metric_names = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
split_count = 3

for i in range(1, split_count + 1):
  print("\nRun {} out of {} of test/train splits in cross validation...".format(
    i,
    split_count,
  )
  )
  
  # Test/train split
  training_data, test_data = final_vectorized_features.randomSplit([0.8, 0.2])
  
  # Instantiate and fit random forest classifier on all the data
  from pyspark.ml.classification import RandomForestClassifier
  rfc = RandomForestClassifier(
    featuresCol="Features_vec",
    labelCol="ArrDelayBucket",
    predictionCol="Prediction",
    maxBins=4896,
    maxMemoryInMB=1024
  )
  model = rfc.fit(training_data)
  
  # Save the new model over the old one
  model_output_path = "../models/spark_random_forest_classifier.flight_delays.baseline.bin"
  model.write().overwrite().save(model_output_path)
  
  # Evaluate model using test data
  predictions = model.transform(test_data)
  
  # Evaluate this split's results for each metric
  from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  for metric_name in metric_names:
    evaluator = MulticlassClassificationEvaluator(
      labelCol="ArrDelayBucket",
      predictionCol="Prediction",
      metricName=metric_name
    )
    score = evaluator.evaluate(predictions)
    
    scores[metric_name].append(score)
    print("{} = {}".format(metric_name, score))
  
  #
  # Collect feature importances
  #
  feature_names = vector_assembler.getInputCols()
  feature_importance_list = model.featureImportances
  for feature_name, feature_importance in zip(feature_names, feature_importance_list):
    feature_importances[feature_name].append(feature_importance)


+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+-------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|TailNum|AirTime|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+-------+
|     1.0|2015-01-01 18:05:00|2015-01-01 16:40:00|     AA|         1|        4|        1|    -5.0| DFW|   731.0|2015-01-01|      356|   ATL|ATL-DFW| N006AA|  122.0|
|    -3.0|2015-01-01 23:02:00|2015-01-01 21:10:00|     AA|         1|        4|        1|    -7.0| MIA|   594.0|2015-01-01|      349|   ATL|ATL-MIA| N3KVAA|   86.0|
|   -16.0|2015-01-01 13:05:00|2015-01-01 12:00:00|     AA|         1|        4|        1|    -7.0| DFW|   190.0|2015-01-01|     2230|   AUS|AUS-DFW| N574AA|   36.0|
|    20.0|2015-01-01 10:06:00|2015-01-01 06:40:00|     AA|         1|        4|        1|    -6.0| MIA|  1194.0|2015-01-01|     2476|   BDL|BDL-MIA| N3LEAA|  164.0|
|    26.0|2015-01-01 19:20:00|2015-01-01 17:10:00|     AA|         1|        4|        1|    -4.0| DFW|   597.0|2015-01-01|     1224|   BHM|BHM-DFW| N4WMAA|  112.0|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+-------+
only showing top 5 rows

+-------------------+---------------+-------------------+---------------+
|         CRSDepTime|CRSDepHourOfDay|         CRSArrTime|CRSArrHourOfDay|
+-------------------+---------------+-------------------+---------------+
|2015-01-01 16:40:00|             16|2015-01-01 18:05:00|             18|
|2015-01-01 21:10:00|             21|2015-01-01 23:02:00|             23|
|2015-01-01 12:00:00|             12|2015-01-01 13:05:00|             13|
|2015-01-01 06:40:00|              6|2015-01-01 10:06:00|             10|
|2015-01-01 17:10:00|             17|2015-01-01 19:20:00|             19|
+-------------------+---------------+-------------------+---------------+
only showing top 5 rows


Null Value Report
-----------------
Column    Nulls
--------  -------
+--------+--------------+
|ArrDelay|ArrDelayBucket|
+--------+--------------+
|     1.0|           2.0|
|    -3.0|           1.0|
|   -16.0|           0.0|
|    20.0|           2.0|
|    26.0|           2.0|
+--------+--------------+
only showing top 5 rows

+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+-------+---------------+---------------+--------------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|TailNum|AirTime|CRSDepHourOfDay|CRSArrHourOfDay|ArrDelayBucket|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+-------+---------------+---------------+--------------+
|     1.0|2015-01-01 18:05:00|2015-01-01 16:40:00|     AA|         1|        4|        1|    -5.0| DFW|   731.0|2015-01-01|      356|   ATL|ATL-DFW| N006AA|  122.0|             16|             18|           2.0|
|    -3.0|2015-01-01 23:02:00|2015-01-01 21:10:00|     AA|         1|        4|        1|    -7.0| MIA|   594.0|2015-01-01|      349|   ATL|ATL-MIA| N3KVAA|   86.0|             21|             23|           1.0|
|   -16.0|2015-01-01 13:05:00|2015-01-01 12:00:00|     AA|         1|        4|        1|    -7.0| DFW|   190.0|2015-01-01|     2230|   AUS|AUS-DFW| N574AA|   36.0|             12|             13|           0.0|
|    20.0|2015-01-01 10:06:00|2015-01-01 06:40:00|     AA|         1|        4|        1|    -6.0| MIA|  1194.0|2015-01-01|     2476|   BDL|BDL-MIA| N3LEAA|  164.0|              6|             10|           2.0|
|    26.0|2015-01-01 19:20:00|2015-01-01 17:10:00|     AA|         1|        4|        1|    -4.0| DFW|   597.0|2015-01-01|     1224|   BHM|BHM-DFW| N4WMAA|  112.0|             17|             19|           2.0|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+-------+---------------+---------------+--------------+
only showing top 5 rows

+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+-------+---------------+---------------+--------------+-------------+------------+----------+-----------+-------------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|TailNum|AirTime|CRSDepHourOfDay|CRSArrHourOfDay|ArrDelayBucket|Carrier_index|Origin_index|Dest_index|Route_index|TailNum_index|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+-------+---------------+---------------+--------------+-------------+------------+----------+-----------+-------------+
|     1.0|2015-01-01 18:05:00|2015-01-01 16:40:00|     AA|         1|        4|        1|    -5.0| DFW|   731.0|2015-01-01|      356|   ATL|ATL-DFW| N006AA|  122.0|             16|             18|           2.0|          4.0|         0.0|       1.0|       14.0|       2720.0|
|    -3.0|2015-01-01 23:02:00|2015-01-01 21:10:00|     AA|         1|        4|        1|    -7.0| MIA|   594.0|2015-01-01|      349|   ATL|ATL-MIA| N3KVAA|   86.0|             21|             23|           1.0|          4.0|         0.0|      21.0|      119.0|       1075.0|
|   -16.0|2015-01-01 13:05:00|2015-01-01 12:00:00|     AA|         1|        4|        1|    -7.0| DFW|   190.0|2015-01-01|     2230|   AUS|AUS-DFW| N574AA|   36.0|             12|             13|           0.0|          4.0|        33.0|       1.0|      100.0|       2012.0|
|    20.0|2015-01-01 10:06:00|2015-01-01 06:40:00|     AA|         1|        4|        1|    -6.0| MIA|  1194.0|2015-01-01|     2476|   BDL|BDL-MIA| N3LEAA|  164.0|              6|             10|           2.0|          4.0|        55.0|      21.0|     3462.0|       2060.0|
|    26.0|2015-01-01 19:20:00|2015-01-01 17:10:00|     AA|         1|        4|        1|    -4.0| DFW|   597.0|2015-01-01|     1224|   BHM|BHM-DFW| N4WMAA|  112.0|             17|             19|           2.0|          4.0|        67.0|       1.0|     2846.0|       3703.0|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+-------+---------------+---------------+--------------+-------------+------------+----------+-----------+-------------+
only showing top 5 rows

+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+-------+---------------+---------------+--------------+--------------------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|TailNum|AirTime|CRSDepHourOfDay|CRSArrHourOfDay|ArrDelayBucket|        Features_vec|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+-------+---------------+---------------+--------------+--------------------+
|     1.0|2015-01-01 18:05:00|2015-01-01 16:40:00|     AA|         1|        4|        1|    -5.0| DFW|   731.0|2015-01-01|      356|   ATL|ATL-DFW| N006AA|  122.0|             16|             18|           2.0|[-5.0,731.0,1.0,1...|
|    -3.0|2015-01-01 23:02:00|2015-01-01 21:10:00|     AA|         1|        4|        1|    -7.0| MIA|   594.0|2015-01-01|      349|   ATL|ATL-MIA| N3KVAA|   86.0|             21|             23|           1.0|[-7.0,594.0,1.0,1...|
|   -16.0|2015-01-01 13:05:00|2015-01-01 12:00:00|     AA|         1|        4|        1|    -7.0| DFW|   190.0|2015-01-01|     2230|   AUS|AUS-DFW| N574AA|   36.0|             12|             13|           0.0|[-7.0,190.0,1.0,1...|
|    20.0|2015-01-01 10:06:00|2015-01-01 06:40:00|     AA|         1|        4|        1|    -6.0| MIA|  1194.0|2015-01-01|     2476|   BDL|BDL-MIA| N3LEAA|  164.0|              6|             10|           2.0|[-6.0,1194.0,1.0,...|
|    26.0|2015-01-01 19:20:00|2015-01-01 17:10:00|     AA|         1|        4|        1|    -4.0| DFW|   597.0|2015-01-01|     1224|   BHM|BHM-DFW| N4WMAA|  112.0|             17|             19|           2.0|[-4.0,597.0,1.0,1...|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+-------+-------+---------------+---------------+--------------+--------------------+
only showing top 5 rows


Run 1 out of 3 of test/train splits in cross validation...
accuracy = 0.5834063047285464
weightedPrecision = 0.5168915983398141
weightedRecall = 0.5834063047285464
f1 = 0.5203090213059611

Run 2 out of 3 of test/train splits in cross validation...
accuracy = 0.5826564215148189
weightedPrecision = 0.5163211636392262
weightedRecall = 0.5826564215148189
f1 = 0.5196737570088943

Run 3 out of 3 of test/train splits in cross validation...
accuracy = 0.5780996649010918
weightedPrecision = 0.5100745548772154
weightedRecall = 0.5780996649010918
f1 = 0.5145229403998973

Calculating AirTime Performance

Then we calculate performance again... this time we do everything in one block, again so you can easily copy/paste this below to add your own new features!


In [64]:
#
# Evaluate average and STD of each metric and print a table
#
import numpy as np
score_averages = defaultdict(float)

# Compute the table data
average_stds = []  # ha
for metric_name in metric_names:
  metric_scores = scores[metric_name]
  
  average_accuracy = sum(metric_scores) / len(metric_scores)
  score_averages[metric_name] = average_accuracy
  
  std_accuracy = np.std(metric_scores)
  
  average_stds.append((metric_name, average_accuracy, std_accuracy))

# Print the table
print("\nExperiment Log")
print("--------------")
print(tabulate(average_stds, headers=["Metric", "Average", "STD"]))

#
# Persist the score to a sccore log that exists between runs
#
import pickle

# Load the score log or initialize an empty one
try:
  score_log_filename = "../models/score_log.pickle"
  score_log = pickle.load(open(score_log_filename, "rb"))
  if not isinstance(score_log, list):
    score_log = []
except IOError:
  score_log = []

# Compute the existing score log entry
score_log_entry = {
  metric_name: score_averages[metric_name] for metric_name in metric_names
}

# Compute and display the change in score for each metric
try:
  last_log = score_log[-1]
except (IndexError, TypeError, AttributeError):
  last_log = score_log_entry

experiment_report = []
for metric_name in metric_names:
  run_delta = score_log_entry[metric_name] - last_log[metric_name]
  experiment_report.append((metric_name, run_delta))

print("\nExperiment Report")
print("-----------------")
print(tabulate(experiment_report, headers=["Metric", "Score"]))

# Append the existing average scores to the log
score_log.append(score_log_entry)

# Persist the log for next run
pickle.dump(score_log, open(score_log_filename, "wb"))

#
# Analyze and report feature importance changes
#

# Compute averages for each feature
feature_importance_entry = defaultdict(float)
for feature_name, value_list in feature_importances.items():
  average_importance = sum(value_list) / len(value_list)
  feature_importance_entry[feature_name] = average_importance

# Sort the feature importances in descending order and print
import operator
sorted_feature_importances = sorted(
  feature_importance_entry.items(),
  key=operator.itemgetter(1),
  reverse=True
)

print("\nFeature Importances")
print("-------------------")
print(tabulate(sorted_feature_importances, headers=['Name', 'Importance']))

#
# Compare this run's feature importances with the previous run's
#

# Load the feature importance log or initialize an empty one
try:
  feature_log_filename = "../models/feature_log.pickle"
  feature_log = pickle.load(open(feature_log_filename, "rb"))
  if not isinstance(feature_log, list):
    feature_log = []
except IOError:
  feature_log = []

# Compute and display the change in score for each feature
try:
  last_feature_log = feature_log[-1]
except (IndexError, TypeError, AttributeError):
  last_feature_log = defaultdict(float)
  for feature_name, importance in feature_importance_entry.items():
    last_feature_log[feature_name] = importance

# Compute the deltas
feature_deltas = {}
for feature_name in feature_importances.keys():
  run_delta = feature_importance_entry[feature_name] - last_feature_log[feature_name]
  feature_deltas[feature_name] = run_delta

# Sort feature deltas, biggest change first
import operator
sorted_feature_deltas = sorted(
  feature_deltas.items(),
  key=operator.itemgetter(1),
  reverse=True
)

# Display sorted feature deltas
print("\nFeature Importance Delta Report")
print("-------------------------------")
print(tabulate(sorted_feature_deltas, headers=["Feature", "Delta"]))

# Append the existing average deltas to the log
feature_log.append(feature_importance_entry)

# Persist the log for next run
pickle.dump(feature_log, open(feature_log_filename, "wb"))


Experiment Log
--------------
Metric               Average         STD
-----------------  ---------  ----------
accuracy            0.581387  0.00234489
weightedPrecision   0.514429  0.00308793
weightedRecall      0.581387  0.00234489
f1                  0.518169  0.00259086

Experiment Report
-----------------
Metric                    Score
-----------------  ------------
accuracy           -0.00272956
weightedPrecision  -0.102292
weightedRecall     -0.00272956
f1                  7.69317e-05

Feature Importances
-------------------
Name               Importance
---------------  ------------
DepDelay           0.843461
DayOfYear          0.0346129
TailNum_index      0.0273644
Route_index        0.0235947
DayOfMonth         0.0233104
Origin_index       0.0126936
Distance           0.00883134
Dest_index         0.00851761
AirTime            0.00534284
CRSArrHourOfDay    0.00514335
Carrier_index      0.00464775
CRSDepHourOfDay    0.00248028

Feature Importance Delta Report
-------------------------------
Feature                 Delta
---------------  ------------
Route_index       0.00803488
AirTime           0.00534284
CRSArrHourOfDay   0.00355083
DayOfMonth        0.00247274
DayOfYear         0.00139867
Carrier_index    -0.000316482
TailNum_index    -0.000617967
Dest_index       -0.00099847
CRSDepHourOfDay  -0.00131973
Distance         -0.00438617
DepDelay         -0.00507362
Origin_index     -0.00808751

Interpreting AirTime Performance

This output suggests a significant improvement in performance! weightedPrecision is up by 0.14, and the FlightTime contributes about half a percent to the feature importance. Also note that the feature importance of FlightTime comes at the expense of Distance and DepDelay, which seems expected: Distance is conceptually similar to FlightTime, and DepDelay is the most important feature. Taken together, the performance and feature importance metrics indicate that FlightTime is a worthwhile improvement to our model:

At this point, once again it seems that we’ve exhausted the possibilities of the date/time features (at least, without resorting to more sophisticated time series analysis techniques than I know).

Conclusion

In this chapter we covered how to improve on our model using the data we’ve already collected. We can use this approach in combination with our ability to deploy applications to continuously improve our predictive systems.

Exercises

  1. Look at the list of fields in the complete dataset at https://www.transtats.bts.gov/Fields.asp. Add another field to the model. It can be directly added or it can be derived from multiple values.

  2. Find and add an external feature to this model. Find another dataset which can be joined into this feature set to provide new features.


In [ ]: