This IPython notebook complements and expounds the implementation details outlined in our series on Data Science and Hadoop part 1 blog, demonstrating how to apply data science techniques with Apache Hadoop.
In this first blog post we will demonstrate a step by step solution to a supervised learning problem, including:
So let's begin.
Every year approximately 20% of airline flights are delayed or cancelled, resulting in significant costs to both travellers and airlines. As our example use-case, we will build a supervised learning model that predicts airline delay from historial flight data and weather information.
Let's begin by exploring the airline delay dataset available here: http://stat-computing.org/dataexpo/2009/the-data.html This dataset includes details about flights in the US from the years 1987-2008. Every row in the dataset includes 29 variables:
Name | Description | |
---|---|---|
1 | Year | 1987-2008 |
2 | Month | 1-12 |
3 | DayofMonth | 1-31 |
4 | DayOfWeek | 1 (Monday) - 7 (Sunday) |
5 | DepTime | actual departure time (local, hhmm) |
6 | CRSDepTime | scheduled departure time (local, hhmm) |
7 | ArrTime | actual arrival time (local, hhmm) |
8 | CRSArrTime | scheduled arrival time (local, hhmm) |
9 | UniqueCarrier | unique carrier code |
10 | FlightNum | flight number |
11 | TailNum | plane tail number |
12 | ActualElapsedTime | in minutes |
13 | CRSElapsedTime | in minutes |
14 | AirTime | in minutes |
15 | ArrDelay | arrival delay, in minutes |
16 | DepDelay | departure delay, in minutes |
17 | Origin | origin |
18 | Dest | destination |
19 | Distance | in miles |
20 | TaxiIn | taxi in time, in minutes |
21 | TaxiOut | taxi out time in minutes |
22 | Cancelled | was the flight cancelled? |
23 | CancellationCode | reason for cancellation (A = carrier, B = weather, C = NAS, D = security) |
24 | Diverted | 1 = yes, 0 = no |
25 | CarrierDelay | in minutes |
26 | WeatherDelay | in minutes |
27 | NASDelay | in minutes |
28 | SecurityDelay | in minutes |
29 | LateAircraftDelay | in minutes |
To simplify, we will build a supervised learning model to predict flight delays for flights leaving O'Hare International airport (ORD), where we "learn" the model using data from 2007, and evaluate its performance using data from 2008.
But first, let's do some exploration of this dataset. Exploration is a common step in building a predictive model -- our goal is to better understand the data we have and get some clues as to which features might be good for the predictive model.
We start by importing some useful python libraries that we will need later like Pandas, Numpy, Scikit-learn and Matplotlib.
In [1]:
# Python library imports: numpy, random, sklearn, pandas, etc
import warnings
warnings.filterwarnings('ignore')
import sys
import random
import numpy as np
from sklearn import linear_model, cross_validation, metrics, svm
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
We now define a utility function to read an HDFS file into a Pandas dataframe using Pydoop. Pydoop is a package that provides a Python API for Hadoop MapReduce and HDFS.
Pydoop's hdfs.open() function reads a single file from HDFS. However many HDFS output files are actually multi-part files, so our read_csv_from_hdfs() function uses hdfs.ls() to grab all the needed file names, and then read each one separately. Finally, it concatenates the resulting Pandas dataframes of each file into a Pandas dataframe.
In [2]:
# function to read HDFS file into dataframe using PyDoop
import pydoop.hdfs as hdfs
def read_csv_from_hdfs(path, cols, col_types=None):
files = hdfs.ls(path);
pieces = []
for f in files:
fhandle = hdfs.open(f)
pieces.append(pd.read_csv(fhandle, names=cols, dtype=col_types))
fhandle.close()
return pd.concat(pieces, ignore_index=True)
Great. Now we got the logistics out of the way, so let's explore this dataset further.
First, let's read the raw data for 2007 from HDFS into a Pandas dataframe. We use our utility function read_csv_from_hdfs() and provide it with column names since this is a raw file, not a HIVE table with meta-data. Let's see how it works:
In [3]:
# read 2007 year file
cols = ['year', 'month', 'day', 'dow', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime', 'Carrier', 'FlightNum',
'TailNum', 'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest',
'Distance', 'TaxiIn', 'TaxiOut', 'Cancelled', 'CancellationCode', 'Diverted', 'CarrierDelay',
'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay'];
flt_2007 = read_csv_from_hdfs('airline/delay/2007.csv', cols)
flt_2007.shape
Out[3]:
We see 7.4M+ flights in 2007 and 29 variables.
Our "target" variable will be DepDelay (scheduled departure delay in minutes). To build a classifier, we further refine our target variable into a binary variable by defining a "delay" as having 15 mins or more of delay, and "non-delay" otherwise. We thus create a new binary variable that we name 'DepDelayed'.
Let's look at some basic statistics, after limiting ourselves to flights originating from ORD:
In [4]:
df = flt_2007[flt_2007['Origin']=='ORD'].dropna(subset=['DepDelay'])
df['DepDelayed'] = df['DepDelay'].apply(lambda x: x>=15)
print "total flights: " + str(df.shape[0])
print "total delays: " + str(df['DepDelayed'].sum())
Let's see how delayed flights are distributed by month:
In [5]:
# Select a Pandas dataframe with flight originating from ORD
# Compute average number of delayed flights per month
grouped = df[['DepDelayed', 'month']].groupby('month').mean()
# plot average delays by month
grouped.plot(kind='bar')
Out[5]:
We see that the average number of delays is highest in December and February, which is what we would expect.
Now let's look at the hour-of-day:
In [6]:
# Compute average number of delayed flights by hour
df['hour'] = df['CRSDepTime'].map(lambda x: int(str(int(x)).zfill(4)[:2]))
grouped = df[['DepDelayed', 'hour']].groupby('hour').mean()
# plot average delays by hour of day
grouped.plot(kind='bar')
Out[6]:
A clear pattern here - flights tend to be delayed later in the day. Perhaps this is because delays tend to pile up as the day progresses and the problem tends to compound later in the day.
Now let's look at delays by carrier:
In [7]:
# Compute average number of delayed flights per carrier
grouped1 = df[['DepDelayed', 'Carrier']].groupby('Carrier').filter(lambda x: len(x)>10)
grouped2 = grouped1.groupby('Carrier').mean()
carrier = grouped2.sort(['DepDelayed'], ascending=False)
# display top 15 destination carriers by delay (from ORD)
carrier[:15].plot(kind='bar')
Out[7]:
As expected, some airlines are better than others.
After exploring the data for a bit, we now move to building the feature matrix for our predictive model.
Let's look at possible predictive variables for our model:
We will also generate another feature: number of days from closest national holiday, with the assumption that holidays tend to be associated with more delays.
We implement this "feature generation" process using PIG and some simply Python user-defined-functions (UDFs). First, let's implement some Python UDFs:
#
# Python UDFs for our PIG script
#
from datetime import date
# get hour-of-day from HHMM field
@outputSchema("value: int")
def get_hour(val):
return int(val.zfill(4)[:2])
# this array defines the dates of holiday in 2007 and 2008
holidays = [
date(2007, 1, 1), date(2007, 1, 15), date(2007, 2, 19), date(2007, 5, 28), date(2007, 6, 7), date(2007, 7, 4), \
date(2007, 9, 3), date(2007, 10, 8), date(2007, 11, 11), date(2007, 11, 22), date(2007, 12, 25), \
date(2008, 1, 1), date(2008, 1, 21), date(2008, 2, 18), date(2008, 5, 22), date(2008, 5, 26), date(2008, 7, 4), \
date(2008, 9, 1), date(2008, 10, 13), date(2008, 11, 11), date(2008, 11, 27), date(2008, 12, 25) \
]
# get number of days from nearest holiday
@outputSchema("days: int")
def days_from_nearest_holiday(year, month, day):
d = date(year, month, day)
x = [(abs(d-h)).days for h in holidays]
return min(x)
Our PIG script is relatively simple:
We can execute this script directly from IPython (the Python UDFs are separately stored in "util.py"):
In [8]:
%%writefile preprocess1.pig
Register 'util.py' USING jython as util;
DEFINE preprocess(year_str, airport_code) returns data
{
-- load airline data from specified year (need to specify fields since it's not in HCat)
airline = load 'airline/delay/$year_str.csv' using PigStorage(',')
as (Year: int, Month: int, DayOfMonth: int, DayOfWeek: int, DepTime: chararray,
CRSDepTime: chararray, ArrTime, CRSArrTime, Carrier: chararray, FlightNum, TailNum, ActualElapsedTime,
CRSElapsedTime, AirTime, ArrDelay, DepDelay: int, Origin: chararray, Dest: chararray, Distance: int,
TaxiIn, TaxiOut, Cancelled: int, CancellationCode, Diverted, CarrierDelay, WeatherDelay,
NASDelay, SecurityDelay, LateAircraftDelay);
-- keep only instances where flight was not cancelled and originate at ORD
airline_flt = filter airline by Cancelled == 0 and Origin == '$airport_code';
-- Keep only fields I need
$data = foreach airline_flt generate DepDelay as delay, Month, DayOfMonth, DayOfWeek,
util.get_hour(CRSDepTime) as hour, Distance, Carrier, Dest,
util.days_from_nearest_holiday(Year, Month, DayOfMonth) as hdays;
};
ORD_2007 = preprocess('2007', 'ORD');
rmf airline/fm/ord_2007_1
store ORD_2007 into 'airline/fm/ord_2007_1' using PigStorage(',');
ORD_2008 = preprocess('2008', 'ORD');
rmf airline/fm/ord_2008_1
store ORD_2008 into 'airline/fm/ord_2008_1' using PigStorage(',');
Let's look at the output as the script continues to process...
In [9]:
%%bash --err pig_out --bg
pig -f preprocess1.pig
In [10]:
while True:
line = pig_out.readline()
if not line:
break
sys.stdout.write("%s" % line)
sys.stdout.flush()
Now that PIG finished processing, we have two new file generated:
(the "1" indicates this is the first iteration; we will work on a second iteration later).
PIG is great for pre-procesing raw data into a feature matrix, but it's not the only choice. We can use other tools such as HIVE, Cascading, Scalding or Spark for this type of pre-processing. We will show how to do the same type of pre-processing using Spark in the second part of this blog post series.
Now we have the files ord_2007_1 and ord_2008_1 under 'airline/fm' folder in HDFS. Let's read those files into Python, and prepare the training and testing (validation) datasets as Pandas DataFrame objects.
Initially, we use only the numerical variables:
In [11]:
# read files
cols = ['delay', 'month', 'day', 'dow', 'hour', 'distance', 'carrier', 'dest', 'days_from_holiday']
col_types = {'delay': int, 'month': int, 'day': int, 'dow': int, 'hour': int, 'distance': int,
'carrier': str, 'dest': str, 'days_from_holiday': int}
data_2007 = read_csv_from_hdfs('airline/fm/ord_2007_1', cols, col_types)
data_2008 = read_csv_from_hdfs('airline/fm/ord_2008_1', cols, col_types)
# Create training set and test set
cols = ['month', 'day', 'dow', 'hour', 'distance', 'days_from_holiday']
train_y = data_2007['delay'] >= 15
train_x = data_2007[cols]
test_y = data_2008['delay'] >= 15
test_x = data_2008[cols]
print train_x.shape
So we have ~359K rows and 6 features in our model.
Now we use Python's excellent Scikit-learn machine learning package to to build two predictive models (Logistic regression and Random Forest) and compare their performance. First we print the confusion matrix, which counts the true positive, true negatives, false positives and false negatives. Then from the confusion matrix, we compute precision, recall, F1 metric and accuracy. Let's start with a logistic regression model and evaluate its performance on the testing dataset.
In [12]:
# Create logistic regression model with L2 regularization
clf_lr = linear_model.LogisticRegression(penalty='l2', class_weight='auto')
clf_lr.fit(train_x, train_y)
# Predict output labels on test set
pr = clf_lr.predict(test_x)
# display evaluation metrics
cm = confusion_matrix(test_y, pr)
print("Confusion matrix")
print(pd.DataFrame(cm))
report_lr = precision_recall_fscore_support(list(test_y), list(pr), average='micro')
print "\nprecision = %0.2f, recall = %0.2f, F1 = %0.2f, accuracy = %0.2f\n" % \
(report_lr[0], report_lr[1], report_lr[2], accuracy_score(list(test_y), list(pr)))
Our logistic regression model got overall accuracy of 60%. Now let's try Random Forest:
In [13]:
# Create Random Forest classifier with 50 trees
clf_rf = RandomForestClassifier(n_estimators=50, n_jobs=-1)
clf_rf.fit(train_x, train_y)
# Evaluate on test set
pr = clf_rf.predict(test_x)
# print results
cm = confusion_matrix(test_y, pr)
print("Confusion matrix")
print(pd.DataFrame(cm))
report_svm = precision_recall_fscore_support(list(test_y), list(pr), average='micro')
print "\nprecision = %0.2f, recall = %0.2f, F1 = %0.2f, accuracy = %0.2f\n" % \
(report_svm[0], report_svm[1], report_svm[2], accuracy_score(list(test_y), list(pr)))
As we can see, Random Forest has overall better accuracy, but lower F1 score. For our problem -- we are trying to predict delays, so the higher level of true positives (197K vs. 143K) is better.
With any supervised learnign algorithm, one typically needs to choose values for the parameters of the model. For example, we chose "L1" regularization for the logistic regression model, and 50 trees for the Random Forest. Such choices are based on some experimentation and hyperparameter tuning (http://en.wikipedia.org/wiki/Hyperparameter_optimization). We are not addressing this topic in this demo, although such choices are important to achieve the overall best model.
It is very common in data science to work iteratively, and improve the model with each iteration. Let's see how this works.
In this iteration, we improve our feature by converting existing variables that are categorical in nature (such as "hour", or "month") as well as categorical variables that are strings (like "carrier" and "dest"), into what is known as "dummy variables". Each "dummy variable" is a binary (0 or 1) that indicates whether a certain category value is "on" or "off.
Fortunately, scikit-learn has the OneHotEncoder functionality to make this easy:
In [14]:
from sklearn.preprocessing import OneHotEncoder
# read files
cols = ['delay', 'month', 'day', 'dow', 'hour', 'distance', 'carrier', 'dest', 'days_from_holiday']
col_types = {'delay': int, 'month': int, 'day': int, 'dow': int, 'hour': int, 'distance': int,
'carrier': str, 'dest': str, 'days_from_holiday': int}
data_2007 = read_csv_from_hdfs('airline/fm/ord_2007_1', cols, col_types)
data_2008 = read_csv_from_hdfs('airline/fm/ord_2008_1', cols, col_types)
# Create training set and test set
train_y = data_2007['delay'] >= 15
categ = [cols.index(x) for x in 'hour', 'month', 'day', 'dow', 'carrier', 'dest']
enc = OneHotEncoder(categorical_features = categ)
df = data_2007.drop('delay', axis=1)
df['carrier'] = pd.factorize(df['carrier'])[0]
df['dest'] = pd.factorize(df['dest'])[0]
train_x = enc.fit_transform(df)
test_y = data_2008['delay'] >= 15
df = data_2008.drop('delay', axis=1)
df['carrier'] = pd.factorize(df['carrier'])[0]
df['dest'] = pd.factorize(df['dest'])[0]
test_x = enc.transform(df)
print train_x.shape
So we can see the first 5 lines of the feature matrix. Overall, we have ~359K rows and 409 features in our model. Let's re-run the Random Forest model and see if this improved our model:
In [15]:
# Create Random Forest classifier with 50 trees
clf_rf = RandomForestClassifier(n_estimators=50, n_jobs=-1)
clf_rf.fit(train_x.toarray(), train_y)
# Evaluate on test set
pr = clf_rf.predict(test_x.toarray())
# print results
cm = confusion_matrix(test_y, pr)
print("Confusion matrix")
print(pd.DataFrame(cm))
report_svm = precision_recall_fscore_support(list(test_y), list(pr), average='micro')
print "\nprecision = %0.2f, recall = %0.2f, F1 = %0.2f, accuracy = %0.2f\n" % \
(report_svm[0], report_svm[1], report_svm[2], accuracy_score(list(test_y), list(pr)))
This clearly helped -- accuracy is higher at ~70%, and true positive are also better at 216K (vs 197K previously).
Another common path to improve accuracy is by bringing in new types of data - enriching our dataset - and generating more features. Our idea is to layer-in weather data. We can get this data from a publicly available dataset here: http://www.ncdc.noaa.gov/cdo-web/datasets/
We will look at daily temperatures (min/max), wind speed, snow conditions and precipitation in the flight origin airport (ORD). Clearly, weather conditions in the destination airport also affect delays, but for simplicity of this demo we just include weather at the origin (ORD).
First, let's re-write our PIG script to add these new features to our feature matrix:
In [16]:
%%writefile preprocess2.pig
register 'util.py' USING jython as util;
-- Helper macro to load data and join into a feature vector per instance
DEFINE preprocess(year_str, airport_code) returns data
{
-- load airline data from specified year (need to specify fields since it's not in HCat)
airline = load 'airline/delay/$year_str.csv' using PigStorage(',')
as (Year: int, Month: int, DayOfMonth: int, DayOfWeek: int, DepTime: chararray, CRSDepTime:chararray,
ArrTime, CRSArrTime, Carrier: chararray, FlightNum, TailNum, ActualElapsedTime, CRSElapsedTime, AirTime,
ArrDelay, DepDelay: int, Origin: chararray, Dest: chararray, Distance: int, TaxiIn, TaxiOut,
Cancelled: int, CancellationCode, Diverted, CarrierDelay, WeatherDelay, NASDelay,
SecurityDelay, LateAircraftDelay);
-- keep only instances where flight was not cancelled and originate at ORD
airline_flt = filter airline by Cancelled == 0 and Origin == '$airport_code';
-- Keep only fields I need
airline2 = foreach airline_flt generate Year as year, Month as month, DayOfMonth as day, DayOfWeek as dow,
Carrier as carrier, Origin as origin, Dest as dest, Distance as distance,
CRSDepTime as time, DepDelay as delay, util.to_date(Year, Month, DayOfMonth) as date;
-- load weather data
weather = load 'airline/weather/$year_str.csv' using PigStorage(',')
as (station: chararray, date: chararray, metric, value, t1, t2, t3, time);
-- keep only TMIN and TMAX weather observations from ORD
weather_tmin = filter weather by station == 'USW00094846' and metric == 'TMIN';
weather_tmax = filter weather by station == 'USW00094846' and metric == 'TMAX';
weather_prcp = filter weather by station == 'USW00094846' and metric == 'PRCP';
weather_snow = filter weather by station == 'USW00094846' and metric == 'SNOW';
weather_awnd = filter weather by station == 'USW00094846' and metric == 'AWND';
joined = join airline2 by date, weather_tmin by date, weather_tmax by date, weather_prcp by date,
weather_snow by date, weather_awnd by date;
$data = foreach joined generate delay, month, day, dow, util.get_hour(airline2::time) as tod, distance, carrier, dest,
util.days_from_nearest_holiday(year, month, day) as hdays,
weather_tmin::value as temp_min, weather_tmax::value as temp_max,
weather_prcp::value as prcp, weather_snow::value as snow, weather_awnd::value as wind;
};
ORD_2007 = preprocess('2007', 'ORD');
rmf airline/fm/ord_2007_2;
store ORD_2007 into 'airline/fm/ord_2007_2' using PigStorage(',');
ORD_2008 = preprocess('2008', 'ORD');
rmf airline/fm/ord_2008_2;
store ORD_2008 into 'airline/fm/ord_2008_2' using PigStorage(',');
In [17]:
%%bash --bg --err pig_out2
pig -f preprocess2.pig
In [18]:
while True:
line = pig_out2.readline()
if not line:
break
sys.stdout.write("%s" % line)
sys.stdout.flush()
We now read this data in, convert temparatures to Fahrenheit (note original temp is in Celcius*10), and prepare the training and testing datasets for modeling.
In [19]:
from sklearn.preprocessing import OneHotEncoder
# Convert Celsius to Fahrenheit
def fahrenheit(x): return(x*1.8 + 32.0)
# read files
cols = ['delay', 'month', 'day', 'dow', 'hour', 'distance', 'carrier', 'dest', 'days_from_holiday',
'origin_tmin', 'origin_tmax', 'origin_prcp', 'origin_snow', 'origin_wind']
col_types = {'delay': int, 'month': int, 'day': int, 'dow': int, 'hour': int, 'distance': int,
'carrier': str, 'dest': str, 'days_from_holiday': int,
'origin_tmin': float, 'origin_tmax': float, 'origin_prcp': float, 'origin_snow': float, 'origin_wind': float}
data_2007 = read_csv_from_hdfs('airline/fm/ord_2007_2', cols, col_types)
data_2008 = read_csv_from_hdfs('airline/fm/ord_2008_2', cols, col_types)
data_2007['origin_tmin'] = data_2007['origin_tmin'].apply(lambda x: fahrenheit(x/10.0))
data_2007['origin_tmax'] = data_2007['origin_tmax'].apply(lambda x: fahrenheit(x/10.0))
data_2008['origin_tmin'] = data_2008['origin_tmin'].apply(lambda x: fahrenheit(x/10.0))
data_2008['origin_tmax'] = data_2008['origin_tmax'].apply(lambda x: fahrenheit(x/10.0))
# Create training set and test set
train_y = data_2007['delay'] >= 15
categ = [cols.index(x) for x in 'hour', 'month', 'day', 'dow', 'carrier', 'dest']
enc = OneHotEncoder(categorical_features = categ)
df = data_2007.drop('delay', axis=1)
df['carrier'] = pd.factorize(df['carrier'])[0]
df['dest'] = pd.factorize(df['dest'])[0]
train_x = enc.fit_transform(df)
test_y = data_2008['delay'] >= 15
df = data_2008.drop('delay', axis=1)
df['carrier'] = pd.factorize(df['carrier'])[0]
df['dest'] = pd.factorize(df['dest'])[0]
test_x = enc.transform(df)
print train_x.shape
Good. So now that we have the training and test (validation) set ready, let's try Random Forest with the new features:
In [20]:
# Create Random Forest classifier with 100 trees
clf_rf = RandomForestClassifier(n_estimators=100, n_jobs=-1)
clf_rf.fit(train_x.toarray(), train_y)
# Evaluate on test set
pr = clf_rf.predict(test_x.toarray())
# print results
cm = confusion_matrix(test_y, pr)
print("Confusion matrix")
print(pd.DataFrame(cm))
report_rf = precision_recall_fscore_support(list(test_y), list(pr), average='micro')
print "precision = %0.2f, recall = %0.2f, F1 = %0.2f, accuracy = %0.2f\n" % \
(report_rf[0], report_rf[1], report_rf[2], accuracy_score(list(test_y), list(pr)))
with the new weather features, accuracy went up again from 0.70 to 0.74.
Clearly with more iterations, we are likely going to improve accuracy even further. For example, we can add weather information at the Origin, or explore the number of seats on the plan as a predictive feature (we can get that from the tail number), and so on.
In this blog post we have demonstrated how to build a predictive model with Hadoop and Python. We have used Hadoop to perform various types of data pre-processing and feature engineering tasks. We then applied Scikit-learn machine learning algorithm on the resulting datasets and have shown how via iterations we continuously add new and improved features resulting in better model performance.
In the next part of this multi-part blog post we will show how to perform the same learning task with Spark and ML-Lib.