In this 3rd part of the blog post on data science, we continue to demonstrate how to build a predictive model with Hadoop, highlighting various tools that can be effectively used in this process. This time we'll use Scalding for pre-processing and R for modeling.
R is a language and environment for statistical computing and graphics. It is a GNU project, similar to the S language and environment that was developed at Bell Laboratories by John Chambers and his colleagues. R is an open source project with more than 6,000 available packages covering various topics in data science, including classification, regression, clustering, anomaly detection, market basket analysis, text processing and many others. It is an extremely powerful and mature environment for statistical analysis and data science.
Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs using higher-level abstractions of a data pipeline. Scalding is built on top of Cascading, a Java library that abstracts away low-level Hadoop details. Scalding is comparable to Pig, but offers tight integration with Scala, bringing the advantages of Scala to your MapReduce jobs.
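To give a feel for this pipeline style, here is the canonical word-count job from the Scalding documentation (not part of our flight-delay work, just the standard introductory example):

import com.twitter.scalding._

// Read lines, split them into words, count occurrences per word, write the result
class WordCountJob(args: Args) extends Job(args) {
  TextLine(args("input"))
    .flatMap('line -> 'word) { line: String => line.split("\\s+") }
    .groupBy('word) { _.size }
    .write(Tsv(args("output")))
}

Each step (flatMap, groupBy, write) describes one stage of the data pipeline; Cascading translates the whole chain into the appropriate MapReduce jobs at run time.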
Recall from the first blog post that we are constructing a predictive model for flight delays. Our source dataset resides here: http://stat-computing.org/dataexpo/2009/the-data.html, and includes details about flights in the US from the years 1987-2008. We will also enrich the data with weather information from: http://www.ncdc.noaa.gov/cdo-web/datasets/, where we find daily temperatures (min/max), wind speed, snow conditions and precipitation.
We will build a supervised learning model to predict flight delays for flights leaving O'Hare International airport (ORD), using the year 2007 data to build the model, and 2008 data to test its validity.
R is a fantastic environment for data exploration and is often used for this purpose. With a ton of statistical and data manipulation functionality being part of core R, as well as powerful graphics packages such as ggplot2, performing data exploration in R is easy and fun.
Let's first enable R cells in IPython:
In [1]:
%%capture
%load_ext rpy2.ipython
Before we start our data exploration example, we load the various packages we will need later. We will use the rhdfs package from RHadoop to read files from HDFS; however, we need the ability to read a multi-part file from HDFS into R as a single data frame, so we define the read_csv_from_hdfs function in R:
In [2]:
%%capture
%%R
# load required R packages
require(rhdfs)
require(randomForest)
require(gbm)
require(plyr)
require(data.table)
# Initialize RHDFS package
hdfs.init(hadoop='/usr/bin/hadoop')
# Utility function to read a multi-part file from HDFS into an R data frame
read_csv_from_hdfs <- function(filename, cols=NULL) {
  # List all non-empty part files under the given HDFS path
  dir.list = hdfs.ls(filename)
  list.condition <- sapply(dir.list$size, function(x) x > 0)
  file.list <- dir.list[list.condition,]
  # Read each part file into a data.table, naming the columns if provided
  tables <- lapply(file.list$file, function(f) {
    content <- paste(hdfs.read.text.file(f, n = 100000L, buffer=100000000L), collapse='\n')
    if (length(cols) == 0) {
      dt = fread(content, sep=",", colClasses="character", stringsAsFactors=F, header=T)
    } else {
      dt = fread(content, sep=",", colClasses="character", stringsAsFactors=F, header=F)
      setnames(dt, names(dt), cols)
    }
    dt
  })
  # Combine all parts into a single data frame
  rbind.fill(tables)
}
We now explore the 2007 delay dataset to determine which variables are reasonable to use for this prediction task. First let's load the data into an R dataframe:
In [3]:
%%R
cols = c('year', 'month', 'day', 'dow', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime','Carrier', 'FlightNum', 'TailNum',
'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn',
'TaxiOut', 'Cancelled', 'CancellationCode', 'Diverted', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay',
'LateAircraftDelay');
flt_2007 = read_csv_from_hdfs('/user/demo/airline/delay/2007.csv', cols)
print(dim(flt_2007))
We have 7.4M+ flights in 2007 and 29 variables.
Our "target" variable will be DepDelay (departure delay in minutes). To build a classifier, we define a derived target variable by defining a "delay" as having 15 mins or more of delay, and "non-delay" otherwise. We thus create a new binary variable that we name 'DepDelayed'.
Let's look at some basic statistics of flights and delays (per our new definition), after limiting ourselves to flights originating from ORD:
In [4]:
%%R
df1 = flt_2007[which(flt_2007$Origin == 'ORD' & !is.na(flt_2007$DepDelay)),]
df1$DepDelay = sapply(df1$DepDelay, function(x) (if (as.numeric(x)>=15) 1 else 0))
print(paste0("total flights: ", as.character(dim(df1)[1])))
print(paste0("total delays: ", as.character(sum(df1$DepDelay))))
The "month" feature is likely a good feature for modeling -- let's look at the distribution of delays (as percentage of total flights) by month:
In [5]:
%%R
df2 = df1[, c('DepDelay', 'month'), with=F]
df2$month = as.numeric(df2$month)
df2 <- ddply(df2, .(month), summarise, mean_delay=mean(DepDelay))
barplot(df2$mean_delay, names.arg=df2$month, xlab="month", ylab="% of delays", col="blue")
Similarly, one would expect "hour of day" to be a good feature for predicting flight delays, as later flights in the day tend to see more delays due to congestion and knock-on effects from earlier delays. Let's look at that:
In [6]:
%%R
# Extract the hour from a 3- or 4-digit time-of-day string (HHMM)
get_hour <- function(x) {
  s = sprintf("%04d", as.numeric(x))
  return(substr(s, 1, 2))
}
df2 = df1[, c('DepDelay', 'CRSDepTime'), with=F]
df2$hour = as.numeric(sapply(df2$CRSDepTime, get_hour))
df2$CRSDepTime <- NULL
df2 <- ddply(df2, .(hour), summarise, mean_delay=mean(DepDelay))
barplot(df2$mean_delay, names.arg=df2$hour, xlab="hour of day", ylab="% of delays", col="green")
Of course we have not explored all of the variables in this demo, just a couple, to demonstrate R's capabilities in this area.
After playing around with the data and exploring some potential features, we will now demonstrate how to use Scalding to perform some simple pre-processing on Hadoop and create a feature matrix from the raw dataset. This is similar to the pre-processing shown in part 1 of the blog post (where we used Pig for this purpose) and part 2 (where we used Spark).
In our first iteration, we create the following features: month, day of month, day of week, scheduled departure hour, flight distance, and the number of days from the nearest US holiday, along with the flight date and the departure delay itself.
Let's look at the Scalding code. Note that we write the code in the next cell using IPython's "%%writefile" magic command and then execute it later from that local file.
In [8]:
%%writefile preprocess1.scala
package com.hortonworks.datascience.demo1

import com.twitter.scalding._
import org.joda.time.format._
import org.joda.time.{Days, DateTime}

import com.hortonworks.datascience.demo1.ScaldingFlightDelays._

/**
 * Pre-process flight delay data into feature matrix - iteration #1
 */
class ScaldingFlightDelays(args: Args) extends Job(args) {
  val prepData = Csv(args("input"), ",", fields = inputSchema, skipHeader = true)
    .read
    .project(delaySchema)
    // keep only non-cancelled flights originating at ORD
    .filter(('Origin, 'Cancelled)) { x: (String, String) => x._1 == "ORD" && x._2 == "0" }
    .mapTo(featureSchema -> outputSchema)(gen_features)
    .write(Csv(args("output")))
}

object ScaldingFlightDelays {
  val inputSchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek,
                         'DepTime, 'CRSDepTime, 'ArrTime, 'CRSArrTime,
                         'UniqueCarrier, 'FlightNum, 'TailNum,
                         'ActualElapsedTime, 'CRSElapsedTime, 'AirTime, 'ArrDelay,
                         'DepDelay, 'Origin, 'Dest, 'Distance,
                         'TaxiIn, 'TaxiOut, 'Cancelled, 'CancellationCode,
                         'Diverted, 'CarrierDelay, 'WeatherDelay,
                         'NASDelay, 'SecurityDelay, 'LateAircraftDelay)
  val delaySchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek,
                         'CRSDepTime, 'DepDelay, 'Origin, 'Distance, 'Cancelled)
  val featureSchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek,
                           'CRSDepTime, 'DepDelay, 'Distance)
  val outputSchema = List('flightDate, 'y, 'm, 'dm, 'dw, 'crs, 'dep, 'dist)

  // US holidays in 2007 and 2008, used by days_from_nearest_holiday
  val holidays = List("01/01/2007", "01/15/2007", "02/19/2007", "05/28/2007", "06/07/2007", "07/04/2007",
                      "09/03/2007", "10/08/2007", "11/11/2007", "11/22/2007", "12/25/2007",
                      "01/01/2008", "01/21/2008", "02/18/2008", "05/22/2008", "05/26/2008", "07/04/2008",
                      "09/01/2008", "10/13/2008", "11/11/2008", "11/27/2008", "12/25/2008")

  // Build the feature tuple: flight date, delay, month, day of month, day of week,
  // scheduled departure hour, distance and days from the nearest holiday
  def gen_features(tuple: (String, String, String, String, String, String, String)) = {
    val (year, month, dayOfMonth, dayOfWeek, crsDepTime, depDelay, distance) = tuple
    val date = to_date(year.toInt, month.toInt, dayOfMonth.toInt)
    val hour = get_hour(crsDepTime)
    val holidayDist = days_from_nearest_holiday(year.toInt, month.toInt, dayOfMonth.toInt)
    (date, depDelay, month, dayOfMonth, dayOfWeek, hour, distance, holidayDist.toString)
  }

  // Extract the hour from a 3- or 4-digit HHMM time string
  def get_hour(depTime: String) = "%04d".format(depTime.toInt).take(2)

  def to_date(year: Int, month: Int, day: Int) = "%04d%02d%02d".format(year, month, day)

  // Minimum absolute distance, in days, between the given date and any holiday
  def days_from_nearest_holiday(year: Int, month: Int, day: Int) = {
    val sampleDate = new DateTime(year, month, day, 0, 0)
    holidays.foldLeft(3000) { (r, c) =>
      val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)
      val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)
      math.min(r, distance)
    }
  }
}
We execute this Scalding code using the standard "scald.rb" script, generating the feature matrix for both the 2007 and 2008 datasets. Note that we use IPython's "%%capture" magic command to capture the output of the Scalding script and print only error messages (stderr), if any.
In [1]:
%%capture capt2007_1
!/home/demo/scalding/scripts/scald.rb --hdfs preprocess1.scala --input "airline/delay/2007.csv" --output "airline/fm/ord_2007_sc_1"
In [6]:
capt2007_1.stderr
Out[6]:
In [3]:
%%capture capt2008_1
!/home/demo/scalding/scripts/scald.rb --hdfs preprocess1.scala --input "airline/delay/2008.csv" --output "airline/fm/ord_2008_sc_1"
In [4]:
capt2008_1.stderr
Out[4]:
Now that we've generated our feature matrix using Scalding and Hadoop, let's turn to R to build a predictive model for airline delays. First we prepare our training set (using the 2007 data) and test set (using the 2008 data):
In [11]:
%%R
# Compute precision, recall, F1-measure and accuracy from predicted vs. actual labels
get_metrics <- function(predicted, actual) {
  tp = length(which(predicted == TRUE & actual == TRUE))
  tn = length(which(predicted == FALSE & actual == FALSE))
  fp = length(which(predicted == TRUE & actual == FALSE))
  fn = length(which(predicted == FALSE & actual == TRUE))
  precision = tp / (tp+fp)
  recall = tp / (tp+fn)
  F1 = 2*precision*recall / (precision+recall)
  accuracy = (tp+tn) / (tp+tn+fp+fn)
  c(precision, recall, F1, accuracy)
}
# Read a feature matrix from HDFS and convert columns to appropriate types
process_dataset <- function(filename) {
  cols = c('date', 'delay', 'month', 'day', 'dow', 'hour', 'distance', 'days_from_holiday')
  data = read_csv_from_hdfs(filename, cols)
  data$delay = as.factor(as.numeric(data$delay) >= 15)
  data$month = as.factor(data$month)
  data$day = as.factor(data$day)
  data$dow = as.factor(data$dow)
  data$hour = as.numeric(data$hour)
  data$distance = as.numeric(data$distance)
  data$days_from_holiday = as.numeric(data$days_from_holiday)
  data
}
# Prepare training set and test/validation set
data_2007 = process_dataset('/user/demo/airline/fm/ord_2007_sc_1')
data_2008 = process_dataset('/user/demo/airline/fm/ord_2008_sc_1')
fcols = setdiff(names(data_2007), c('date', 'delay'))
train_x = data_2007[,fcols, with=FALSE]
train_y = data_2007$delay
test_x = data_2008[,fcols, with=FALSE]
test_y = data_2008$delay
The process_dataset function reads the data from HDFS into an R data frame. We use it first to read the 2007 feature matrix into the data_2007 R dataframe (used as the training set), and then similarly to build data_2008 as the test set.
In this cell, we also define a helper function get_metrics that we will use later to measure precision, recall, F1 and accuracy.
Now let's run R's random forest algorithm and evaluate the results:
In [12]:
%%R
rf.model = randomForest(train_x, train_y, ntree=40)
rf.pr <- predict(rf.model, newdata=test_x)
m.rf = get_metrics(as.logical(rf.pr), as.logical(test_y))
print(sprintf("Random Forest: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f", m.rf[1], m.rf[2], m.rf[3], m.rf[4]))
Let's also try R's Gradient Boosted Machines (GBM) modeling. Like random forest, GBM is an ensemble method that is typically robust to over-fitting.
In [13]:
%%R
gbm.model <- gbm.fit(train_x, as.numeric(train_y)-1, n.trees=500, verbose=F, shrinkage=0.01, distribution="bernoulli",
interaction.depth=3, n.minobsinnode=30)
gbm.pr <- predict(gbm.model, newdata=test_x, n.trees=500, type="response")
m.gbm = get_metrics(gbm.pr >= 0.5, as.logical(test_y))
print(sprintf("Gradient Boosted Machines: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f", m.gbm[1], m.gbm[2], m.gbm[3], m.gbm[4]))
Using Random Forest and Gradient Boosted Machines we get pretty good results for our predictive model with a simple set of features. Following the same iterative approach as in the previous parts of this blog post, we now use additional data sources to enrich our core dataset and create new features that help us achieve better predictive performance.
As we demonstrated in parts 1 and 2 of this blog post, one way to improve the accuracy of this model is to layer in weather data, adding more informative features. We can get this data from a publicly available dataset here: http://www.ncdc.noaa.gov/cdo-web/datasets/
We now add these additional weather-related features to our model: daily temperatures (min/max), wind speed, snow conditions and precipitation at the flight's origin airport (ORD).
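The trickiest step in the job below is pivoting the weather data, which arrives as one record per (station, date, metric), into a single row of measures per date. As a minimal stand-alone sketch of that step (plain Scala, with hypothetical values), the foldLeft used in the job works like this:

object PivotSketch extends App {
  // hypothetical measures for a single date, encoded as "METRIC:value"
  // (the same encoding the job builds with tuple._3 + ":" + tuple._4)
  val measures = List("TMIN:-139", "TMAX:-50", "PRCP:3", "SNOW:8", "AWND:26")

  // fold the encoded strings into a Map from metric name to value
  val pivoted = measures.foldLeft(Map[String, Double]()) { (m, s) =>
    val kv = s.split(":")
    m + (kv(0) -> kv(1).toDouble)
  }

  println(pivoted) // Map(TMIN -> -139.0, TMAX -> -50.0, PRCP -> 3.0, ...)
}

The join with the flight data then simply looks up each date's map and spreads the five metrics into separate output columns.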
So let's re-write our Scalding program to add these new features to our feature matrix:
In [14]:
%%writefile preprocess2.scala
package com.hortonworks.datascience.demo2

import com.twitter.scalding._
import org.joda.time.format._
import org.joda.time.{Days, DateTime}

import com.hortonworks.datascience.demo2.ScaldingFlightDelays._

/**
 * Pre-process flight and weather data into feature matrix - iteration #2
 */
class ScaldingFlightDelays(args: Args) extends Job(args) {
  // Flight-delay pipeline: same filtering and features as in iteration #1
  val delayData = Csv(args("delay"), ",", fields = delayInSchema, skipHeader = true)
    .read
    .project(delaySchema)
    .filter(('Origin, 'Cancelled)) { x: (String, String) => x._1 == "ORD" && x._2 == "0" }
    .mapTo(filterSchema -> featureSchema)(gen_features)

  // Weather pipeline: keep the ORD station and the five metrics we care about,
  // then pivot the per-metric records into a Map of measures per date
  val weatherData = Csv(args("weather"), ",", fields = weatherInSchema)
    .read
    .project(weatherSchema)
    .filter('Station) { x: String => x == "USW00094846" }
    .filter('Metric) { m: String => m == "TMIN" || m == "TMAX" || m == "PRCP" || m == "SNOW" || m == "AWND" }
    .mapTo(weatherSchema -> ('Date, 'MM)) { tuple: (String, String, String, String) => (tuple._2, tuple._3 + ":" + tuple._4) }
    .groupBy('Date) {
      _.foldLeft('MM -> 'Measures)(Map[String, Double]()) { (m, s: String) =>
        val kv = s.split(":")
        m + (kv(0) -> kv(1).toDouble)
      }
    }

  // Join flights with same-day weather and flatten the measures into columns
  delayData.joinWithSmaller(('flightDate, 'Date), weatherData)
    .project('delay, 'm, 'dm, 'dw, 'h, 'dist, 'holiday, 'Measures)
    .mapTo(joinSchema -> outputSchema) { x: (Double, Double, Double, Double, Double, Double, Double, Map[String, Double]) =>
      (x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8("TMIN"), x._8("TMAX"), x._8("PRCP"), x._8("SNOW"), x._8("AWND"))
    }
    .write(Csv(args("output"), ","))
}

object ScaldingFlightDelays {
  val delayInSchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek,
                           'DepTime, 'CRSDepTime, 'ArrTime, 'CRSArrTime,
                           'UniqueCarrier, 'FlightNum, 'TailNum,
                           'ActualElapsedTime, 'CRSElapsedTime, 'AirTime,
                           'ArrDelay, 'DepDelay, 'Origin, 'Dest,
                           'Distance, 'TaxiIn, 'TaxiOut,
                           'Cancelled, 'CancellationCode, 'Diverted,
                           'CarrierDelay, 'WeatherDelay, 'NASDelay,
                           'SecurityDelay, 'LateAircraftDelay)
  val weatherInSchema = List('Station, 'Date, 'Metric, 'Measure, 'v1, 'v2, 'v3, 'v4)
  val delaySchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek,
                         'CRSDepTime, 'DepDelay, 'Origin, 'Distance, 'Cancelled)
  val weatherSchema = List('Station, 'Date, 'Metric, 'Measure)
  val filterSchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek, 'CRSDepTime, 'DepDelay, 'Distance)
  val featureSchema = List('flightDate, 'delay, 'm, 'dm, 'dw, 'h, 'dist, 'holiday)
  val joinSchema = List('delay, 'm, 'dm, 'dw, 'h, 'dist, 'holiday, 'Measures)
  val outputSchema = List('delay, 'm, 'dm, 'dw, 'h, 'dist, 'holiday, 'tmin, 'tmax, 'prcp, 'snow, 'awnd)

  // US holidays in 2007 and 2008, used by days_from_nearest_holiday
  val holidays = List("01/01/2007", "01/15/2007", "02/19/2007", "05/28/2007", "06/07/2007", "07/04/2007",
                      "09/03/2007", "10/08/2007", "11/11/2007", "11/22/2007", "12/25/2007",
                      "01/01/2008", "01/21/2008", "02/18/2008", "05/22/2008", "05/26/2008", "07/04/2008",
                      "09/01/2008", "10/13/2008", "11/11/2008", "11/27/2008", "12/25/2008")

  def gen_features(tuple: (String, String, String, String, String, String, String)) = {
    val (year, month, dayOfMonth, dayOfWeek, crsDepTime, depDelay, distance) = tuple
    val date = to_date(year.toInt, month.toInt, dayOfMonth.toInt)
    val hour = get_hour(crsDepTime)
    val holidayDist = days_from_nearest_holiday(year.toInt, month.toInt, dayOfMonth.toInt)
    (date, depDelay, month, dayOfMonth, dayOfWeek, hour, distance, holidayDist.toString)
  }

  def get_hour(depTime: String) = "%04d".format(depTime.toInt).take(2)

  def to_date(year: Int, month: Int, day: Int) = "%04d%02d%02d".format(year, month, day)

  def days_from_nearest_holiday(year: Int, month: Int, day: Int) = {
    val sampleDate = new DateTime(year, month, day, 0, 0)
    holidays.foldLeft(3000) { (r, c) =>
      val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)
      val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)
      math.min(r, distance)
    }
  }
}
In [9]:
%%capture capt2007_2
!/home/demo/scalding/scripts/scald.rb --hdfs preprocess2.scala --delay "airline/delay/2007.csv" --weather "airline/weather/2007.csv" --output "airline/fm/ord_2007_sc_2"
In [10]:
capt2007_2.stderr
Out[10]:
In [11]:
%%capture capt2008_2
!/home/demo/scalding/scripts/scald.rb --hdfs preprocess2.scala --delay "airline/delay/2008.csv" --weather "airline/weather/2008.csv" --output "airline/fm/ord_2008_sc_2"
In [12]:
capt2008_2.stderr
Out[12]:
Now let's re-build the Random Forest and Gradient Boosted Machines models with the enhanced feature matrices. As before, we first prepare our training set and test set:
In [7]:
%%R
# Read a feature matrix (now including weather features) from HDFS
process_dataset <- function(filename) {
  cols = c('delay', 'month', 'day', 'dow', 'hour', 'distance', 'days_from_holiday',
           'tmin', 'tmax', 'prcp', 'snow', 'awnd')
  data = read_csv_from_hdfs(filename, cols)
  data$delay = as.factor(as.numeric(data$delay) >= 15)
  data$month = as.factor(data$month)
  data$day = as.factor(data$day)
  data$dow = as.factor(data$dow)
  data$hour = as.numeric(data$hour)
  data$distance = as.numeric(data$distance)
  data$days_from_holiday = as.numeric(data$days_from_holiday)
  data$tmin = as.numeric(data$tmin)
  data$tmax = as.numeric(data$tmax)
  data$prcp = as.numeric(data$prcp)
  data$snow = as.numeric(data$snow)
  data$awnd = as.numeric(data$awnd)
  data
}
# Prepare training set and test/validation set
data_2007 = process_dataset('/user/demo/airline/fm/ord_2007_sc_2')
data_2008 = process_dataset('/user/demo/airline/fm/ord_2008_sc_2')
fcols = setdiff(names(data_2007), c('delay'))
train_x = data_2007[,fcols]
train_y = data_2007$delay
test_x = data_2008[,fcols]
test_y = data_2008$delay
And now to build a Random Forest model:
In [8]:
%%R
rf.model = randomForest(train_x, train_y, ntree=40)
rf.pr <- predict(rf.model, newdata=test_x)
m.rf = get_metrics(as.logical(rf.pr), as.logical(test_y))
print(sprintf("Random Forest: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f",
m.rf[1], m.rf[2], m.rf[3], m.rf[4]))
And the Gradient Boosted Machines model:
In [9]:
%%R
gbm.model <- gbm.fit(train_x, as.numeric(train_y)-1, n.trees=500, verbose=F, shrinkage=0.01, distribution="bernoulli",
interaction.depth=3, n.minobsinnode=30)
gbm.pr <- predict(gbm.model, newdata=test_x, n.trees=500, type="response")
m.gbm = get_metrics(gbm.pr >= 0.5, as.logical(test_y))
print(sprintf("Gradient Boosted Machines: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f",
m.gbm[1], m.gbm[2], m.gbm[3], m.gbm[4]))
In this 3rd part of our blog post we used an IPython notebook to demonstrate how to build a predictive model for airline delays. We used Scalding on our HDP cluster to perform various data pre-processing and feature engineering tasks, and then applied a few machine learning algorithms, such as random forest and gradient boosted machines, to the resulting datasets, showing how iteratively adding new features results in better model performance.