With the rapid adoption of Apache Hadoop in the enterprise, machine learning is becoming a key technology that enterprises use to extract tangible business value from their massive data assets. This derivation of business value is possible because Apache Hadoop YARN, as the architectural center of the Modern Data Architecture (MDA), allows purpose-built data engines such as Apache Tez and Apache Spark to process and iterate over multiple datasets for data science workloads within the same cluster.
It is a common misconception that applying predictive learning algorithms like Linear Regression, Random Forest or Neural Networks to large datasets requires a dramatic change in approach, in tooling, or in dedicated, siloed clusters. In fact, the big change is in what is known as "feature engineering" – the process by which very big raw data is transformed into a "feature matrix". With Hadoop and YARN providing an ideal platform, this transformation of large raw datasets (terabytes or petabytes) into a feature matrix is now scalable and no longer limited by the RAM or compute power of a single node.
Since the output of the feature engineering step (the "feature matrix") tends to be relatively small in size (typically in the 2-20GB range), a common choice is to run the learning algorithm on a single machine (often with multiple cores and a large amount of RAM), allowing us to utilize a plethora of existing robust tools and algorithms from R packages, Python's Scikit-learn, SAS and others.
In this interactive IPython notebook we will demonstrate, via an example, a step-by-step solution to a supervised learning problem, using HIVE (with R UDFs) for pre-processing and R for modeling.
R is a language and environment for statistical computing and graphics. It is a GNU project, similar to the S language and environment developed at Bell Laboratories by John Chambers and his colleagues. R is an open source project with more than 6,000 packages available, covering various topics in data science including classification, regression, clustering, anomaly detection, market basket analysis, text processing and many others. R is an extremely powerful and mature environment for statistical analysis and data science.
Apache HIVE is the de-facto SQL-on-Hadoop tool, originally developed at Facebook and now a fast, mature and effective tool for data analytics. HIVE provides a rich set of built-in functionality, and is extensible via user-defined functions (e.g. in Java or Python) that enable custom business logic.
Every year approximately 20% of airline flights are delayed or cancelled, resulting in significant costs to both travellers and airlines. As our example use case, we will build a supervised learning model that predicts airline delay from historical flight data and weather information.
Let's begin by exploring the airline delay dataset available at http://stat-computing.org/dataexpo/2009/the-data.html. This dataset includes details about flights in the US from the years 1987-2008. Every row in the dataset describes a flight using 29 variables:
# | Name | Description |
---|---|---|
1 | Year | 1987-2008 |
2 | Month | 1-12 |
3 | DayofMonth | 1-31 |
4 | DayOfWeek | 1 (Monday) - 7 (Sunday) |
5 | DepTime | actual departure time (local, hhmm) |
6 | CRSDepTime | scheduled departure time (local, hhmm) |
7 | ArrTime | actual arrival time (local, hhmm) |
8 | CRSArrTime | scheduled arrival time (local, hhmm) |
9 | UniqueCarrier | unique carrier code |
10 | FlightNum | flight number |
11 | TailNum | plane tail number |
12 | ActualElapsedTime | in minutes |
13 | CRSElapsedTime | in minutes |
14 | AirTime | in minutes |
15 | ArrDelay | arrival delay, in minutes |
16 | DepDelay | departure delay, in minutes |
17 | Origin | origin - IATA airport code |
18 | Dest | destination - IATA airport code |
19 | Distance | in miles |
20 | TaxiIn | taxi in time, in minutes |
21 | TaxiOut | taxi out time in minutes |
22 | Cancelled | was the flight cancelled? (1 = yes, 0 = no) |
23 | CancellationCode | reason for cancellation (A = carrier, B = weather, C = NAS, D = security) |
24 | Diverted | 1 = yes, 0 = no |
25 | CarrierDelay | in minutes |
26 | WeatherDelay | in minutes |
27 | NASDelay | in minutes |
28 | SecurityDelay | in minutes |
29 | LateAircraftDelay | in minutes |
Before we start modeling, let's do some exploration of this dataset.
R is a fantastic environment for data exploration and is often used for this purpose. With a ton of statistical and data manipulation functionality built into core R, as well as powerful graphics packages such as ggplot2, performing data exploration in R is easy and fun.
First we load RJDBC and set up the connection to HIVE. We also define the exec.query() function, which strips the table-name prefix that HIVE prepends to column names in the result set (e.g. flights.year becomes year), for convenience:
In [1]:
# Load the JDBC driver and open a connection to HiveServer2
require(rJava)
require(RJDBC)
drv <- JDBC("org.apache.hive.jdbc.HiveDriver",
            classPath = c("/usr/hdp/current/hive-client/lib/hive-jdbc.jar",
                          "/usr/hdp/current/hadoop-client/hadoop-common.jar"))
conn <- dbConnect(drv, "jdbc:hive2://ml-master.cloud.hortonworks.com:10000/demo", "jupyter", "hadoop")

# Run a HIVE query and strip the "<table>." prefix from result column names
exec.query <- function(query) {
  df = dbGetQuery(conn, query)
  names(df) = sapply(names(df), function(x) {
    x = unlist(strsplit(x, split='.', fixed=T))
    if (length(x)>1) x[2] else x[1]
  })
  df
}
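As a quick, optional sanity check -- assuming the flights table already exists in the demo database -- we can verify that exec.query() strips the table prefix from column names:
# Column names should come back as 'year' and 'month', not 'flights.year' etc.
str(exec.query("SELECT year, month FROM flights LIMIT 3"))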
Let's take a quick look at the data:
In [2]:
df_samp = exec.query("SELECT * FROM flights WHERE year=2007 LIMIT 5")
print(df_samp)
Our "target" variable will be depdelay (departure delay in minutes). To build a classifier, we define a derived target variable by defining a "delay" as having 15 mins or more of delay, and "non-delay" otherwise. We thus replace depdelay with a new binary variable of the same name.
Let's explore some basic statistics about the dataset: the number of flights and delays (per our new definition), after limiting ourselves to flights originating from ORD (Chicago O'Hare airport):
In [3]:
num_rows = exec.query("SELECT COUNT(*) FROM flights WHERE year=2007")
print(paste0("total flights in 2007: ", as.numeric(num_rows[1])))
df1 = exec.query("SELECT * FROM flights WHERE year=2007 and origin='ORD' and depdelay is not NULL")
df1$depdelay = sapply(df1$depdelay, function(x) (if (as.numeric(x)>=15) 1 else 0))
print(paste0("total flights originating from ORD: ", as.character(dim(df1)[1])))
print(paste0("total delayed flights originating from ORD: ", as.character(sum(df1$depdelay))))
The "month" feature is likely a good feature for modeling -- let's look at the distribution of delays (as percentage of total flights) by month:
In [4]:
require(plyr)
df2 = df1[, c('depdelay', 'month')]
df2$month = as.numeric(df2$month)
df2 <- ddply(df2, .(month), summarise, mean_delay=mean(depdelay))
barplot(100*df2$mean_delay, names.arg=df2$month, xlab="month", ylab="% of flights delayed", col="blue")
Similarly, one would expect "hour of day" to be a good feature for predicting flight delays, as flights later in the day may suffer more delays due to accumulated congestion. Let's look at that:
In [5]:
# Extract hour of day from 3 or 4 digit time-of-day string
get_hour <- function(x) {
s = sprintf("%04d", as.numeric(x))
return(substr(s, 1, 2))
}
df2 = df1[, c('depdelay', 'crsdeptime')]
df2$hour = as.numeric(sapply(df2$crsdeptime, get_hour))
df2$crsdeptime <- NULL
df2 <- ddply(df2, .(hour), summarise, mean_delay=mean(depdelay))
barplot(100*df2$mean_delay, names.arg=df2$hour, xlab="hour of day", ylab="% of flights delayed", col="green", cex.names=.7)
In this demo we have not explored all the variables, of course -- just a couple, to demonstrate R's capabilities in this area. R has a wide variety of graphing tools such as base graphics, lattice and ggplot2.
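For example, here is a minimal ggplot2 sketch of the hour-of-day plot above (assuming df2 from the previous cell, with columns hour and mean_delay):
require(ggplot2)
# Same data as the barplot above, drawn with ggplot2
ggplot(df2, aes(x = factor(hour), y = 100*mean_delay)) +
  geom_bar(stat = "identity", fill = "darkgreen") +
  xlab("hour of day") + ylab("% of flights delayed")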
After playing around with the data and exploring some potential features, we will now demonstrate how to use HIVE with a UDF written in R to perform some simple pre-processing on Hadoop, and create a feature matrix from the raw dataset.
To simplify, we will build a supervised learning model to predict flight delays for flights leaving O'Hare International airport (ORD), where we "train" the model using data from 2007, and evaluate its performance using data from 2008.
In our first iteration, we create the following features from the raw dataset:
- delayed: our binary target variable (1 if the departure delay was 15 minutes or more, 0 otherwise)
- month, day and dayofweek: taken directly from the flight record
- distance: flight distance in miles
- hour: hour of day of the scheduled departure time
- days_from_holiday: number of days between the flight date and the nearest US holiday
We use HIVE with a UDF written in R to create two "feature matrices", one for 2007 (training) and another for 2008 (testing). Let's look at the code. First we create our HIVE UDF in R and store it in a local folder:
In [6]:
hiveUdfStr = '
get_hour <- function(val) { return(substr(sprintf("%04d", val),1,2)) }
# List of US holidays in 2007-2008
holidays <- as.Date(c("2007-01-01", "2007-01-15", "2007-02-19", "2007-05-28", "2007-06-07", "2007-07-04",
                      "2007-09-03", "2007-10-08", "2007-11-11", "2007-11-22", "2007-12-25",
                      "2008-01-01", "2008-01-21", "2008-02-18", "2008-05-22", "2008-05-26", "2008-07-04",
                      "2008-09-01", "2008-10-13", "2008-11-11", "2008-11-27", "2008-12-25"))
# Number of days between a given date and the nearest holiday
days_from_nearest_holiday <- function(year, month, day) {
d = as.Date(sprintf("%04d-%02d-%02d", year, month, day))
v = sapply(holidays, function(h) { abs(as.integer(d-h)) })
min(v)
}
input <- file("stdin", "r")
while(length(line <- readLines(input, n=1, warn=FALSE)) > 0) {
if(nchar(line) == 0) break
fields <- unlist(strsplit(line, "\t"))
year = as.integer(fields[1]); month = as.integer(fields[2]); day = as.integer(fields[3]);
dayofweek = as.integer(fields[4]); depdelay = as.integer(fields[5]);
distance = as.integer(fields[6]); crsdeptime = as.integer(fields[7])
delayed = if (!is.na(depdelay) && depdelay>=15) 1 else 0
out = paste(delayed, year, month, day, dayofweek, distance,
get_hour(crsdeptime), days_from_nearest_holiday(year, month, day), sep="\t")
write(out, stdout())
}
'
fileConn<-file("/home/jupyter/notebooks/airline/hiveudf.R")
writeLines(hiveUdfStr, fileConn)
close(fileConn)
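Before invoking the script from HIVE, we can optionally smoke-test it locally (assuming Rscript is on the PATH) by piping a single tab-separated input row into it:
# Feed one (year, month, day, dayofweek, depdelay, distance, crsdeptime) row to the UDF
system2("Rscript", "/home/jupyter/notebooks/airline/hiveudf.R",
        input = paste(2007, 3, 15, 4, 30, 719, 1730, sep = "\t"), stdout = TRUE)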
Next we create a HIVE query using this UDF, and execute it. This query extracts the basic features we are interested in, such as month, day and distance, but also generates custom features, such as hour and days-from-nearest-holiday, via the HIVE UDF:
In [7]:
hiveScriptStr = "
USE demo;
ADD FILE /home/jupyter/notebooks/airline/hiveudf.R;
DROP TABLE IF EXISTS fmat_2007;
CREATE TABLE fmat_2007 (delayed int, year int, month int, day int, dayofweek int, distance int, hour int, days_from_holiday int)
STORED AS ORC;
FROM (
SELECT year, month, day, dayofweek, depdelay, distance, crsdeptime
FROM flights
WHERE flights.year=2007 and flights.origin='ORD' and flights.cancelled=0 and flights.depdelay is not NULL
DISTRIBUTE BY month
) t1
INSERT OVERWRITE TABLE fmat_2007
REDUCE year, month, day, dayofweek, depdelay, distance, crsdeptime
USING 'Rscript hiveudf.R' AS (delayed, year, month, day, dayofweek, distance, hour, days_from_holiday);
DROP TABLE IF EXISTS fmat_2008;
CREATE TABLE fmat_2008 (delayed int, year int, month int, day int, dayofweek int, distance int, hour int, days_from_holiday int)
STORED AS ORC;
FROM (
SELECT year, month, day, dayofweek, depdelay, distance, crsdeptime
FROM flights
WHERE flights.year=2008 and flights.origin='ORD' and flights.cancelled=0 and flights.depdelay is not NULL
DISTRIBUTE BY month
) t1
INSERT OVERWRITE TABLE fmat_2008
REDUCE year, month, day, dayofweek, depdelay, distance, crsdeptime
USING 'Rscript hiveudf.R' AS (delayed, year, month, day, dayofweek, distance, hour, days_from_holiday);
"
scriptFile = "/home/jupyter/notebooks/airline/iter1.hql"
fileConn<-file(scriptFile)
writeLines(hiveScriptStr, fileConn)
close(fileConn)
system2("hive", c("-f", scriptFile))
Now that we've generated our feature matrices using HIVE, let's turn back to R to build a model for predicting airline delays. First we read the feature matrices, and define a helper function, get_metrics(), to compute classification metrics:
In [8]:
# Function to compute Precision, Recall and F1-Measure
get_metrics <- function(predicted, actual) {
tp = length(which(predicted == TRUE & actual == TRUE))
tn = length(which(predicted == FALSE & actual == FALSE))
fp = length(which(predicted == TRUE & actual == FALSE))
fn = length(which(predicted == FALSE & actual == TRUE))
precision = tp / (tp+fp)
recall = tp / (tp+fn)
F1 = 2*precision*recall / (precision+recall)
accuracy = (tp+tn) / (tp+tn+fp+fn)
v = c(precision, recall, F1, accuracy)
v
}
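# Illustrative sanity check of get_metrics (not part of the original pipeline):
# tp=2, fp=1, fn=1, tn=1 => precision=0.67, recall=0.67, F1=0.67, accuracy=0.60
print(get_metrics(predicted = c(T, T, T, F, F), actual = c(T, T, F, T, F)))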
# Load 2007 feature matrix and construct training set
train_data = exec.query("SELECT delayed, month, day, dayofweek, distance, hour, days_from_holiday FROM fmat_2007")
train_data$month = as.factor(train_data$month)
train_data$day = as.factor(train_data$day)
train_data$dayofweek = as.factor(train_data$dayofweek)
train_data$delayed = as.integer(train_data$delayed)
# Load 2008 feature matrix and construct test set
test_data = exec.query("SELECT delayed, month, day, dayofweek, distance, hour, days_from_holiday FROM fmat_2008")
test_data$month = as.factor(test_data$month)
test_data$day = as.factor(test_data$day)
test_data$dayofweek = as.factor(test_data$dayofweek)
test_data$delayed = as.integer(test_data$delayed)
head(train_data, 5)
Out[8]:
Now we train our model using R's gbm package (Gradient Boosted Machines). GBM is an ensemble method that is typically robust to over-fitting and tends to perform well in practice.
In [9]:
nTrees = 5000
require(gbm)
gbm.model <- gbm(delayed ~ ., data=train_data, n.trees=nTrees, verbose=F, shrinkage=0.5,
distribution="bernoulli", interaction.depth=3, n.minobsinnode=30, cv.folds=3)
gbm.pr <- predict(gbm.model, newdata=test_data[,-1], n.trees=nTrees, type="response")
m.gbm = get_metrics(gbm.pr >= 0.5, as.logical(test_data$delayed))
print(sprintf("Gradient Boosted Machines: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f",
m.gbm[1], m.gbm[2], m.gbm[3], m.gbm[4]))
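The 0.5 cutoff above is just one choice; here is a short, optional sketch that sweeps the decision threshold to trade precision against recall, reusing gbm.pr and get_metrics from the cells above:
# Sweep the classification threshold and report the resulting metrics
for (th in c(0.3, 0.5, 0.7)) {
  m = get_metrics(gbm.pr >= th, as.logical(test_data$delayed))
  print(sprintf("threshold=%.1f: precision=%0.2f, recall=%0.2f, F1=%0.2f", th, m[1], m[2], m[3]))
}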
The gbm package can also report model performance as a function of the number of trees, as well as relative feature importance. Let's take a look:
In [10]:
gbm.perf(gbm.model)
summary(gbm.model)
Out[10]:
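As an aside, gbm.perf() returns an estimate of the optimal number of trees (here derived from the cv.folds=3 cross-validation above), which can be used at prediction time instead of all 5000 trees; a minimal sketch:
# Use the cross-validated optimal iteration count for prediction
best.iter <- gbm.perf(gbm.model, method = "cv", plot.it = FALSE)
gbm.pr2 <- predict(gbm.model, newdata = test_data[,-1], n.trees = best.iter, type = "response")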
Using Gradient Boosted Machines we get reasonable results for our predictive model with a simple set of features. Can we do better? It is often the case that adding new features from additional data sources improves the predictive performance of a model. We tackle this next.
A reasonable way to improve the accuracy of this model is to layer in weather data, adding more informative features. We can get this data from a publicly available dataset here: http://www.ncdc.noaa.gov/cdo-web/datasets/
We now add these additional weather-related features to our model: daily minimum and maximum temperature, wind speed, snowfall and precipitation at the flight's origin airport (ORD).
So let's re-write our HIVE script to add these new features to our feature matrix.
We make use of a HIVE UDF that resides in tomap.jar, which allows us to implement functionality similar to PIVOT. We group by <station, year, month, day> and collect, for each such group, the various metrics into a map structure; then we extract individual columns for the metrics we want: TMIN, TMAX, SNOW, PRCP and AWND. tomap.jar is built from the code available here: https://github.com/wdavidw/hive-udf
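To make the pivot concrete, here is a toy illustration in R (not part of the pipeline itself) of the long-to-wide reshaping that to_map plus map indexing performs in HIVE:
# A few long-format weather rows: one row per (station, date, metric)
weather_long <- data.frame(station = "USW00094846", year = 2007, month = 1, day = 1,
                           metric = c("TMIN", "TMAX", "PRCP"), value = c(-50, 10, 3))
# Pivot to one row per (station, date) with one column per metric
weather_wide <- reshape(weather_long, idvar = c("station", "year", "month", "day"),
                        timevar = "metric", direction = "wide")
print(weather_wide)  # columns: station, year, month, day, value.TMIN, value.TMAX, value.PRCP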
In [11]:
hiveScriptStr = "
USE demo;
ADD FILE /home/jupyter/notebooks/airline/hiveudf.R;
ADD JAR /home/jupyter/notebooks/airline/tomap.jar;
CREATE TEMPORARY FUNCTION to_map as 'com.adaltas.UDAFToMap';
DROP TABLE IF EXISTS fmat_2007_2;
CREATE TABLE fmat_2007_2 (delayed int, year int, month int, day int, dayofweek int,
distance int, hour int, days_from_holiday int,
tmin float, tmax float, prcp float, awnd float, snow float) STORED AS ORC;
INSERT OVERWRITE TABLE fmat_2007_2
SELECT * FROM
(SELECT delayed, f.year, f.month, f.day, dayofweek, distance, hour, days_from_holiday, tmin, tmax, prcp, awnd, snow
FROM (
FROM (
SELECT year, month, day, dayofweek, depdelay, distance, crsdeptime
FROM flights
WHERE flights.year=2007 and flights.origin='ORD' and flights.cancelled=0 and flights.depdelay is not NULL
DISTRIBUTE BY month) t1
REDUCE year, month, day, dayofweek, depdelay, distance, crsdeptime
USING 'Rscript hiveudf.R' AS (delayed, year, month, day, dayofweek, distance, hour, days_from_holiday)
) f
JOIN
(
SELECT station, year, month, day,
mapvars['TMIN'] as tmin, mapvars['TMAX'] as tmax,
mapvars['SNOW'] as snow, mapvars['PRCP'] as prcp, mapvars['AWND'] as awnd
FROM (SELECT station, year, month, day, to_map(metric,value) as mapvars
FROM weather WHERE station = 'USW00094846'
GROUP BY station, year, month, day) subq
) w
ON (w.year = f.year and w.month = f.month and w.day = f.day)
) subq;
DROP TABLE IF EXISTS fmat_2008_2;
CREATE TABLE fmat_2008_2 (delayed int, year int, month int, day int, dayofweek int,
distance int, hour int, days_from_holiday int,
tmin float, tmax float, prcp float, awnd float, snow float) STORED AS ORC;
INSERT OVERWRITE TABLE fmat_2008_2
SELECT * FROM
(SELECT delayed, f.year, f.month, f.day, dayofweek, distance, hour, days_from_holiday, tmin, tmax, prcp, awnd, snow
FROM (
FROM (
SELECT year, month, day, dayofweek, depdelay, distance, crsdeptime
FROM flights
WHERE flights.year=2008 and flights.origin='ORD' and flights.cancelled=0 and flights.depdelay is not NULL
DISTRIBUTE BY month) t1
REDUCE year, month, day, dayofweek, depdelay, distance, crsdeptime
USING 'Rscript hiveudf.R' AS (delayed, year, month, day, dayofweek, distance, hour, days_from_holiday)
) f
JOIN
(
SELECT station, year, month, day,
mapvars['TMIN'] as tmin, mapvars['TMAX'] as tmax,
mapvars['SNOW'] as snow, mapvars['PRCP'] as prcp, mapvars['AWND'] as awnd
FROM (SELECT station, year, month, day, to_map(metric,value) as mapvars
FROM weather WHERE station = 'USW00094846'
GROUP BY station, year, month, day) subq
) w
ON (w.year = f.year and w.month = f.month and w.day = f.day)
) subq;
"
scriptFile = "/home/jupyter/notebooks/airline/iter2.hql"
fileConn<-file(scriptFile)
writeLines(hiveScriptStr, fileConn)
close(fileConn)
system2("hive", c("-f", scriptFile))
Now let's re-build the Gradient Boosted Machines model with the enhanced feature matrices. As before, we first prepare our training set and test set:
In [12]:
# Load 2007 feature matrix and construct training set
train_data = exec.query("SELECT delayed, month, day, dayofweek, distance, hour, days_from_holiday,
tmin, tmax, prcp, awnd, snow
FROM fmat_2007_2")
train_data$month = as.factor(train_data$month)
train_data$day = as.factor(train_data$day)
train_data$dayofweek = as.factor(train_data$dayofweek)
train_data$delayed = as.integer(train_data$delayed)
# Load 2008 feature matrix and construct test set
test_data = exec.query("SELECT delayed, month, day, dayofweek, distance, hour, days_from_holiday,
tmin, tmax, prcp, awnd, snow
FROM fmat_2008_2")
test_data$month = as.factor(test_data$month)
test_data$day = as.factor(test_data$day)
test_data$dayofweek = as.factor(test_data$dayofweek)
test_data$delayed = as.integer(test_data$delayed)
head(train_data, 5)
Out[12]:
Now let's re-train the gradient boosted tree model on the enhanced feature set:
In [13]:
nTrees = 5000
require(gbm)
gbm.model <- gbm(delayed ~ ., data=train_data, n.trees=nTrees, verbose=F, shrinkage=0.5,
distribution="bernoulli", interaction.depth=3, n.minobsinnode=30, cv.folds=3)
gbm.pr <- predict(gbm.model, newdata=test_data[,-1], n.trees=nTrees, type="response")
m.gbm = get_metrics(gbm.pr >= 0.5, as.logical(test_data$delayed))
print(sprintf("Gradient Boosted Machines: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f",
m.gbm[1], m.gbm[2], m.gbm[3], m.gbm[4]))
In [14]:
gbm.perf(gbm.model)
summary(gbm.model)
Out[14]:
In this demo we used an IPython notebook to demonstrate how to build a predictive model for airline delays. We used HIVE and R on our HDP cluster to pre-process the data, build a feature matrix, and apply machine learning algorithms to the resulting datasets. We showed how, by iterating and adding new features, we achieve better model performance.