FlightPredict Package management

Run these cells only when you need to install the package or update it. Otherwise go directly the next section

  1. !pip show flightPredict: provides information about the flightPredict package
  2. !pip uninstall --yes flightPredict: uninstall the flight predict package. Run before installing a new version of the package
  3. !pip install --user --exists-action=w --egg git+https://github.com/ibm-watson-data-lab/simple-data-pipe-connector-flightstats.git#egg=flightPredict: Install the flightPredict pacakge directly from Github

In [1]:
!pip show flightPredict


You are using pip version 7.1.0, however version 8.1.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
---
Metadata-Version: 1.0
Name: flightPredict
Version: 0.3
Summary: Python Library for SPark MLLIB Flight Predict sample application
Home-page: git+https://github.com/ibm-watson-data-lab/pipes-connector-flightstats.git
Author: David Taieb
Author-email: david_taieb@us.ibm.com
License: Apache 2.0
Location: /gpfs/global_fs01/sym_shared/YPProdSpark/user/sd73-de3b55cc941e55-4137fa4057f6/.local/lib/python2.7/site-packages/flightPredict-0.3-py2.7.egg
Requires: 

In [3]:
!pip uninstall --yes flightPredict


You are using pip version 7.1.0, however version 8.1.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
Uninstalling flightPredict-0.3:
  Successfully uninstalled flightPredict-0.3

In [2]:
!pip install --user --exists-action=w --egg git+https://github.com/ibm-watson-data-lab/simple-data-pipe-connector-flightstats.git#egg=flightPredict


You are using pip version 7.1.0, however version 8.1.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
Collecting flightPredict from git+https://github.com/ibm-watson-data-lab/simple-data-pipe-connector-flightstats.git#egg=flightPredict
  Cloning https://github.com/ibm-watson-data-lab/simple-data-pipe-connector-flightstats.git to /tmp/pip-build-SGDp9O/flightPredict
git: /disk1/ego/3.1/linux2.6-glibc2.3-x86_64/lib/libz.so.1: no version information available (required by git)
git-remote-https: /disk1/ego/3.1/linux2.6-glibc2.3-x86_64/lib/libz.so.1: no version information available (required by git-remote-https)
git: /disk1/ego/3.1/linux2.6-glibc2.3-x86_64/lib/libz.so.1: no version information available (required by git)
git: /disk1/ego/3.1/linux2.6-glibc2.3-x86_64/lib/libz.so.1: no version information available (required by git)
git: /disk1/ego/3.1/linux2.6-glibc2.3-x86_64/lib/libz.so.1: no version information available (required by git)
Installing collected packages: flightPredict
  Running setup.py install for flightPredict
Successfully installed flightPredict

Import required python package and set the Cloudant credentials

flightPredict is a helper package used to load data into RDD of LabeledPoint


In [1]:
%matplotlib inline
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from numpy import array
import numpy as np
import math
from datetime import datetime
from dateutil import parser

import flightPredict
sqlContext=SQLContext(sc)
flightPredict.sqlContext = sqlContext
flightPredict.cloudantHost='XXXX'
flightPredict.cloudantUserName='XXXX'
flightPredict.cloudantPassword='XXXX'
flightPredict.weatherUrl='https://XXXX:XXXXb@twcservice.mybluemix.net'

load data from training data set and print the schema


In [2]:
dbName = "flightstats_training_data_for_flight_delay_predictive_app_mega_set"
cloudantdata = flightPredict.loadDataSet(dbName,"training")
cloudantdata.printSchema()
cloudantdata.count()


Successfully cached dataframe
Successfully registered SQL table training
root
 |-- _id: string (nullable = true)
 |-- _rev: string (nullable = true)
 |-- actualRunwayArrival: struct (nullable = true)
 |    |-- dateLocal: string (nullable = true)
 |    |-- dateUtc: string (nullable = true)
 |-- actualRunwayDeparture: struct (nullable = true)
 |    |-- dateLocal: string (nullable = true)
 |    |-- dateUtc: string (nullable = true)
 |-- arrivalAirportFsCode: string (nullable = true)
 |-- arrivalTerminal: string (nullable = true)
 |-- arrivalTime: string (nullable = true)
 |-- arrivalWeather: struct (nullable = true)
 |    |-- blunt_phrase: string (nullable = true)
 |    |-- class: string (nullable = true)
 |    |-- clds: string (nullable = true)
 |    |-- day_ind: string (nullable = true)
 |    |-- dewPt: long (nullable = true)
 |    |-- expire_time_gmt: long (nullable = true)
 |    |-- feels_like: long (nullable = true)
 |    |-- gust: long (nullable = true)
 |    |-- heat_index: long (nullable = true)
 |    |-- icon_extd: long (nullable = true)
 |    |-- key: string (nullable = true)
 |    |-- m: string (nullable = true)
 |    |-- max_temp: long (nullable = true)
 |    |-- min_temp: long (nullable = true)
 |    |-- obs_id: string (nullable = true)
 |    |-- obs_name: string (nullable = true)
 |    |-- precip_hrly: double (nullable = true)
 |    |-- precip_total: double (nullable = true)
 |    |-- pressure: double (nullable = true)
 |    |-- pressure_desc: string (nullable = true)
 |    |-- pressure_tend: long (nullable = true)
 |    |-- qualifier: string (nullable = true)
 |    |-- qualifier_svrty: string (nullable = true)
 |    |-- rh: long (nullable = true)
 |    |-- snow_hrly: double (nullable = true)
 |    |-- temp: long (nullable = true)
 |    |-- terse_phrase: string (nullable = true)
 |    |-- uv_desc: string (nullable = true)
 |    |-- uv_index: long (nullable = true)
 |    |-- valid_time_gmt: long (nullable = true)
 |    |-- vis: double (nullable = true)
 |    |-- wc: long (nullable = true)
 |    |-- wdir: long (nullable = true)
 |    |-- wdir_cardinal: string (nullable = true)
 |    |-- wspd: long (nullable = true)
 |    |-- wx_icon: long (nullable = true)
 |    |-- wx_phrase: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- carrierFsCode: string (nullable = true)
 |-- classification: long (nullable = true)
 |-- classificationMsg: string (nullable = true)
 |-- codeshares: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- carrierFsCode: string (nullable = true)
 |    |    |-- flightNumber: string (nullable = true)
 |    |    |-- referenceCode: long (nullable = true)
 |    |    |-- serviceClasses: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- serviceType: string (nullable = true)
 |    |    |-- trafficRestrictions: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- deltaDeparture: long (nullable = true)
 |-- departureAirportFsCode: string (nullable = true)
 |-- departureTerminal: string (nullable = true)
 |-- departureTime: string (nullable = true)
 |-- departureWeather: struct (nullable = true)
 |    |-- blunt_phrase: string (nullable = true)
 |    |-- class: string (nullable = true)
 |    |-- clds: string (nullable = true)
 |    |-- day_ind: string (nullable = true)
 |    |-- dewPt: long (nullable = true)
 |    |-- expire_time_gmt: long (nullable = true)
 |    |-- feels_like: long (nullable = true)
 |    |-- gust: long (nullable = true)
 |    |-- heat_index: long (nullable = true)
 |    |-- icon_extd: long (nullable = true)
 |    |-- key: string (nullable = true)
 |    |-- m: string (nullable = true)
 |    |-- max_temp: long (nullable = true)
 |    |-- min_temp: long (nullable = true)
 |    |-- obs_id: string (nullable = true)
 |    |-- obs_name: string (nullable = true)
 |    |-- precip_hrly: double (nullable = true)
 |    |-- precip_total: double (nullable = true)
 |    |-- pressure: double (nullable = true)
 |    |-- pressure_desc: string (nullable = true)
 |    |-- pressure_tend: long (nullable = true)
 |    |-- qualifier: string (nullable = true)
 |    |-- qualifier_svrty: string (nullable = true)
 |    |-- rh: long (nullable = true)
 |    |-- snow_hrly: long (nullable = true)
 |    |-- temp: long (nullable = true)
 |    |-- terse_phrase: string (nullable = true)
 |    |-- uv_desc: string (nullable = true)
 |    |-- uv_index: long (nullable = true)
 |    |-- valid_time_gmt: long (nullable = true)
 |    |-- vis: long (nullable = true)
 |    |-- wc: long (nullable = true)
 |    |-- wdir: long (nullable = true)
 |    |-- wdir_cardinal: string (nullable = true)
 |    |-- wspd: long (nullable = true)
 |    |-- wx_icon: long (nullable = true)
 |    |-- wx_phrase: string (nullable = true)
 |-- flightEquipmentIataCode: string (nullable = true)
 |-- flightNumber: string (nullable = true)
 |-- isCodeshare: boolean (nullable = true)
 |-- isWetlease: boolean (nullable = true)
 |-- operator: struct (nullable = true)
 |    |-- carrierFsCode: string (nullable = true)
 |    |-- flightNumber: string (nullable = true)
 |    |-- serviceClasses: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- serviceType: string (nullable = true)
 |    |-- trafficRestrictions: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- pt_type: string (nullable = true)
 |-- referenceCode: string (nullable = true)
 |-- scheduledGateDeparture: struct (nullable = true)
 |    |-- dateLocal: string (nullable = true)
 |    |-- dateUtc: string (nullable = true)
 |-- serviceClasses: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- serviceType: string (nullable = true)
 |-- status: string (nullable = true)
 |-- stops: long (nullable = true)
 |-- trafficRestrictions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- wetleaseOperatorFsCode: string (nullable = true)

Out[2]:
21493

Visualize classes in scatter plot based on 2 features


In [4]:
flightPredict.scatterPlotForFeatures(cloudantdata, \
     "departureWeather.temp","arrivalWeather.temp","Departure Airport Temp", "Arrival Airport Temp")



In [5]:
flightPredict.scatterPlotForFeatures(cloudantdata,\
     "departureWeather.pressure","arrivalWeather.pressure","Departure Airport Pressure", "Arrival Airport Pressure")



In [6]:
flightPredict.scatterPlotForFeatures(cloudantdata,\
 "departureWeather.wspd","arrivalWeather.wspd","Departure Airport Wind Speed", "Arrival Airport Wind Speed")


Load the training data as an RDD of LabeledPoint


In [3]:
computeClassification = (lambda deltaDeparture: 0 if deltaDeparture<13 else (1 if deltaDeparture < 41 else 2))
def customFeatureHandler(s):
    if(s==None):
        return ["departureTime"]
    dt=parser.parse(s.departureTime)
    features=[]
    for i in range(0,7):
        features.append(1 if dt.weekday()==i else 0)
    return features
computeClassification=None
customFeatureHandler=None
numClasses = 5

trainingData = flightPredict.loadLabeledDataRDD("training", computeClassification, customFeatureHandler)
trainingData.take(5)


Out[3]:
[LabeledPoint(1.0, [-6.0,43.0,16.0,2.0,26.0,2.0,0.0,-1.0,50.0,14.0,7.0,19.0,7.0,0.0]),
 LabeledPoint(1.0, [-6.0,43.0,16.0,2.0,26.0,2.0,0.0,-1.0,50.0,14.0,7.0,19.0,7.0,0.0]),
 LabeledPoint(2.0, [-4.0,49.0,12.0,1.0,27.0,1.0,0.0,-3.0,63.0,16.0,3.0,2.0,3.0,0.0]),
 LabeledPoint(1.0, [-4.0,49.0,12.0,1.0,27.0,1.0,0.0,-2.0,74.0,14.0,3.0,5.0,3.0,0.0]),
 LabeledPoint(1.0, [-4.0,49.0,12.0,1.0,27.0,1.0,0.0,-3.0,60.0,16.0,4.0,0.0,4.0,0.0])]

Train multiple classification models


In [4]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
logRegModel = LogisticRegressionWithLBFGS.train(trainingData.map(lambda lp: LabeledPoint(lp.label,\
      np.fromiter(map(lambda x: 0.0 if np.isnan(x) else x,lp.features.toArray()),dtype=np.double )))\
      , iterations=100, validateData=False, intercept=True)
print(logRegModel)


(weights=[0.00210791946945,0.0117389916562,0.431528967473,0.0105399464645,0.0108481803904,0.010659663195,0.0461706986706,0.00302730172139,0.00927427236321,0.216309674302,0.00968607702518,0.011345550038,0.00962815602635,0.0273257256324], intercept=1.083169878582377)

In [5]:
from pyspark.mllib.classification import NaiveBayes
#NaiveBayes requires non negative features, set them to 0 for now
modelNaiveBayes = NaiveBayes.train(trainingData.map(lambda lp: LabeledPoint(lp.label, \
                    np.fromiter(map(lambda x: x if x>0.0 else 0.0,lp.features.toArray()),dtype=np.int)\
               ))\
          )

print(modelNaiveBayes)


<pyspark.mllib.classification.NaiveBayesModel object at 0x7f69ea0f2910>

In [6]:
from pyspark.mllib.tree import DecisionTree
modelDecisionTree = DecisionTree.trainClassifier(trainingData.map(lambda lp: LabeledPoint(lp.label,\
      np.fromiter(map(lambda x: 0.0 if np.isnan(x) else x,lp.features.toArray()),dtype=np.double )))\
      , numClasses=numClasses, categoricalFeaturesInfo={})
print(modelDecisionTree)


DecisionTreeModel classifier of depth 5 with 59 nodes

In [7]:
from pyspark.mllib.tree import RandomForest
modelRandomForest = RandomForest.trainClassifier(trainingData.map(lambda lp: LabeledPoint(lp.label,\
      np.fromiter(map(lambda x: 0.0 if np.isnan(x) else x,lp.features.toArray()),dtype=np.double )))\
      , numClasses=numClasses, categoricalFeaturesInfo={},numTrees=100)
print(modelRandomForest)


TreeEnsembleModel classifier with 100 trees

Load Blind data from Cloudant database


In [8]:
dbTestName="flightstats_test_data_for_flight_delay_predictive_app"
testCloudantdata = flightPredict.loadDataSet(dbTestName,"test")
testCloudantdata.count()


Successfully cached dataframe
Successfully registered SQL table test
Out[8]:
7690

In [9]:
testData = flightPredict.loadLabeledDataRDD("test",computeClassification, customFeatureHandler)
flightPredict.displayConfusionTable=True
flightPredict.runMetrics(trainingData,modelNaiveBayes,modelDecisionTree,logRegModel,modelRandomForest)


ModelAccuracyPrecisionRecall
NaiveBayesModel41.33%53.27%34.65%
DecisionTreeModel51.82%59.64%58.22%
LogisticRegressionModel26.88%19.35%43.98%
RandomForestModel50.37%58.12%57.74%

Confusion Tables for each Model

NaiveBayesModel

3333.02628.03495.0
3766.03975.04043.0
71.043.0139.0

DecisionTreeModel

1872.07584.00.0
1147.010637.00.0
55.0193.05.0

LogisticRegressionModel

9453.00.00.0
11783.00.00.0
253.00.00.0

RandomForestModel

1609.07847.00.0
982.010802.00.0
62.0191.00.0

Run the predictive model

runModel(departureAirportCode, departureDateTime, arrivalAirportCode, arrivalDateTime)
Note: all DateTime must use UTC format


In [11]:
from flightPredict import run
run.useModels(modelDecisionTree,modelRandomForest)
run.runModel('BOS', "2016-02-08 20:15-0500", 'LAX', "2016-01-08 22:30-0500" )


Logan International AirportPredictionLos Angeles International Airport
  • Forecast: Clear
  • Dew Point: -3
  • Relative Humidity: 66
  • Prevailing Hourly visibility: 9.8
  • Wind Chill: 0
  • Wind direction: 97
  • Wind Speed: 12
  • Feels Like Temperature: 0
  • Hourly Maximum UV Index: 0
  • DecisionTreeModel: Delayed less than 2 hours
  • RandomForestModel: Delayed less than 2 hours
  • Forecast: Clear
  • Dew Point: 11
  • Relative Humidity: 63
  • Prevailing Hourly visibility: 13.4
  • Wind Chill: 18
  • Wind direction: 234
  • Wind Speed: 7
  • Feels Like Temperature: 18
  • Hourly Maximum UV Index: 0

In [12]:
rdd = sqlContext.sql("select deltaDeparture from training").map(lambda s: s.deltaDeparture)\
    .filter(lambda s: s < 50 and s > 12)
    
print(rdd.count())

histo = rdd.histogram(50)
    
#print(histo[0])
#print(histo[1])
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
bins = [i for i in histo[0]]

params = plt.gcf()
plSize = params.get_size_inches()
params.set_size_inches( (plSize[0]*2.5, plSize[1]*2) )
plt.ylabel('Number of records')
plt.xlabel('Bin')
plt.title('Histogram')
intervals = [abs(j-i) for i,j in zip(bins[:-1], bins[1:])]
values=[sum(intervals[:i]) for i in range(0,len(intervals))]
plt.bar(values, histo[1], intervals, color='b', label = "Bins")
plt.xticks(bins[:-1],[int(i) for i in bins[:-1]])
plt.legend()

plt.show()


12624

In [ ]: