Predicting airline delays with Spark and ML-Lib using pySpark

Pre-processing with PySpark

Read in the data

import os.path


base_dir = os.path.join('data')
input_path_2007 = os.path.join('flights', '2007.csv')
input_path_2008 = os.path.join('flights', '2008.csv')
input_path_weather = os.path.join('flights', AIRPORT + '.csv')

file_name_2007 = os.path.join(base_dir, input_path_2007)
file_name_2008 = os.path.join(base_dir, input_path_2008)
file_name_weather = os.path.join(base_dir, input_path_weather)

raw_data_2007 = sc.textFile(file_name_2007)
raw_data_2008 = sc.textFile(file_name_2008)
weather_data = sc.textFile(file_name_weather)

header1 = raw_data_2007.take(1) 

filtered_data_2007 = (raw_data_2007
                        # filter on Airport
                        .filter(lambda l: ',' + AIRPORT + ',' in l)
                        # filter out cancelled flights
                        .filter(lambda l: ',,' in l)
                        # filter out header
                        .filter(lambda line: 'Year' not in line))
filtered_data_2008 = (raw_data_2008
                        # filter on Airport
                        .filter(lambda l: ',' + AIRPORT + ',' in l)
                        # filter out cancelled flights
                        .filter(lambda l: ',,' in l)
                        # filter out header
                        .filter(lambda l: 'Year' not in l))

# CRS = Computer Reservation System
# scheduled time as opposed to the actual time
print header1
print filtered_data_2007.take(1)
print filtered_data_2008.take(1)


Public holidays

holidays = ['01/01/2007', '01/15/2007', '02/19/2007', '05/28/2007', '06/07/2007', '07/04/2007',
      '09/03/2007', '10/08/2007' ,'11/11/2007', '11/22/2007', '12/25/2007',
      '01/01/2008', '01/21/2008', '02/18/2008', '05/22/2008', '05/26/2008', '07/04/2008',
      '09/01/2008', '10/13/2008' ,'11/11/2008', '11/27/2008', '12/25/2008']

Add weather and plane data and extract features

import datetime
import numpy as np
from pyspark.mllib.regression import LabeledPoint

def make_weather_dict():
    weather_dict = {}
    weather_lines = weather_data.collect()
    for line in weather_lines:
        all_vals = line.split(',')
        # key: date / values: precipitation, snow, max temp, min temp, average wind
        weather_dict[all_vals[2]] = [all_vals[3], all_vals[4], all_vals[5], all_vals[6], all_vals[7]]
    return weather_dict

def days_from_nearest_holiday(year, month, day):
    diffs = []
    sample_date =, month, day)
    for holiday in holidays:
        dt = datetime.datetime.strptime(holiday, '%m/%d/%Y').date()
        td = dt - sample_date

    return min(diffs) * 1.0
def make_labeled_point(flight):
        wd = weather_dict.value
        flight_data = flight.split(',')
        date_string = str(flight_data[0]) + str(flight_data[1]).zfill(2) + str(flight_data[2]).zfill(2)
        weather_data = wd[date_string]
        features = []
        lbl = 0.0
        if float(flight_data[15]) >= 15.0:
            lbl = 1.0
        features.append(float(flight_data[1]))         # Month
        features.append(float(flight_data[2]))         # DayofMonth
        features.append(float(flight_data[3]))         # DayOfWeek
        features.append(float(flight_data[5]) / 100)   # CRSDepTime
        features.append(days_from_nearest_holiday(int(flight_data[0]), int(flight_data[1]), int(flight_data[2]))) 
        features.append(float(weather_data[0]))        # precipitation
        features.append(float(weather_data[1]) / 10.0) # snow (tenths of mm)
        features.append(float(weather_data[2]) / 10.0) # max temp (tenths of degrees C)
        features.append(float(weather_data[3]) / 10.0) # min temp (tenths of degrees C)
        features.append(float(weather_data[4]))        # average wind

        return LabeledPoint(lbl, features)
        return LabeledPoint(0.0, [0.0 for i in range(10)])

weather_dict = sc.broadcast(make_weather_dict())

train_data = line: make_labeled_point(line))
test_data = line: make_labeled_point(line))

print train_data.take(1)
print test_data.take(1)

[LabeledPoint(0.0, [1.0,25.0,4.0,11.0,10.0,0.0,0.0,-4.4,-9.4,42.0])]
[LabeledPoint(1.0, [1.0,8.0,2.0,16.0,7.0,168.0,0.0,13.3,2.8,61.0])]

Delay distribution in percent

import matplotlib.pyplot as plt
%matplotlib inline

total = test_data.count()
late = test_data.filter(lambda lp: lp.label == 1.0).count()
labels = ['On Time', 'Delayed']
fracs = [total - late, late]
explode = (0, 0.05)

fig = plt.figure(figsize=(7, 7))
fig.suptitle('2008 - Delays in percent', fontsize=14, fontweight='bold')
ax = fig.add_subplot(111)
ax.pie(fracs, explode=explode, labels=labels, autopct='%1.1f%%', shadow=True, startangle=90)

Delay distribution by month

x_axis = [i + 1 for i in range(12)]
y_axis = (test_data.filter(lambda lp: lp.label == 1.0)
                   .map(lambda lp: (lp.features[0], 1))
                   .reduceByKey(lambda x, y: x + y)
                   .map(lambda tup: tup[1])

fig = plt.figure(figsize=(7, 7))
fig.suptitle('2008 - Delays per month', fontsize=14, fontweight='bold')
ax = fig.add_subplot(111)
ax.set_ylabel('Delayed'), y_axis, color='g', align='center')

Modeling with Spark and ML-Lib

from pyspark.mllib.stat import Statistics
import math

# rescale the features by centering
# and dividing by the variance
def rescale(features):
    r = []
    for i, f in enumerate(features):
        c = f - mean.value[i]
        s = c / math.sqrt(variance.value[i]) if c != 0.0 else 0.0
    return r

summary = Statistics.colStats( lp: lp.features))
# broadcast as list to avoid
# the 'sc broadcast' error
mean = sc.broadcast(summary.mean())
variance = sc.broadcast(summary.variance())
scaled_train_data = lp: LabeledPoint(lp.label, rescale(lp.features)))

summary2 = Statistics.colStats( lp: lp.features))
mean = sc.broadcast(summary2.mean())
variance = sc.broadcast(summary2.variance())
scaled_test_data = lp: LabeledPoint(lp.label, rescale(lp.features)))

print scaled_train_data.take(1)
print scaled_test_data.take(1)

[LabeledPoint(0.0, [-1.61588741115,1.05502642604,0.0323641076933,-0.44197356246,-0.280232424358,-0.372379867404,-0.189242709038,-1.61971981276,-1.43413687357,-0.130738595082])]
[LabeledPoint(1.0, [-1.51471707139,-0.810096211109,-0.880088261241,0.688926935369,-0.513138229714,1.29345446628,-0.235417954896,-0.0908158220154,-0.179318768852,1.11221878367])]

Evaluation function

def eval_metrics(lbl_pred):
    tp = float(lbl_pred.filter(lambda lp: lp[0]==1.0 and lp[1]==1.0).count())
    tn = float(lbl_pred.filter(lambda lp: lp[0]==0.0 and lp[1]==0.0).count())
    fp = float(lbl_pred.filter(lambda lp: lp[0]==1.0 and lp[1]==0.0).count())
    fn = float(lbl_pred.filter(lambda lp: lp[0]==0.0 and lp[1]==1.0).count())
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    F_measure = 2 * precision * recall / (precision + recall)
    accuracy = (tp + tn) / (tp + tn + fp + fn)
    return([tp, tn, fp, fn], [precision, recall, F_measure, accuracy])

Logistic regression

from pyspark.mllib.classification import LogisticRegressionWithSGD

model_lr = LogisticRegressionWithSGD.train(scaled_train_data, iterations=100)
labels_and_predictions = lp: (model_lr.predict(lp.features), lp.label))
metrics = eval_metrics(labels_and_predictions)

print('Precision : %.2f' % round(metrics[1][0], 2))
print('Recall : %.2f' % round(metrics[1][1], 2))
print('F1 : %.2f' % round(metrics[1][2], 2))
print('Accuracy : %.2f' % round(metrics[1][3], 2))

Precision : 0.38
Recall : 0.71
F1 : 0.50
Accuracy : 0.63

Support Vector Machine

from pyspark.mllib.classification import SVMWithSGD

model_svm = SVMWithSGD.train(scaled_train_data, iterations=100, step=1.0, regParam=0.01)
labels_and_predictions = lp: (model_svm.predict(lp.features), lp.label))
metrics = eval_metrics(labels_and_predictions)

print('Precision : %.2f' % round(metrics[1][0], 2))
print('Recall : %.2f' % round(metrics[1][1], 2))
print('F1 : %.2f' % round(metrics[1][2], 2))
print('Accuracy : %.2f' % round(metrics[1][3], 2))

Precision : 0.38
Recall : 0.71
F1 : 0.50
Accuracy : 0.63

Naive Bayes

from pyspark.mllib.classification import NaiveBayes

# Naive Bayes expects positive 
# features, so we take the abs
def absolute(feat):
    r = []
    for x in feat:
    return r

absolute_train_data = lp: LabeledPoint(lp.label, absolute(lp.features)))
absolute_test_data = lp: LabeledPoint(lp.label, absolute(lp.features)))

model_nb = NaiveBayes.train(absolute_train_data)
labels_and_predictions = lp: (model_nb.predict(lp.features), lp.label))
metrics = eval_metrics(labels_and_predictions)

print('Precision : %.2f' % round(metrics[1][0], 2))
print('Recall : %.2f' % round(metrics[1][1], 2))
print('F1 : %.2f' % round(metrics[1][2], 2))
print('Accuracy : %.2f' % round(metrics[1][3], 2))

Precision : 0.46
Recall : 0.34
F1 : 0.39
Accuracy : 0.72

