Data Science with Hadoop - Predicting airline delays - part 2: Spark and ML-Lib

Introduction

In this second part of the supplement to our second data science blog post, we continue to demonstrate how to build a predictive model with Hadoop; this time we'll use Apache Spark and ML-Lib.

In the context of our demo, we will show how to use Apache Spark via its Scala API to generate our feature matrix, and how to use ML-Lib (Spark's machine learning library) to build and evaluate classification models.

Recall from part 1 that we are constructing a predictive model for flight delays. Our source dataset resides here, and includes details about flights in the US from the years 1987-2008. We have also enriched the data with weather information: daily minimum and maximum temperatures, wind speed, snow conditions and precipitation.

We will build a supervised learning model to predict flight delays for flights leaving O'Hare International Airport (ORD). We will use the year 2007 data to build the model, and test its validity using data from 2008.

Pre-processing with Hadoop and Spark

Apache Spark's basic data abstraction is that of an RDD (resilient distributed dataset), which is a fault-tolerant collection of elements that can be operated on in parallel across your Hadoop cluster.

Spark's API (available in Scala, Python or Java) supports a variety of transformations such as map(), flatMap(), filter(), join() and others for creating and manipulating RDDs. For a full description of the API, please see the Spark programming guide.
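To make this concrete, here is a small illustrative sketch (not part of the notebook's flow) of how a few of these transformations compose, assuming the notebook's SparkContext sc and the flight-delay CSV layout used below, where column 16 holds the origin airport:

import org.apache.spark.SparkContext._   // implicits for pair-RDD operations such as reduceByKey

val lines  = sc.textFile("airline/delay/2007.csv")     // RDD[String], one element per CSV line
val fields = lines.map(line => line.split(","))        // RDD[Array[String]]
val ord    = fields.filter(f => f(16) == "ORD")        // keep flights originating at ORD
val perDay = ord.map(f => ((f(0), f(1), f(2)), 1))     // key by (year, month, day)
                .reduceByKey(_ + _)                    // count ORD departures per day
perDay.take(3).foreach(println)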

Recall from part 1 that in our first iteration we generated the following features for each flight:

  • month: winter months should have more delays than summer months
  • day of month: this is likely not a very predictive variable, but let's keep it in anyway
  • day of week: weekend vs. weekday
  • hour of the day: later hours tend to have more delays
  • carrier: we might expect some carriers to be more prone to delays than others
  • destination airport: we expect some airports to be more prone to delays than others
  • distance: interesting to see if this variable is a good predictor of delay

We will use Spark RDDs to perform the same pre-processing, transforming the raw flight delay dataset into two feature matrices: data_2007 (our training set) and data_2008 (our test set).

The case class DelayRec, which encapsulates a flight delay record, represents the feature vector, and its methods do most of the heavy lifting:

  1. to_date() is a helper method that converts a year/month/day into a date string.
  2. gen_features takes the record's fields and generates a key/value tuple, where the key is the date string (the output of to_date) and the value is the feature vector. We don't use the key in this iteration, but we will use it in the second iteration to join with the weather data.
  3. get_hour() extracts the 2-digit hour portion of the departure time.
  4. days_from_nearest_holiday() computes the minimum distance (in days) of the provided year/month/day from any holiday in the list holidays.

With DelayRec in place, our processing takes on the following steps (in the function prepFlightDelays):

  1. We read the raw input file with Spark's SparkContext.textFile method, resulting in an RDD of lines.
  2. Each line is parsed with CSVReader into fields and populated into a DelayRec object.
  3. We then apply a sequence of RDD transformations to the input RDD to keep only rows corresponding to flights that were not cancelled and that originated at ORD.

Finally, we use the gen_features method to generate the final feature vector per row, as an array of doubles.


In [1]:
import org.apache.spark.rdd._
import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader

import java.io._
import org.joda.time._
import org.joda.time.format._

case class DelayRec(year: String,
                    month: String,
                    dayOfMonth: String,
                    dayOfWeek: String,
                    crsDepTime: String,
                    depDelay: String,
                    origin: String,
                    distance: String,
                    cancelled: String) {

    // US holidays for 2007 and 2008 (MM/dd/yyyy), used by days_from_nearest_holiday below
    val holidays = List("01/01/2007", "01/15/2007", "02/19/2007", "05/28/2007", "06/07/2007", "07/04/2007",
      "09/03/2007", "10/08/2007" ,"11/11/2007", "11/22/2007", "12/25/2007",
      "01/01/2008", "01/21/2008", "02/18/2008", "05/22/2008", "05/26/2008", "07/04/2008",
      "09/01/2008", "10/13/2008" ,"11/11/2008", "11/27/2008", "12/25/2008")

    // Generate a (date string, feature vector) tuple for this record
    def gen_features: (String, Array[Double]) = {
      val values = Array(
        depDelay.toDouble,
        month.toDouble,
        dayOfMonth.toDouble,
        dayOfWeek.toDouble,
        get_hour(crsDepTime).toDouble,
        distance.toDouble,
        days_from_nearest_holiday(year.toInt, month.toInt, dayOfMonth.toInt)
      )
      new Tuple2(to_date(year.toInt, month.toInt, dayOfMonth.toInt), values)
    }

    // Extract the 2-digit hour from a departure time such as "0730"
    def get_hour(depTime: String) : String = "%04d".format(depTime.toInt).take(2)
    // Format year/month/day as a "yyyyMMdd" date string
    def to_date(year: Int, month: Int, day: Int) = "%04d%02d%02d".format(year, month, day)

    def days_from_nearest_holiday(year:Int, month:Int, day:Int): Int = {
      val sampleDate = new DateTime(year, month, day, 0, 0)

      holidays.foldLeft(3000) { (r, c) =>
        val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)
        val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)
        math.min(r, distance)
      }
    }
  }

// Pre-process a raw flight-delay file into an RDD of DelayRec objects
def prepFlightDelays(infile: String): RDD[DelayRec] = {
    val data = sc.textFile(infile)

    data.map { line =>
      // parse each line as a single-record CSV
      val reader = new CSVReader(new StringReader(line))
      reader.readAll().asScala.toList.map(rec => DelayRec(rec(0),rec(1),rec(2),rec(3),rec(5),rec(15),rec(16),rec(18),rec(21)))
    }.map(list => list(0))
    .filter(rec => rec.year != "Year")   // skip the CSV header row
    .filter(rec => rec.cancelled == "0") // keep only non-cancelled flights
    .filter(rec => rec.origin == "ORD")  // keep only flights originating at ORD
}

val data_2007 = prepFlightDelays("airline/delay/2007.csv").map(rec => rec.gen_features._2)
val data_2008 = prepFlightDelays("airline/delay/2008.csv").map(rec => rec.gen_features._2)
data_2007.take(5).map(x => x mkString ",").foreach(println)



-8.0,1.0,25.0,4.0,11.0,719.0,10.0
41.0,1.0,28.0,7.0,15.0,925.0,13.0
45.0,1.0,29.0,1.0,20.0,316.0,14.0
-9.0,1.0,17.0,3.0,19.0,719.0,2.0
180.0,1.0,12.0,5.0,17.0,316.0,3.0

Modeling with Spark and ML-Lib

With the data_2007 dataset (which we'll use for training) and the data_2008 dataset (which we'll use for validation) as RDDs, we now build a predictive model using Spark's ML-Lib machine learning library.

ML-Lib is Spark's scalable machine learning library, which provides a variety of learning algorithms and utilities for classification, regression, clustering, collaborative filtering, dimensionality reduction, and more.

If you compare ML-Lib to Scikit-learn, at the moment ML-Lib lacks a few important algorithms like Random Forest or Gradient Boosted Trees. Having said that, we see a strong pace of innovation from the ML-Lib community and expect more algorithms and other features to be added soon (for example, Random Forest is being actively worked on, and will likely be available in the next release).

To use ML-Lib's machine learning algorithms, we first parse our feature matrices into RDDs of LabeledPoint objects (for both the training and test datasets). LabeledPoint is ML-Lib's abstraction for a feature vector accompanied by a label. We consider flight delays of 15 minutes or more as "delays" and mark them with a label of 1.0, and delays under 15 minutes as "non-delays" with a label of 0.0.

We also use ML-Lib's StandardScaler class to normalize the feature values in both the training and validation sets. This is important because ML-Lib's implementations use Stochastic Gradient Descent, which is known to perform best when feature vectors are normalized.


In [2]:
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.StandardScaler

def parseData(vals: Array[Double]): LabeledPoint = {
  LabeledPoint(if (vals(0)>=15) 1.0 else 0.0, Vectors.dense(vals.drop(1)))
}

// Prepare training set
val parsedTrainData = data_2007.map(parseData)
parsedTrainData.cache
val scaler = new StandardScaler(withMean = true, withStd = true).fit(parsedTrainData.map(x => x.features))
val scaledTrainData = parsedTrainData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
scaledTrainData.cache

// Prepare test/validation set
val parsedTestData = data_2008.map(parseData)
parsedTestData.cache
val scaledTestData = parsedTestData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
scaledTestData.cache

scaledTrainData.take(3).map(x => (x.label, x.features)).foreach(println)


(0.0,[-1.6160463330366548,1.054927299466599,0.03217026353736381,-0.5189244175441321,0.034083933424313526,-0.2801683099466359])
(1.0,[-1.6160463330366548,1.3961052168540333,1.5354307758475527,0.3624320984120952,0.43165511884343954,-0.023273887437334728])
(1.0,[-1.6160463330366548,1.5098311893165113,-1.4710902487728252,1.4641277433573794,-0.7436888225169864,0.06235758673243232])

Note that we use the RDD cache method to ensure that these computed RDDs (parsedTrainData, scaledTrainData, parsedTestData and scaledTestData) are cached in memory by Spark and not re-computed with each iteration of stochastic gradient descent.
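Caching is lazy: a cached RDD is only materialized the first time an action runs over it. As a small illustrative sketch (using only the standard RDD API), we could force materialization up front and release the memory once training is done:

scaledTrainData.cache()
scaledTrainData.count()                   // an action, which materializes the cached partitions
println(scaledTrainData.getStorageLevel)  // reports the current persistence level

// ... after all models are trained ...
// scaledTrainData.unpersist()            // release the cached partitions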

We also define a helper function to compute our evaluation metrics: precision, recall, accuracy and the F1-measure:


In [3]:
// Function to compute evaluation metrics
def eval_metrics(labelsAndPreds: RDD[(Double, Double)]) : Tuple2[Array[Double], Array[Double]] = {
    val tp = labelsAndPreds.filter(r => r._1==1 && r._2==1).count.toDouble
    val tn = labelsAndPreds.filter(r => r._1==0 && r._2==0).count.toDouble
    val fp = labelsAndPreds.filter(r => r._1==1 && r._2==0).count.toDouble
    val fn = labelsAndPreds.filter(r => r._1==0 && r._2==1).count.toDouble

    val precision = tp / (tp+fp)
    val recall = tp / (tp+fn)
    val F_measure = 2*precision*recall / (precision+recall)
    val accuracy = (tp+tn) / (tp+tn+fp+fn)
    new Tuple2(Array(tp, tn, fp, fn), Array(precision, recall, F_measure, accuracy))
}




ML-Lib supports several algorithms for supervised learning, among them Linear Regression, Logistic Regression, Naive Bayes, Decision Tree and Linear SVM. We will use Logistic Regression and SVM, both of which are implemented with Stochastic Gradient Descent (SGD).

Let's see how to build these models with ML-Lib:


In [4]:
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// Build the Logistic Regression model
val model_lr = LogisticRegressionWithSGD.train(scaledTrainData, numIterations=100)

// Predict
val labelsAndPreds_lr = scaledTestData.map { point =>
    val pred = model_lr.predict(point.features)
    (pred, point.label)
}
val m_lr = eval_metrics(labelsAndPreds_lr)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_lr(0), m_lr(1), m_lr(2), m_lr(3)))


precision = 0.37, recall = 0.64, F1 = 0.47, accuracy = 0.59

We have built a model using Logistic Regression with SGD over 100 iterations, then used it to predict flight delays on the validation set and measured its performance: precision, recall, F1 and accuracy.
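ML-Lib also ships a BinaryClassificationMetrics helper for threshold-free evaluation. As a brief sketch (not part of the original flow), it could be applied to the same (prediction, label) pairs; calling model_lr.clearThreshold() beforehand would make predict() return raw probabilities, giving smoother ROC and PR curves than hard 0/1 predictions:

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// BinaryClassificationMetrics expects (score, label) pairs, matching
// the (prediction, label) ordering of labelsAndPreds_lr above
val metrics_lr = new BinaryClassificationMetrics(labelsAndPreds_lr)
println("Area under ROC = %.3f".format(metrics_lr.areaUnderROC))
println("Area under PR  = %.3f".format(metrics_lr.areaUnderPR))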

Next, let's try a Support Vector Machine model:


In [5]:
import org.apache.spark.mllib.classification.SVMWithSGD

// Build the SVM model
val svmAlg = new SVMWithSGD()
svmAlg.optimizer.setNumIterations(100)
                .setRegParam(1.0)
                .setStepSize(1.0)
val model_svm = svmAlg.run(scaledTrainData)

// Predict
val labelsAndPreds_svm = scaledTestData.map { point =>
        val pred = model_svm.predict(point.features)
        (pred, point.label)
}
val m_svm = eval_metrics(labelsAndPreds_svm)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_svm(0), m_svm(1), m_svm(2), m_svm(3)))


precision = 0.37, recall = 0.64, F1 = 0.47, accuracy = 0.59

Since ML-Lib also has a strong Decision Tree implementation, let's use it here:


In [6]:
import org.apache.spark.mllib.tree.DecisionTree

// Build the Decision Tree model
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 10
val maxBins = 100
val model_dt = DecisionTree.trainClassifier(parsedTrainData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)

// Predict
val labelsAndPreds_dt = parsedTestData.map { point =>
    val pred = model_dt.predict(point.features)
    (pred, point.label)
}
val m_dt = eval_metrics(labelsAndPreds_dt)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_dt(0), m_dt(1), m_dt(2), m_dt(3)))


precision = 0.41, recall = 0.24, F1 = 0.31, accuracy = 0.69

Note that overall accuracy is higher with the Decision Tree, but the precision/recall trade-off is different: precision is higher while recall is much lower.
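For the SGD-based linear models, this trade-off can be explored directly by moving the decision threshold. Here is a hedged sketch using the SVM model from above, where the default cut-off on the raw margin is 0.0:

// clearThreshold() makes predict() return the raw margin instead of a 0/1 label
model_svm.clearThreshold()
val scored = scaledTestData.map(p => (model_svm.predict(p.features), p.label))
scored.cache()

// sweep a few candidate thresholds and observe the precision/recall trade-off
for (t <- Seq(-0.5, 0.0, 0.5)) {
  val preds = scored.map { case (score, label) => (if (score > t) 1.0 else 0.0, label) }
  val m = eval_metrics(preds)._2
  println("threshold = %.1f: precision = %.2f, recall = %.2f".format(t, m(0), m(1)))
}
model_svm.setThreshold(0.0)  // restore the default behavior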

Building a richer model with flight delay and weather data using Apache Spark and ML-Lib

Similar to our approach in the first part of this blog post, we now enrich the dataset by integrating weather data into our feature matrix, thus achieving better predictive performance overall for our model.

To accomplish this with Apache Spark, we rewrite our previous preprocess_spark function to extract the same base features from the flight delay dataset, and to join them with five variables from the weather dataset: minimum and maximum temperature for the day, precipitation, snow and wind speed. Let's see how this is accomplished.


In [7]:
import org.apache.spark.SparkContext._
import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader
import java.io._

// function to do a preprocessing step for a given file

def preprocess_spark(delay_file: String, weather_file: String): RDD[Array[Double]] = { 
  // Read flight delay data and generate (date, feature vector) tuples
  val delayRecs = prepFlightDelays(delay_file).map{ rec => 
        val features = rec.gen_features
        (features._1, features._2)
  }

  // Read weather data into RDDs
  val station_inx = 0
  val date_inx = 1
  val metric_inx = 2
  val value_inx = 3

  def filterMap(wdata:RDD[Array[String]], metric:String):RDD[(String,Double)] = {
    wdata.filter(vals => vals(metric_inx) == metric).map(vals => (vals(date_inx), vals(value_inx).toDouble))
  }

  // Keep only weather records from station USW00094846 (Chicago O'Hare)
  val wdata = sc.textFile(weather_file).map(line => line.split(","))
                    .filter(vals => vals(station_inx) == "USW00094846")
  val w_tmin = filterMap(wdata,"TMIN")
  val w_tmax = filterMap(wdata,"TMAX")
  val w_prcp = filterMap(wdata,"PRCP")
  val w_snow = filterMap(wdata,"SNOW")
  val w_awnd = filterMap(wdata,"AWND")

  delayRecs.join(w_tmin).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
           .join(w_tmax).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
           .join(w_prcp).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
           .join(w_snow).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
           .join(w_awnd).map(vals => vals._2._1 ++ Array(vals._2._2))
}

val data_2007 = preprocess_spark("airline/delay/2007.csv", "airline/weather/2007.csv")
val data_2008 = preprocess_spark("airline/delay/2008.csv", "airline/weather/2008.csv")

data_2007.take(5).map(x => x mkString ",").foreach(println)


63.0,2.0,14.0,3.0,15.0,316.0,5.0,-139.0,-61.0,8.0,36.0,53.0
0.0,2.0,14.0,3.0,12.0,925.0,5.0,-139.0,-61.0,8.0,36.0,53.0
105.0,2.0,14.0,3.0,17.0,316.0,5.0,-139.0,-61.0,8.0,36.0,53.0
36.0,2.0,14.0,3.0,19.0,719.0,5.0,-139.0,-61.0,8.0,36.0,53.0
35.0,2.0,14.0,3.0,18.0,719.0,5.0,-139.0,-61.0,8.0,36.0,53.0

Note that the minimum and maximum temperature variables from the weather dataset are measured here in tenths of degrees Celsius; for example, -139.0 translates to -13.9 degrees Celsius.
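Since StandardScaler normalizes every column anyway, this scale has no effect on the models below; but if values in plain degrees Celsius were needed, an illustrative rescaling (the temperature columns sit at indices 7 and 8 of the joined feature array) could look like this:

// Illustrative only: convert TMIN/TMAX (indices 7 and 8 in the joined
// feature array) from tenths of degrees Celsius to degrees Celsius
val data_2007_celsius = data_2007.map { vals =>
  vals.zipWithIndex.map { case (v, i) => if (i == 7 || i == 8) v / 10.0 else v }
}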

Modeling with weather data

We are going to repeat the previous SVM and Decision Tree models with our enriched feature set. As before, we create an RDD of LabeledPoint objects and normalize our dataset with ML-Lib's StandardScaler:


In [8]:
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.StandardScaler

def parseData(vals: Array[Double]): LabeledPoint = {
  LabeledPoint(if (vals(0)>=15) 1.0 else 0.0, Vectors.dense(vals.drop(1)))
}

// Prepare training set
val parsedTrainData = data_2007.map(parseData)
val scaler = new StandardScaler(withMean = true, withStd = true).fit(parsedTrainData.map(x => x.features))
val scaledTrainData = parsedTrainData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
parsedTrainData.cache
scaledTrainData.cache

// Prepare test/validation set
val parsedTestData = data_2008.map(parseData)
val scaledTestData = parsedTestData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
parsedTestData.cache
scaledTestData.cache

scaledTrainData.take(3).map(x => (x.label, x.features)).foreach(println)


(1.0,[-1.3223160050229035,-0.19605839762065824,-0.4689165738993619,0.362432098412094,-0.7436888225169838,-0.7083256807954716,-1.8498937310721373,-1.7543558972509141,-0.24059125907894596,2.6196648266835743,0.5456137506493843])
(0.0,[-1.3223160050229035,-0.19605839762065824,-0.4689165738993619,-0.2985852885550763,0.4316551188434456,-0.7083256807954716,-1.8498937310721373,-1.7543558972509141,-0.24059125907894596,2.6196648266835743,0.5456137506493843])
(1.0,[-1.3223160050229035,-0.19605839762065824,-0.4689165738993619,0.8031103563902076,-0.7436888225169838,-0.7083256807954716,-1.8498937310721373,-1.7543558972509141,-0.24059125907894596,2.6196648266835743,0.5456137506493843])

Next, let's build a Support Vector Machine model using this enriched feature matrix:


In [9]:
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.optimization.L1Updater

// Build the SVM model
val svmAlg = new SVMWithSGD()
svmAlg.optimizer.setNumIterations(100)
                .setRegParam(1.0)
                .setStepSize(1.0)
val model_svm = svmAlg.run(scaledTrainData)

// Predict
val labelsAndPreds_svm = scaledTestData.map { point =>
        val pred = model_svm.predict(point.features)
        (pred, point.label)
}
val m_svm = eval_metrics(labelsAndPreds_svm)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_svm(0), m_svm(1), m_svm(2), m_svm(3)))


precision = 0.39, recall = 0.68, F1 = 0.49, accuracy = 0.61

And finally, let's implement a Decision Tree model:


In [10]:
import org.apache.spark.mllib.tree.DecisionTree

// Build the Decision Tree model
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 10
val maxBins = 100
val model_dt = DecisionTree.trainClassifier(scaledTrainData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)

// Predict
val labelsAndPreds_dt = scaledTestData.map { point =>
    val pred = model_dt.predict(point.features)
    (pred, point.label)
}
val m_dt = eval_metrics(labelsAndPreds_dt)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_dt(0), m_dt(1), m_dt(2), m_dt(3)))


precision = 0.52, recall = 0.35, F1 = 0.42, accuracy = 0.72

As expected, the improved feature set increased the accuracy of both the SVM and Decision Tree models.

Summary

In this IPython notebook we have demonstrated how to build a predictive model in Scala with Apache Hadoop, Apache Spark and its machine learning library, ML-Lib.

We used Apache Spark on our HDP cluster to perform various data pre-processing and feature engineering tasks. We then applied ML-Lib machine learning algorithms such as Support Vector Machines and Decision Trees to the resulting datasets, showing how successive iterations add new and improved features that result in better model performance.

In the next part of this demo series we will show how to perform the same learning task with R.