Import the required Python packages and set the Cloudant credentials

flightPredict is a helper package used to train and run Spark MLlib models that predict flight delays from weather data.


In [2]:
sc.addPyFile("https://github.com/ibm-watson-data-lab/simple-data-pipe-connector-flightstats/raw/master/flightPredict/training.py")
sc.addPyFile("https://github.com/ibm-watson-data-lab/simple-data-pipe-connector-flightstats/raw/master/flightPredict/run.py")
import training
import run

In [3]:
%matplotlib inline
from pyspark.sql import SQLContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from numpy import array
import numpy as np
import math
from datetime import datetime
from dateutil import parser

sqlContext = SQLContext(sc)
training.sqlContext = sqlContext
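training.cloudantHost = 'dtaieb.cloudant.com'
# Read the credentials from environment variables rather than hard-coding secrets in the
# notebook; CLOUDANT_USERNAME, CLOUDANT_PASSWORD and WEATHER_URL are placeholder names
import os
training.cloudantUserName = os.environ['CLOUDANT_USERNAME']
training.cloudantPassword = os.environ['CLOUDANT_PASSWORD']
# Expected form: https://<username>:<password>@twcservice.mybluemix.net
training.weatherUrl = os.environ['WEATHER_URL']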

Load the data from the training data set and print its schema


In [4]:
dbName="pycon_flightpredict_training_set"
%time cloudantdata = training.loadDataSet(dbName,"training")
%time cloudantdata.printSchema()
%time cloudantdata.count()


Successfully cached dataframe
Successfully registered SQL table training
CPU times: user 12.5 ms, sys: 2.93 ms, total: 15.4 ms
Wall time: 1min 17s
root
 |-- _id: string (nullable = true)
 |-- _rev: string (nullable = true)
 |-- actualRunwayArrival: struct (nullable = true)
 |    |-- dateLocal: string (nullable = true)
 |    |-- dateUtc: string (nullable = true)
 |-- actualRunwayDeparture: struct (nullable = true)
 |    |-- dateLocal: string (nullable = true)
 |    |-- dateUtc: string (nullable = true)
 |-- arrivalAirportFsCode: string (nullable = true)
 |-- arrivalTerminal: string (nullable = true)
 |-- arrivalTime: string (nullable = true)
 |-- arrivalWeather: struct (nullable = true)
 |    |-- blunt_phrase: string (nullable = true)
 |    |-- class: string (nullable = true)
 |    |-- clds: string (nullable = true)
 |    |-- day_ind: string (nullable = true)
 |    |-- dewPt: long (nullable = true)
 |    |-- expire_time_gmt: long (nullable = true)
 |    |-- feels_like: long (nullable = true)
 |    |-- gust: long (nullable = true)
 |    |-- heat_index: long (nullable = true)
 |    |-- icon_extd: long (nullable = true)
 |    |-- key: string (nullable = true)
 |    |-- m: string (nullable = true)
 |    |-- max_temp: long (nullable = true)
 |    |-- min_temp: long (nullable = true)
 |    |-- obs_id: string (nullable = true)
 |    |-- obs_name: string (nullable = true)
 |    |-- precip_hrly: double (nullable = true)
 |    |-- precip_total: double (nullable = true)
 |    |-- pressure: double (nullable = true)
 |    |-- pressure_desc: string (nullable = true)
 |    |-- pressure_tend: long (nullable = true)
 |    |-- qualifier: string (nullable = true)
 |    |-- qualifier_svrty: string (nullable = true)
 |    |-- rh: long (nullable = true)
 |    |-- snow_hrly: double (nullable = true)
 |    |-- temp: long (nullable = true)
 |    |-- terse_phrase: string (nullable = true)
 |    |-- uv_desc: string (nullable = true)
 |    |-- uv_index: long (nullable = true)
 |    |-- valid_time_gmt: long (nullable = true)
 |    |-- vis: double (nullable = true)
 |    |-- wc: long (nullable = true)
 |    |-- wdir: long (nullable = true)
 |    |-- wdir_cardinal: string (nullable = true)
 |    |-- wspd: long (nullable = true)
 |    |-- wx_icon: long (nullable = true)
 |    |-- wx_phrase: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- carrierFsCode: string (nullable = true)
 |-- classification: long (nullable = true)
 |-- classificationMsg: string (nullable = true)
 |-- codeshares: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- carrierFsCode: string (nullable = true)
 |    |    |-- flightNumber: string (nullable = true)
 |    |    |-- referenceCode: long (nullable = true)
 |    |    |-- serviceClasses: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- serviceType: string (nullable = true)
 |    |    |-- trafficRestrictions: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- date: struct (nullable = true)
 |    |-- day: long (nullable = true)
 |    |-- hour: long (nullable = true)
 |    |-- month: long (nullable = true)
 |    |-- year: long (nullable = true)
 |-- deltaDeparture: long (nullable = true)
 |-- departureAirportFsCode: string (nullable = true)
 |-- departureTerminal: string (nullable = true)
 |-- departureTime: string (nullable = true)
 |-- departureWeather: struct (nullable = true)
 |    |-- blunt_phrase: string (nullable = true)
 |    |-- class: string (nullable = true)
 |    |-- clds: string (nullable = true)
 |    |-- day_ind: string (nullable = true)
 |    |-- dewPt: long (nullable = true)
 |    |-- expire_time_gmt: long (nullable = true)
 |    |-- feels_like: long (nullable = true)
 |    |-- gust: long (nullable = true)
 |    |-- heat_index: long (nullable = true)
 |    |-- icon_extd: long (nullable = true)
 |    |-- key: string (nullable = true)
 |    |-- m: string (nullable = true)
 |    |-- max_temp: long (nullable = true)
 |    |-- min_temp: long (nullable = true)
 |    |-- obs_id: string (nullable = true)
 |    |-- obs_name: string (nullable = true)
 |    |-- precip_hrly: double (nullable = true)
 |    |-- precip_total: double (nullable = true)
 |    |-- pressure: double (nullable = true)
 |    |-- pressure_desc: string (nullable = true)
 |    |-- pressure_tend: long (nullable = true)
 |    |-- qualifier: string (nullable = true)
 |    |-- qualifier_svrty: string (nullable = true)
 |    |-- rh: long (nullable = true)
 |    |-- snow_hrly: long (nullable = true)
 |    |-- temp: long (nullable = true)
 |    |-- terse_phrase: string (nullable = true)
 |    |-- uv_desc: string (nullable = true)
 |    |-- uv_index: long (nullable = true)
 |    |-- valid_time_gmt: long (nullable = true)
 |    |-- vis: double (nullable = true)
 |    |-- wc: long (nullable = true)
 |    |-- wdir: long (nullable = true)
 |    |-- wdir_cardinal: string (nullable = true)
 |    |-- wspd: long (nullable = true)
 |    |-- wx_icon: long (nullable = true)
 |    |-- wx_phrase: string (nullable = true)
 |-- flightEquipmentIataCode: string (nullable = true)
 |-- flightNumber: string (nullable = true)
 |-- isCodeshare: boolean (nullable = true)
 |-- isWetlease: boolean (nullable = true)
 |-- operator: struct (nullable = true)
 |    |-- carrierFsCode: string (nullable = true)
 |    |-- flightNumber: string (nullable = true)
 |    |-- serviceClasses: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- serviceType: string (nullable = true)
 |    |-- trafficRestrictions: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- pt_type: string (nullable = true)
 |-- referenceCode: string (nullable = true)
 |-- scheduledGateDeparture: struct (nullable = true)
 |    |-- dateLocal: string (nullable = true)
 |    |-- dateUtc: string (nullable = true)
 |-- serviceClasses: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- serviceType: string (nullable = true)
 |-- status: string (nullable = true)
 |-- stops: long (nullable = true)
 |-- trafficRestrictions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- wetleaseOperatorFsCode: string (nullable = true)

CPU times: user 2.3 ms, sys: 1.8 ms, total: 4.11 ms
Wall time: 14.6 ms
CPU times: user 2.13 ms, sys: 923 µs, total: 3.06 ms
Wall time: 20 s
Out[4]:
33336

Visualize the classes in a scatter plot based on two features


In [5]:
training.scatterPlotForFeatures(cloudantdata, \
     "departureWeather.temp","arrivalWeather.temp","Departure Airport Temp", "Arrival Airport Temp")



In [6]:
training.scatterPlotForFeatures(cloudantdata,\
     "departureWeather.pressure","arrivalWeather.pressure","Departure Airport Pressure", "Arrival Airport Pressure")



In [7]:
training.scatterPlotForFeatures(cloudantdata,\
 "departureWeather.wspd","arrivalWeather.wspd","Departure Airport Wind Speed", "Arrival Airport Wind Speed")
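The feature names passed to scatterPlotForFeatures, such as "departureWeather.temp", are dotted paths into the nested structs shown in the schema above. As a plain-Python illustration of that notation only (not how Spark or the flightPredict helper resolves columns), the sketch below walks a dotted path through a hypothetical JSON document shaped like a Cloudant record; the function name get_nested and every field value are made up for the example.

```python
from functools import reduce


def get_nested(doc, path):
    """Resolve a dotted path such as "departureWeather.temp" in a nested dict.

    Returns None as soon as a segment along the path is missing.
    """
    return reduce(
        lambda node, key: node.get(key) if isinstance(node, dict) else None,
        path.split("."),
        doc,
    )


# Hypothetical document shaped like the schema above (values are made up)
doc = {
    "departureAirportFsCode": "BOS",
    "departureWeather": {"temp": 22, "pressure": 30.1, "wspd": 18},
    "arrivalWeather": {"temp": 28, "pressure": 29.9, "wspd": 16},
}

print(get_nested(doc, "departureWeather.temp"))  # 22
print(get_nested(doc, "arrivalWeather.wspd"))    # 16
print(get_nested(doc, "arrivalWeather.gust"))    # None
```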


Load the training data as an RDD of LabeledPoint


In [8]:
trainingData = training.loadLabeledDataRDD("training")
trainingData.take(5)


Out[8]:
[LabeledPoint(2.0, [12.0,40.0,16.0,27.0,24.0,27.0,6.0,14.0,56.0,16.0,23.0,13.0,23.0,3.0]),
 LabeledPoint(1.0, [11.0,92.0,8.0,12.0,16.0,12.0,1.0,14.0,85.0,11.0,17.0,6.0,17.0,1.0]),
 LabeledPoint(1.0, [11.0,92.0,8.0,12.0,16.0,12.0,1.0,14.0,85.0,11.0,17.0,6.0,17.0,1.0]),
 LabeledPoint(2.0, [12.0,40.0,16.0,27.0,24.0,27.0,6.0,14.0,56.0,16.0,23.0,13.0,23.0,3.0]),
 LabeledPoint(1.0, [11.0,92.0,8.0,12.0,16.0,12.0,1.0,14.0,85.0,11.0,17.0,6.0,17.0,1.0])]

Train multiple classification models


In [9]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
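# NaN feature values are replaced with 0.0 before training.
# Note: numClasses defaults to 2, while the labels here span three classes;
# numClasses=training.getNumClasses() would be needed to train a multinomial model.
logRegModel = LogisticRegressionWithLBFGS.train(trainingData.map(lambda lp: LabeledPoint(lp.label,\
      np.fromiter(map(lambda x: 0.0 if np.isnan(x) else x,lp.features.toArray()),dtype=np.double )))\
      , iterations=1000, validateData=False, intercept=False)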
print(logRegModel)


(weights=[0.00719553724796,0.0129003735855,0.206800773733,0.01926252012,0.0135932018824,0.0194651733633,0.056072192077,0.00719637860572,0.0124349802337,0.219473607527,0.0166882780605,0.0158291274796,0.0161759477119,0.036489164221], intercept=0.0)

In [10]:
from pyspark.mllib.classification import NaiveBayes
# NaiveBayes requires non-negative features: set negative (and NaN) values to 0 for now.
# dtype=int replaces the np.int alias, which was removed in NumPy 1.24.
modelNaiveBayes = NaiveBayes.train(trainingData.map(lambda lp: LabeledPoint(lp.label,
    np.fromiter(map(lambda x: x if x > 0.0 else 0.0, lp.features.toArray()), dtype=int))))

print(modelNaiveBayes)


<pyspark.mllib.classification.NaiveBayesModel object at 0x7f71f21159d0>
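Both training cells above clean each feature vector first: logistic regression (and, in the following cells, the tree models) replaces NaN with 0.0, while NaiveBayes, which rejects negative features, sets anything that is not strictly positive to 0. A minimal plain-Python sketch of those two rules follows; nan_to_zero and clamp_non_negative are hypothetical names, and the sketch leaves out the integer cast that the NaiveBayes cell also applies.

```python
import math


def nan_to_zero(features):
    """Replace NaN values with 0.0 and keep everything else unchanged."""
    return [0.0 if math.isnan(x) else x for x in features]


def clamp_non_negative(features):
    """Set negative values to 0.0; NaN > 0.0 is False, so NaN also becomes 0.0."""
    return [x if x > 0.0 else 0.0 for x in features]


raw = [12.0, float("nan"), -3.0, 27.0]
print(nan_to_zero(raw))         # [12.0, 0.0, -3.0, 27.0]
print(clamp_non_negative(raw))  # [12.0, 0.0, 0.0, 27.0]
```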

In [11]:
from pyspark.mllib.tree import DecisionTree
modelDecisionTree = DecisionTree.trainClassifier(trainingData.map(lambda lp: LabeledPoint(lp.label,\
      np.fromiter(map(lambda x: 0.0 if np.isnan(x) else x,lp.features.toArray()),dtype=np.double )))\
      , numClasses=training.getNumClasses(), categoricalFeaturesInfo={})
print(modelDecisionTree)


DecisionTreeModel classifier of depth 5 with 61 nodes

In [12]:
from pyspark.mllib.tree import RandomForest
modelRandomForest = RandomForest.trainClassifier(trainingData.map(lambda lp: LabeledPoint(lp.label,\
      np.fromiter(map(lambda x: 0.0 if np.isnan(x) else x,lp.features.toArray()),dtype=np.double )))\
      , numClasses=training.getNumClasses(), categoricalFeaturesInfo={},numTrees=100)
print(modelRandomForest)


TreeEnsembleModel classifier with 100 trees

Load the test data from the Cloudant database and compute accuracy metrics


In [13]:
dbTestName="pycon_flightpredict_test_set"
testCloudantdata = training.loadDataSet(dbTestName,"test")
testCloudantdata.count()


Successfully cached dataframe
Successfully registered SQL table test
Out[13]:
8074

In [14]:
testData = training.loadLabeledDataRDD("test")
training.displayConfusionTable=True
training.runMetrics(testData,modelNaiveBayes,modelDecisionTree,logRegModel,modelRandomForest)


Model                    Accuracy  Precision  Recall
NaiveBayesModel          21.14%    57.46%     14.31%
DecisionTreeModel        55.75%    56.80%     57.44%
LogisticRegressionModel  30.03%    22.09%     46.88%
RandomForestModel        56.10%    58.02%     58.34%

Confusion Tables for each Model

NaiveBayesModel

                           On Time  Delayed less than 2 hours  Delayed more than 4 hours
On Time                    510.0    387.0                      2899.0
Delayed less than 2 hours  352.0    533.0                      3274.0
Delayed more than 4 hours  7.0      0.0                        112.0

DecisionTreeModel

                           On Time  Delayed less than 2 hours  Delayed more than 4 hours
On Time                    1535.0   2261.0                     0.0
Delayed less than 2 hours  1056.0   3103.0                     0.0
Delayed more than 4 hours  40.0     79.0                       0.0

LogisticRegressionModel

                           On Time  Delayed less than 2 hours  Delayed more than 4 hours
On Time                    3785.0   0.0                        0.0
Delayed less than 2 hours  4153.0   0.0                        0.0
Delayed more than 4 hours  119.0    0.0                        0.0

RandomForestModel

                           On Time  Delayed less than 2 hours  Delayed more than 4 hours
On Time                    1436.0   2360.0                     0.0
Delayed less than 2 hours  885.0    3274.0                     0.0
Delayed more than 4 hours  54.0     65.0                       0.0
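The confusion tables can be checked by hand. Overall accuracy is the diagonal sum divided by the total, which does not depend on whether rows or columns hold the actual class; per-class recall does, so the sketch below ASSUMES rows are actual classes and columns are predictions, which the output does not state. The function names are hypothetical, and the helper's own metric definitions may differ from these.

```python
def accuracy(confusion):
    """Share of records on the diagonal of a square confusion table."""
    total = sum(sum(row) for row in confusion)
    correct = sum(confusion[i][i] for i in range(len(confusion)))
    return correct / total


def recall_per_class(confusion):
    """Per-class recall, ASSUMING rows are actual classes and columns are predictions."""
    return [row[i] / sum(row) if sum(row) else 0.0 for i, row in enumerate(confusion)]


# DecisionTreeModel confusion table copied from the output above
decision_tree = [
    [1535.0, 2261.0, 0.0],
    [1056.0, 3103.0, 0.0],
    [40.0, 79.0, 0.0],
]
print(round(accuracy(decision_tree), 4))  # 0.5744
print([round(r, 4) for r in recall_per_class(decision_tree)])
```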

Accuracy analysis and model refinement

  • Run a histogram of departure delays to refine the classification

In [15]:
# Departure delays (deltaDeparture, in minutes) strictly between 12 and 50; skip missing values.
# .rdd is needed because DataFrame.map was removed in Spark 2.0.
rdd = sqlContext.sql("select deltaDeparture from training").rdd\
    .map(lambda s: s.deltaDeparture)\
    .filter(lambda s: s is not None and 12 < s < 50)

print(rdd.count())

# histogram(50) returns (bucket edges, counts): 51 evenly spaced edges and 50 counts
edges, counts = rdd.histogram(50)
widths = [j - i for i, j in zip(edges[:-1], edges[1:])]

%matplotlib inline
import matplotlib.pyplot as plt

fig = plt.gcf()
width_in, height_in = fig.get_size_inches()
fig.set_size_inches(width_in * 2.5, height_in * 2)
plt.ylabel('Number of records')
plt.xlabel('Departure delay (minutes)')
plt.title('Histogram of departure delays')
# Draw each bar from its left bucket edge so the tick labels match the bucket boundaries
plt.bar(edges[:-1], counts, width=widths, align='edge', color='b', label="Bins")
plt.xticks(edges[:-1], [int(e) for e in edges[:-1]])
plt.legend()

plt.show()


19095
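rdd.histogram(50) returns a pair: a list of 51 bucket edges evenly spaced between the minimum and maximum delay, and a list of 50 counts, with only the last bucket closed on the right. The pure-Python sketch below mimics that bucketing on a small made-up list so the returned structure is easy to inspect; even_histogram is a hypothetical name, and this is an illustration rather than Spark's implementation.

```python
def even_histogram(values, num_buckets):
    """Evenly spaced histogram over [min, max]; only the last bucket is closed on the right."""
    lo, hi = min(values), max(values)
    if lo == hi or num_buckets == 1:
        # Degenerate case: a single bucket holding every value
        return [lo, hi], [len(values)]
    width = (hi - lo) / num_buckets
    edges = [lo + i * width for i in range(num_buckets)] + [float(hi)]
    counts = [0] * num_buckets
    for v in values:
        # Clamp so that the maximum value lands in the last bucket instead of overflowing
        idx = min(int((v - lo) / width), num_buckets - 1)
        counts[idx] += 1
    return edges, counts


# Made-up delays in minutes, all strictly between 12 and 50
delays = [13, 14, 15, 20, 25, 30, 49]
edges, counts = even_histogram(delays, 4)
print(edges)   # [13.0, 22.0, 31.0, 40.0, 49.0]
print(counts)  # [4, 2, 0, 1]
```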

  • Customize the classification by extending the training handler class
  • Add new features to the model
  • Rebuild the models
  • Recompute the accuracy metrics
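The custom handler in the next cell makes two changes: it maps deltaDeparture (minutes) to three classes with thresholds at 13 and 41 minutes, and it adds seven 0/1 day-of-week features derived from departureTime. A standalone sketch of both rules follows. The function names are hypothetical, and the strptime format assumes a "YYYY-MM-DD HH:MM±HHMM" string like the one in the prediction example at the end, whereas the handler itself uses the more lenient dateutil.parser.

```python
from datetime import datetime


def classify_delay(delta_departure):
    """Map a departure delay in minutes to the three classes used by the custom handler."""
    if delta_departure < 13:
        return 0  # Delayed less than 13 minutes
    if delta_departure < 41:
        return 1  # Delayed between 13 and 41 minutes
    return 2      # Delayed more than 41 minutes


def weekday_one_hot(departure_time):
    """Seven 0/1 features, one per weekday (Monday=0 ... Sunday=6).

    ASSUMES the format 'YYYY-MM-DD HH:MM+HHMM' (or -HHMM).
    """
    dt = datetime.strptime(departure_time, "%Y-%m-%d %H:%M%z")
    return [1 if dt.weekday() == i else 0 for i in range(7)]


print([classify_delay(d) for d in (5, 13, 40, 41, 120)])  # [0, 1, 1, 2, 2]
print(weekday_one_hot("2016-05-18 20:15-0500"))          # [0, 0, 1, 0, 0, 0, 0] (a Wednesday)
```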

In [16]:
class CustomTrainingHandler(training.defaultTrainingHandler):
    """Three delay classes based on deltaDeparture (minutes), plus day-of-week features."""

    def getClassLabel(self, value):
        labels = {0: "Delayed less than 13 minutes",
                  1: "Delayed between 13 and 41 minutes",
                  2: "Delayed more than 41 minutes"}
        return labels.get(int(value), value)

    def numClasses(self):
        return 3

    def computeClassification(self, s):
        # < 13 min -> 0, 13 to 40 min -> 1, 41 min or more -> 2
        return 0 if s.deltaDeparture < 13 else (1 if s.deltaDeparture < 41 else 2)

    def customTrainingFeaturesNames(self):
        return ["departureTime"]

    def customTrainingFeatures(self, s):
        dt = parser.parse(s.departureTime)
        print(dt)  # debug output: echoes each parsed departure time
        # One-hot encode the day of the week (Monday=0 ... Sunday=6) as 7 features
        return [1 if dt.weekday() == i else 0 for i in range(7)]

training.customTrainingHandler = CustomTrainingHandler()

# Reload the training labeled data RDD with the custom handler in place
trainingData = training.loadLabeledDataRDD("training")

# Recompute the models (NaN -> 0.0; NaiveBayes additionally needs non-negative features)
logRegModel = LogisticRegressionWithLBFGS.train(trainingData.map(lambda lp: LabeledPoint(lp.label,\
      np.fromiter(map(lambda x: 0.0 if np.isnan(x) else x,lp.features.toArray()),dtype=np.double )))\
      , iterations=1000, validateData=False, intercept=False)
modelNaiveBayes = NaiveBayes.train(trainingData.map(lambda lp: LabeledPoint(lp.label,
    np.fromiter(map(lambda x: x if x > 0.0 else 0.0, lp.features.toArray()), dtype=int))))
modelDecisionTree = DecisionTree.trainClassifier(trainingData.map(lambda lp: LabeledPoint(lp.label,\
      np.fromiter(map(lambda x: 0.0 if np.isnan(x) else x,lp.features.toArray()),dtype=np.double )))\
      , numClasses=training.getNumClasses(), categoricalFeaturesInfo={})

modelRandomForest = RandomForest.trainClassifier(trainingData.map(lambda lp: LabeledPoint(lp.label,\
      np.fromiter(map(lambda x: 0.0 if np.isnan(x) else x,lp.features.toArray()),dtype=np.double )))\
      , numClasses=training.getNumClasses(), categoricalFeaturesInfo={},numTrees=100)

#reload the test labeled data
testData = training.loadLabeledDataRDD("test")

#recompute the accuracy metrics
training.displayConfusionTable=True
training.runMetrics(testData,modelNaiveBayes,modelDecisionTree,logRegModel,modelRandomForest)


Model                    Accuracy  Precision  Recall
NaiveBayesModel          26.80%    47.04%     24.06%
DecisionTreeModel        50.83%    57.18%     56.49%
LogisticRegressionModel  37.28%    46.01%     53.33%
RandomForestModel        46.50%    51.34%     55.80%

Confusion Tables for each Model

NaiveBayesModel

                                   Delayed less than 13 minutes  Delayed between 13 and 41 minutes  Delayed more than 41 minutes
Delayed less than 13 minutes       352.0                         643.0                              2017.0
Delayed between 13 and 41 minutes  416.0                         1062.0                             2829.0
Delayed more than 41 minutes       68.0                          158.0                              529.0

DecisionTreeModel

                                   Delayed less than 13 minutes  Delayed between 13 and 41 minutes  Delayed more than 41 minutes
Delayed less than 13 minutes       806.0                         2201.0                             5.0
Delayed between 13 and 41 minutes  582.0                         3718.0                             7.0
Delayed more than 41 minutes       150.0                         568.0                              37.0

LogisticRegressionModel

                                   Delayed less than 13 minutes  Delayed between 13 and 41 minutes  Delayed more than 41 minutes
Delayed less than 13 minutes       8.0                           3004.0                             0.0
Delayed between 13 and 41 minutes  9.0                           4298.0                             0.0
Delayed more than 41 minutes       0.0                           755.0                              0.0

RandomForestModel

                                   Delayed less than 13 minutes  Delayed between 13 and 41 minutes  Delayed more than 41 minutes
Delayed less than 13 minutes       481.0                         2531.0                             0.0
Delayed between 13 and 41 minutes  283.0                         4024.0                             0.0
Delayed more than 41 minutes       62.0                          693.0                              0.0

Run the predictive model

runModel(departureAirportCode, departureDateTime, arrivalAirportCode, arrivalDateTime)
Note: all date-times must be given in UTC format; the example below writes them with an explicit UTC offset (for example, -0500).
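The example call below passes local times with explicit offsets (-0500 and -0800). If a time has to be normalized to UTC first, the standard library can do it. This is a minimal sketch with a hypothetical to_utc helper; it does not establish which form run.runModel expects beyond what the note above says.

```python
from datetime import datetime, timezone


def to_utc(local_with_offset):
    """Parse 'YYYY-MM-DD HH:MM+HHMM' (or -HHMM) and return the same instant in UTC."""
    dt = datetime.strptime(local_with_offset, "%Y-%m-%d %H:%M%z")
    return dt.astimezone(timezone.utc)


print(to_utc("2016-05-18 20:15-0500"))  # 2016-05-19 01:15:00+00:00
print(to_utc("2016-05-18 22:30-0800"))  # 2016-05-19 06:30:00+00:00
```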


In [17]:
run.useModels(modelNaiveBayes,modelDecisionTree,logRegModel,modelRandomForest)
run.runModel('BOS', "2016-05-18 20:15-0500", 'AUS', "2016-05-18 22:30-0800" )


2016-05-18 20:15:00-05:00
Logan International Airport
  • Forecast: P Cloudy
  • Dew Point: 15
  • Relative Humidity: 67
  • Prevailing Hourly visibility: 15.0
  • Wind Chill: 22
  • Wind Speed: 18
  • Feels Like Temperature: 22
  • Hourly Maximum UV Index: 0

Prediction
  • NaiveBayesModel: Delayed between 13 and 41 minutes
  • DecisionTreeModel: Delayed between 13 and 41 minutes
  • LogisticRegressionModel: Delayed between 13 and 41 minutes
  • RandomForestModel: Delayed between 13 and 41 minutes

Austin-Bergstrom International Airport
  • Forecast: P Cloudy
  • Dew Point: 24
  • Relative Humidity: 95
  • Prevailing Hourly visibility: 8.0
  • Wind Chill: 25
  • Wind Speed: 16
  • Feels Like Temperature: 28
  • Hourly Maximum UV Index: 0
