Data Science with Hadoop - Predicting airline delays - part 3: Scalding and R

Introduction

In this 3rd part of the blog post on data science, we continue to demonstrate how to build a predictive model with Hadoop, highlighting various tools that can be effectively used in this process. This time we'll use Scalding for pre-processing and R for modeling.

R is a language and environment for statistical computing and graphics. It is a GNU project, similar to the S language and environment developed at Bell Laboratories by John Chambers and his colleagues. R is an open source project with more than 6000 packages available, covering various topics in data science including classification, regression, clustering, anomaly detection, market basket analysis, text processing and many others. R is an extremely powerful and mature environment for statistical analysis and data science.

Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs using higher-level abstractions of a data pipeline. Scalding is built on top of Cascading, a Java library that abstracts away low-level Hadoop details. Scalding is comparable to Pig, but offers tight integration with Scala, bringing the advantages of Scala to your MapReduce jobs.

Recall from the first blog post that we are constructing a predictive model for flight delays. Our source dataset resides here: http://stat-computing.org/dataexpo/2009/the-data.html, and includes details about flights in the US from the years 1987-2008. We will also enrich the data with weather information from: http://www.ncdc.noaa.gov/cdo-web/datasets/, where we find daily temperatures (min/max), wind speed, snow conditions and precipitation.

We will build a supervised learning model to predict flight delays for flights leaving O'Hare International airport (ORD), using the year 2007 data to build the model, and 2008 data to test its validity.

Data Exploration

R is a fantastic environment for data exploration and is often used for this purpose. With a ton of statistical and data manipulation functionality built into core R, as well as powerful graphics packages such as ggplot2, performing data exploration in R is easy and fun.

Let's first enable R cells in IPython:


In [1]:
%%capture
%load_ext rpy2.ipython

Before we start our data exploration example, we load the various packages we will need later. We use the rhdfs package from RHadoop to read files from HDFS; however, we also need the ability to read a multi-part file from HDFS into R as a single data frame, so we define the read_csv_from_hdfs function in R:


In [2]:
%%capture
%%R

# load required R packages
require(rhdfs)
require(randomForest)
require(gbm)
require(plyr)
require(data.table)

# Initialize RHDFS package
hdfs.init(hadoop='/usr/bin/hadoop')

# Utility function to read a multi-part file from HDFS into an R data frame
read_csv_from_hdfs <- function(filename, cols=NULL) {
  dir.list = hdfs.ls(filename)
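  # keep only non-empty part files (e.g., skip zero-size _SUCCESS markers)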
  list.condition <- sapply(dir.list$size, function(x) x > 0)
  file.list  <- dir.list[list.condition,]
  tables <- lapply(file.list$file, function(f) {
    content <- paste(hdfs.read.text.file(f, n = 100000L, buffer=100000000L), collapse='\n')
    if (length(cols)==0) {
      dt = fread(content, sep=",", colClasses="character", stringsAsFactors=F, header=T)   
    } else {
      dt = fread(content, sep=",", colClasses="character", stringsAsFactors=F, header=F)   
      setnames(dt, names(dt), cols)    
    }
    dt
  })
  rbind.fill(tables)
}

We now explore the 2007 delay dataset to determine which variables are reasonable to use for this prediction task. First let's load the data into an R dataframe:


In [3]:
%%R
cols = c('year', 'month', 'day', 'dow', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime','Carrier', 'FlightNum', 'TailNum', 
         'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn', 
         'TaxiOut', 'Cancelled', 'CancellationCode', 'Diverted', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 
         'LateAircraftDelay');
flt_2007 = read_csv_from_hdfs('/user/demo/airline/delay/2007.csv', cols)

print(dim(flt_2007))


Read 7453216 rows and 29 (of 29) columns from 0.655 GB file in 00:00:32
[1] 7453216      29

We have 7.4M+ flights in 2007 and 29 variables.

Our "target" variable will be DepDelay (departure delay in minutes). To build a classifier, we define a derived target variable by defining a "delay" as having 15 mins or more of delay, and "non-delay" otherwise. We thus create a new binary variable that we name 'DepDelayed'.

Let's look at some basic statistics of flights and delays (per our new definition), after limiting ourselves to flights originating from ORD:


In [4]:
%%R
df1 = flt_2007[which(flt_2007$Origin == 'ORD' & !is.na(flt_2007$DepDelay)),]
df1$DepDelay = sapply(df1$DepDelay, function(x) (if (as.numeric(x)>=15) 1 else 0))

print(paste0("total flights: ", as.character(dim(df1)[1])))
print(paste0("total delays: ", as.character(sum(df1$DepDelay))))


[1] "total flights: 359169"
[1] "total delays: 109346"

The "month" feature is likely a good feature for modeling -- let's look at the distribution of delays (as percentage of total flights) by month:


In [5]:
%%R
df2 = df1[, c('DepDelay', 'month'), with=F]
df2$month = as.numeric(df2$month)
df2 <- ddply(df2, .(month), summarise, mean_delay=mean(DepDelay))
barplot(df2$mean_delay, names.arg=df2$month, xlab="month", ylab="% of delays", col="blue")


Similarly, one would expect "hour of day" to be a good feature for predicting flight delays, since later flights in the day may see more delays due to accumulated congestion. Let's look at that:


In [6]:
%%R

# Extract hour of day from 3 or 4 digit time-of-day string
get_hour <- function(x) { 
  s = sprintf("%04d", as.numeric(x))
  return(substr(s, 1, 2))
}

df2 = df1[, c('DepDelay', 'CRSDepTime'), with=F]
df2$hour = as.numeric(sapply(df2$CRSDepTime, get_hour))
df2$CRSDepTime <- NULL
df2 <- ddply(df2, .(hour), summarise, mean_delay=mean(DepDelay))
barplot(df2$mean_delay, names.arg=df2$hour, xlab="hour of day", ylab="% of delays", col="green")


Of course, we have not explored all the variables in this demo, just a couple, to demonstrate R's capabilities in this area.
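
The same pattern extends to any other candidate feature. For example, here is a short sketch (illustrative only, not part of the original notebook run) that plots the delay rate by day of week, reusing the df1 data frame from above:

%%R
# delay rate by day of week, using the same ddply/barplot approach as above
df2 = df1[, c('DepDelay', 'dow'), with=F]
df2$dow = as.numeric(df2$dow)
df2 <- ddply(df2, .(dow), summarise, mean_delay=mean(DepDelay))
barplot(df2$mean_delay, names.arg=df2$dow, xlab="day of week", ylab="% of delays", col="red")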

Pre-processing - iteration 1

After playing around with the data and exploring some potential features, we will now demonstrate how to use Scalding to perform some simple pre-processing on Hadoop, and create a feature matrix from the raw dataset. This is similar to the pre-processing shown in part 1 of the blog post (where we used Pig for this purpose) and part 2 (where we used Spark).

In our first iteration, we create the following features:

  • month: winter months should have more delays than summer months
  • day of month: this is likely not a very predictive variable, but let's keep it in anyway
  • day of week: weekend vs. weekday
  • hour of the day: later hours tend to have more delays
  • Distance: interesting to see if this variable is a good predictor of delay
  • days_from_closest_holiday: number of days from the date of the flight to the closest US holiday (see the short R sketch after this list)
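
To make that last feature concrete, here is a small illustrative R version of the computation (a sketch only; the actual feature is generated by the Scalding job below, using the same holiday dates listed in its code):

%%R
# illustrative only: days from a flight date to the nearest US holiday (2007 dates)
holidays <- as.Date(c("2007-01-01", "2007-01-15", "2007-02-19", "2007-05-28",
                      "2007-06-07", "2007-07-04", "2007-09-03", "2007-10-08",
                      "2007-11-11", "2007-11-22", "2007-12-25"))
days_from_nearest_holiday <- function(year, month, day) {
  d <- as.Date(sprintf("%04d-%02d-%02d", year, month, day))
  min(abs(as.numeric(d - holidays)))
}
print(days_from_nearest_holiday(2007, 7, 1))  # 3 -- July 4th is three days away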

Let's look at the Scalding code. Note that we write the code in the next cell using IPython's "%%writefile" magic command and then execute it later from that local file.


In [8]:
%%writefile preprocess1.scala

package com.hortonworks.datascience.demo1

import com.twitter.scalding._
import org.joda.time.format._
import org.joda.time.{Days, DateTime}
import com.hortonworks.datascience.demo1.ScaldingFlightDelays._

/**
 * Pre-process flight delay data into feature matrix - iteration #1
 */
class ScaldingFlightDelays(args: Args) extends Job(args) {

  val prepData = Csv(args("input"), ",", fields = inputSchema, skipHeader = true)
    .read
    .project(delaySchema)
    .filter(('Origin,'Cancelled)) { x:(String,String) => x._1 == "ORD" && x._2 == "0"}
    .mapTo(featureSchema -> outputSchema)(gen_features)
    .write(Csv(args("output")))
}

object ScaldingFlightDelays {
  val inputSchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek, 
                    'DepTime, 'CRSDepTime, 'ArrTime, 'CRSArrTime, 
                    'UniqueCarrier, 'FlightNum, 'TailNum, 
                    'ActualElapsedTime, 'CRSElapsedTime, 'AirTime, 'ArrDelay, 
                    'DepDelay, 'Origin, 'Dest, 'Distance, 
                    'TaxiIn, 'TaxiOut, 'Cancelled, 'CancellationCode, 
                    'Diverted, 'CarrierDelay, 'WeatherDelay, 
                    'NASDelay, 'SecurityDelay, 'LateAircraftDelay)
  val delaySchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek, 
                         'CRSDepTime, 'DepDelay, 'Origin, 'Distance, 'Cancelled)
  val featureSchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek, 
                           'CRSDepTime, 'DepDelay, 'Distance)
  val outputSchema = List('flightDate,'y,'m,'dm,'dw,'crs,'dep,'dist)

  val holidays = List("01/01/2007", "01/15/2007", "02/19/2007", "05/28/2007", "06/07/2007", "07/04/2007",
    "09/03/2007", "10/08/2007" ,"11/11/2007", "11/22/2007", "12/25/2007",
    "01/01/2008", "01/21/2008", "02/18/2008", "05/22/2008", "05/26/2008", "07/04/2008",
    "09/01/2008", "10/13/2008" ,"11/11/2008", "11/27/2008", "12/25/2008")

  def gen_features(tuple: (String,String,String,String,String,String,String)) = {
    val (year, month, dayOfMonth, dayOfWeek, crsDepTime, depDelay, distance) = tuple
    val date = to_date(year.toInt,month.toInt,dayOfMonth.toInt)
    val hour = get_hour(crsDepTime)
    val holidayDist = days_from_nearest_holiday(year.toInt,month.toInt,dayOfMonth.toInt)

    (date,depDelay,month,dayOfMonth,dayOfWeek,hour,distance,holidayDist.toString)
  }

  def get_hour(depTime: String) = "%04d".format(depTime.toInt).take(2)

  def to_date(year: Int, month: Int, day: Int) = "%04d%02d%02d".format(year, month, day)

  def days_from_nearest_holiday(year:Int, month:Int, day:Int) = {
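    // start from a large sentinel distance (3000 days) and take the minimum over all holidays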
    val sampleDate = new DateTime(year, month, day, 0, 0)

    holidays.foldLeft(3000) { (r, c) =>
      val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)
      val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)
      math.min(r, distance)
    }
  }
}


Overwriting preprocess1.scala

We execute this Scalding code using the standard "scald.rb" script, generating the feature matrix for both the 2007 and 2008 datasets. Note that we use IPython's "%%capture" magic command to capture the output of the Scalding script and print only error messages (stderr), if any.


In [1]:
%%capture capt2007_1
!/home/demo/scalding/scripts/scald.rb --hdfs preprocess1.scala --input "airline/delay/2007.csv" --output "airline/fm/ord_2007_sc_1"

In [6]:
capt2007_1.stderr


Out[6]:
''

In [3]:
%%capture capt2008_1
!/home/demo/scalding/scripts/scald.rb --hdfs preprocess1.scala --input "airline/delay/2008.csv" --output "airline/fm/ord_2008_sc_1"

In [4]:
capt2008_1.stderr


Out[4]:
''

Modeling - iteration 1

Now that we've generated our feature matrix using Scalding and Hadoop, let's turn to R to build a predictive model for airline delays. First we prepare our training set (using the 2007 data) and test set (using the 2008 data):


In [11]:
%%R

# Function to compute Precision, Recall and F1-Measure
get_metrics <- function(predicted, actual) {
  tp = length(which(predicted == TRUE & actual == TRUE))
  tn = length(which(predicted == FALSE & actual == FALSE))
  fp = length(which(predicted == TRUE & actual == FALSE))
  fn = length(which(predicted == FALSE & actual == TRUE))

  precision = tp / (tp+fp)
  recall = tp / (tp+fn)
  F1 = 2*precision*recall / (precision+recall)
  accuracy = (tp+tn) / (tp+tn+fp+fn)
  
  v = c(precision, recall, F1, accuracy)
  v
}

# Read input files
process_dataset <- function(filename) {
  cols = c('date', 'delay', 'month', 'day', 'dow', 'hour', 'distance', 'days_from_holiday')
    
  data = read_csv_from_hdfs(filename, cols)
  data$delay = as.factor(as.numeric(data$delay) >= 15)
  data$month = as.factor(data$month)
  data$day = as.factor(data$day)
  data$dow = as.factor(data$dow)
  data$hour = as.numeric(data$hour)
  data$distance = as.numeric(data$distance)
  data$days_from_holiday = as.numeric(data$days_from_holiday)
  data
}

# Prepare training set and test/validation set

data_2007 = process_dataset('/user/demo/airline/fm/ord_2007_sc_1')
data_2008 = process_dataset('/user/demo/airline/fm/ord_2008_sc_1')

fcols = setdiff(names(data_2007), c('date', 'delay'))
train_x = data_2007[,fcols, with=FALSE]
train_y = data_2007$delay

test_x = data_2008[,fcols, with=FALSE]
test_y = data_2008$delay

The process_dataset function reads the data from HDFS into an R data frame. We use it first to read the feature matrix based on the 2007 data into the data_2007 R dataframe (used as a training set), and then similarly to build data_2008 as the test set.

In this cell, we also define a helper function get_metrics that we will use later to measure precision, recall, F1 and accuracy.
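
As a quick sanity check (not in the original notebook), here is how get_metrics behaves on a tiny hand-made example:

%%R
# toy example: 2 true positives, 1 false positive, 1 false negative, 1 true negative
predicted = c(TRUE, TRUE, TRUE, FALSE, FALSE)
actual    = c(TRUE, TRUE, FALSE, TRUE, FALSE)
print(get_metrics(predicted, actual))
# precision = 2/3, recall = 2/3, F1 = 2/3, accuracy = 3/5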

Now let's run R's random forest algorithm and evaluate the results:


In [12]:
%%R

rf.model = randomForest(train_x, train_y, ntree=40)
rf.pr <- predict(rf.model, newdata=test_x)
m.rf = get_metrics(as.logical(rf.pr), as.logical(test_y))
print(sprintf("Random Forest: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f", m.rf[1], m.rf[2], m.rf[3], m.rf[4]))


[1] "Random Forest: precision=0.43, recall=0.30, F1=0.35, accuracy=0.69"

Let's also try R's Gradient Boosted Machines (GBM) modeling. GBM is an ensemble method that, like random forest, is typically robust to over-fitting.


In [13]:
%%R

gbm.model <- gbm.fit(train_x, as.numeric(train_y)-1, n.trees=500, verbose=F, shrinkage=0.01, distribution="bernoulli", 
                     interaction.depth=3, n.minobsinnode=30)
gbm.pr <- predict(gbm.model, newdata=test_x, n.trees=500, type="response")
m.gbm = get_metrics(gbm.pr >= 0.5, as.logical(test_y))
print(sprintf("Gradient Boosted Machines: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f", m.gbm[1], m.gbm[2], m.gbm[3], m.gbm[4]))


[1] "Gradient Boosted Machines: precision=0.53, recall=0.10, F1=0.17, accuracy=0.72"

Using Random Forest and Gradient Boosted Machines we get pretty good results for our predictive model with a simple set of features. Following the same iterative approach as in the previous parts of this blog post, we now use additional data sources to enrich our core dataset and create new features that will help us achieve better predictive performance.

Pre-processing - iteration 2

As we have demonstrated in parts 1 and 2 of this blog post, one way to improve this model's accuracy is to layer in weather data and with it add more informative features to our model. We can get this data from a publicly available dataset here: http://www.ncdc.noaa.gov/cdo-web/datasets/

We now add these additional weather-related features to our model: daily temperatures (min/max), wind speed, snow conditions and precipitation in the flight origin airport (ORD).

So let's re-write our Scalding program to add these new features to our feature matrix:


In [14]:
%%writefile preprocess2.scala

package com.hortonworks.datascience.demo2

import com.twitter.scalding._
import org.joda.time.format._
import org.joda.time.{Days, DateTime}
import com.hortonworks.datascience.demo2.ScaldingFlightDelays._

/**
 * pre-process flight and weather data into feature matrix - iteration #2
 */
class ScaldingFlightDelays(args: Args) extends Job(args) {

  val delayData = Csv(args("delay"), ",", fields = delayInSchema, skipHeader = true)
    .read
    .project(delaySchema)
    .filter(('Origin,'Cancelled)) { x:(String,String) => x._1 == "ORD" && x._2 == "0"}
    .mapTo(filterSchema -> featureSchema)(gen_features)

  val weatherData = Csv(args("weather"),",", fields = weatherInSchema)
    .read
    .project(weatherSchema)
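    // keep only the weather station at the origin airport, ORD (GHCN station USW00094846)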
    .filter('Station){x:String => x == "USW00094846"}
    .filter('Metric){m:String => m == "TMIN" || m == "TMAX" || m == "PRCP" || m == "SNOW" || m == "AWND"}
    .mapTo(weatherSchema -> ('Date,'MM)){tuple:(String,String,String,String) => (tuple._2,tuple._3+":"+tuple._4)}
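    // fold each date's "METRIC:value" strings into a Map(metric -> value)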
    .groupBy('Date){_.foldLeft('MM -> 'Measures)(Map[String,Double]()){
        (m,s:String) => {val kv = s.split(":"); m + (kv(0) -> kv(1).toDouble)}
      }
    }

  delayData.joinWithSmaller(('flightDate,'Date),weatherData)
    .project('delay,'m,'dm,'dw,'h,'dist,'holiday,'Measures)
    .mapTo(joinSchema -> outputSchema){x:(Double,Double,Double,Double,Double,Double,Double,Map[String,Double]) => {
        (x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8("TMIN"), x._8("TMAX"), x._8("PRCP"), x._8("SNOW"), x._8("AWND"))
      }
    }
    .write(Csv(args("output"),","))
}

object ScaldingFlightDelays {
  val delayInSchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek,
                           'DepTime, 'CRSDepTime, 'ArrTime, 'CRSArrTime,
                           'UniqueCarrier, 'FlightNum, 'TailNum,
                           'ActualElapsedTime, 'CRSElapsedTime, 'AirTime,
                           'ArrDelay, 'DepDelay, 'Origin, 'Dest,
                           'Distance, 'TaxiIn, 'TaxiOut,
                           'Cancelled, 'CancellationCode, 'Diverted,
                           'CarrierDelay, 'WeatherDelay, 'NASDelay,
                           'SecurityDelay, 'LateAircraftDelay)
  val weatherInSchema = List('Station, 'Date, 'Metric, 'Measure, 'v1, 'v2, 'v3, 'v4)
  val delaySchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek,
                         'CRSDepTime, 'DepDelay, 'Origin, 'Distance, 'Cancelled)
  val weatherSchema = List('Station, 'Date, 'Metric, 'Measure)
  val filterSchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek, 'CRSDepTime, 'DepDelay, 'Distance)
  val featureSchema = List('flightDate,'delay,'m,'dm,'dw,'h,'dist,'holiday)
  val joinSchema = List('delay,'m,'dm,'dw,'h,'dist,'holiday,'Measures)
  val outputSchema = List('delay,'m,'dm,'dw,'h,'dist,'holiday,'tmin,'tmax,'prcp,'snow,'awnd)

  val holidays = List("01/01/2007", "01/15/2007", "02/19/2007", "05/28/2007", "06/07/2007", "07/04/2007",
    "09/03/2007", "10/08/2007" ,"11/11/2007", "11/22/2007", "12/25/2007",
    "01/01/2008", "01/21/2008", "02/18/2008", "05/22/2008", "05/26/2008", "07/04/2008",
    "09/01/2008", "10/13/2008" ,"11/11/2008", "11/27/2008", "12/25/2008")

  def gen_features(tuple: (String,String,String,String,String,String,String)) = {
    val (year, month, dayOfMonth, dayOfWeek, crsDepTime, depDelay, distance) = tuple
    val date = to_date(year.toInt,month.toInt,dayOfMonth.toInt)
    val hour = get_hour(crsDepTime)
    val holidayDist = days_from_nearest_holiday(year.toInt,month.toInt,dayOfMonth.toInt)

    (date,depDelay,month,dayOfMonth,dayOfWeek,hour,distance,holidayDist.toString)
  }

  def get_hour(depTime: String) = "%04d".format(depTime.toInt).take(2)

  def to_date(year: Int, month: Int, day: Int) = "%04d%02d%02d".format(year, month, day)

  def days_from_nearest_holiday(year:Int, month:Int, day:Int) = {
    val sampleDate = new DateTime(year, month, day, 0, 0)

    holidays.foldLeft(3000) { (r, c) =>
      val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)
      val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)
      math.min(r, distance)
    }
  }
}


Overwriting preprocess2.scala

In [9]:
%%capture capt2007_2
!/home/demo/scalding/scripts/scald.rb --hdfs preprocess2.scala --delay "airline/delay/2007.csv" --weather "airline/weather/2007.csv" --output "airline/fm/ord_2007_sc_2"

In [10]:
capt2007_2.stderr


Out[10]:
''

In [11]:
%%capture capt2008_2
!/home/demo/scalding/scripts/scald.rb --hdfs preprocess2.scala --delay "airline/delay/2008.csv" --weather "airline/weather/2008.csv" --output "airline/fm/ord_2008_sc_2"

In [12]:
capt2008_2.stderr


Out[12]:
''

Modeling - iteration 2

Now let's re-build the Random Forest and Gradient Boosted Machines models with the enhanced feature matrices. As before, we first prepare our training set and test set:


In [7]:
%%R

# Read input files
process_dataset <- function(filename) {
  cols = c('delay', 'month', 'day', 'dow', 'hour', 'distance', 'days_from_holiday', 
           'tmin', 'tmax', 'prcp', 'snow', 'awnd')
    
  data = read_csv_from_hdfs(filename, cols)
  data$delay = as.factor(as.numeric(data$delay) >= 15)
  data$month = as.factor(data$month)
  data$day = as.factor(data$day)
  data$dow = as.factor(data$dow)
  data$hour = as.numeric(data$hour)
  data$distance = as.numeric(data$distance)
  data$days_from_holiday = as.numeric(data$days_from_holiday)
  data$tmin = as.numeric(data$tmin)
  data$tmax = as.numeric(data$tmax)
  data$prcp = as.numeric(data$prcp)
  data$snow = as.numeric(data$snow)
  data$awnd = as.numeric(data$awnd)
  data
}

# Prepare training set and test/validation set

data_2007 = process_dataset('/user/demo/airline/fm/ord_2007_sc_2')
data_2008 = process_dataset('/user/demo/airline/fm/ord_2008_sc_2')

fcols = setdiff(names(data_2007), c('delay'))
train_x = data_2007[,fcols]
train_y = data_2007$delay

test_x = data_2008[,fcols]
test_y = data_2008$delay

And now to build a Random Forest model:


In [8]:
%%R

rf.model = randomForest(train_x, train_y, ntree=40)
rf.pr <- predict(rf.model, newdata=test_x)
m.rf = get_metrics(as.logical(rf.pr), as.logical(test_y))
print(sprintf("Random Forest: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f", 
              m.rf[1], m.rf[2], m.rf[3], m.rf[4]))


[1] "Random Forest: precision=0.58, recall=0.38, F1=0.45, accuracy=0.74"

And the gradient boosted tree model:


In [9]:
%%R

gbm.model <- gbm.fit(train_x, as.numeric(train_y)-1, n.trees=500, verbose=F, shrinkage=0.01, distribution="bernoulli", 
                     interaction.depth=3, n.minobsinnode=30)
gbm.pr <- predict(gbm.model, newdata=test_x, n.trees=500, type="response")
m.gbm = get_metrics(gbm.pr >= 0.5, as.logical(test_y))
print(sprintf("Gradient Boosted Machines: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f", 
              m.gbm[1], m.gbm[2], m.gbm[3], m.gbm[4]))


[1] "Gradient Boosted Machines: precision=0.63, recall=0.27, F1=0.38, accuracy=0.75"

Summary

In this 3rd part of our blog post we used an IPython notebook to demonstrate how to build a predictive model for airline delays. We used Scalding on our HDP cluster to perform various data pre-processing and feature engineering tasks, then applied machine learning algorithms such as random forest and gradient boosted machines to the resulting datasets, showing how adding new features over successive iterations results in better model performance.