Initialize PySpark

First, we use the findspark package to initialize PySpark.


In [1]:
# Initialize PySpark
APP_NAME = "Debugging Prediction Problems"

# If there is no SparkSession, create the environment
try:
  sc and spark
except NameError:
  import findspark
  findspark.init()
  import pyspark
  import pyspark.sql

  # Build (or reuse) a SparkSession, then take its SparkContext
  spark = pyspark.sql.SparkSession.builder.appName(APP_NAME).getOrCreate()
  sc = spark.sparkContext

print("PySpark initiated...")


PySpark initiated...

Hello, World!

Loading data, mapping it and collecting the records into RAM...


In [2]:
# Load the text file using the SparkContext
csv_lines = sc.textFile("../data/example.csv")

# Map the data to split the lines into a list
data = csv_lines.map(lambda line: line.split(","))

# Collect the dataset into local RAM
data.collect()


Out[2]:
[['Russell Jurney', 'Relato', 'CEO'],
 ['Florian Liebert', 'Mesosphere', 'CEO'],
 ['Don Brown', 'Rocana', 'CIO'],
 ['Steve Jobs', 'Apple', 'CEO'],
 ['Donald Trump', 'The Trump Organization', 'CEO'],
 ['Russell Jurney', 'Data Syndrome', 'Principal Consultant']]

Creating Objects from CSV using pyspark.RDD.map

Using a function with a map operation to create objects (dicts) as records...


In [3]:
# Turn the CSV lines into objects
def csv_to_record(line):
  parts = line.split(",")
  record = {
    "name": parts[0],
    "company": parts[1],
    "title": parts[2]
  }
  return record

# Apply the function to every record
records = csv_lines.map(csv_to_record)

# Inspect the first item in the dataset
records.first()


Out[3]:
{'name': 'Russell Jurney', 'company': 'Relato', 'title': 'CEO'}
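Splitting on a bare comma works for this file. It breaks, however, as soon as a field contains a quoted comma, such as a company named "Acme, Inc.". Below is a minimal plain-Python sketch of a more robust parser built on the standard library's csv module. The function name csv_to_record_robust and the sample line are hypothetical and only for illustration. Because it takes one line and returns one dict, it could be passed to csv_lines.map() in place of csv_to_record.

```python
import csv

def csv_to_record_robust(line):
    """Parse one CSV line into a dict, honoring quoted fields (hypothetical helper)."""
    # csv.reader accepts any iterable of lines, so wrap the single line in a list
    parts = next(csv.reader([line]))
    return {"name": parts[0], "company": parts[1], "title": parts[2]}

# A hypothetical line whose company field contains a comma
print(csv_to_record_robust('Jane Doe,"Acme, Inc.",CTO'))
```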

pyspark.RDD.groupBy

Using the groupBy operator to count the number of jobs per person...


In [5]:
# Group the records by the name of the person
grouped_records = records.groupBy(lambda x: x["name"])

# Show the first group
print(grouped_records.first())

# Count the groups
job_counts = grouped_records.map(
  lambda x: {
    "name": x[0],
    "job_count": len(x[1])
  }
)

job_counts.collect()


('Florian Liebert', <pyspark.resultiterable.ResultIterable object at 0x7f8a6020f438>)
Out[5]:
[{'name': 'Florian Liebert', 'job_count': 1},
 {'name': 'Russell Jurney', 'job_count': 2},
 {'name': 'Don Brown', 'job_count': 1},
 {'name': 'Steve Jobs', 'job_count': 1},
 {'name': 'Donald Trump', 'job_count': 1}]
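For intuition, here is what groupBy followed by len() computes, written in plain Python over the same six records with no Spark involved. Note that RDD.groupBy gathers every member of each group just so they can be counted. When only counts are needed, a keyed reduction such as records.map(lambda r: (r["name"], 1)).reduceByKey(lambda a, b: a + b) avoids moving whole groups around.

```python
from collections import defaultdict

records = [
    {"name": "Russell Jurney", "company": "Relato", "title": "CEO"},
    {"name": "Florian Liebert", "company": "Mesosphere", "title": "CEO"},
    {"name": "Don Brown", "company": "Rocana", "title": "CIO"},
    {"name": "Steve Jobs", "company": "Apple", "title": "CEO"},
    {"name": "Donald Trump", "company": "The Trump Organization", "title": "CEO"},
    {"name": "Russell Jurney", "company": "Data Syndrome", "title": "Principal Consultant"},
]

# Group records by name, like records.groupBy(lambda x: x["name"])
groups = defaultdict(list)
for record in records:
    groups[record["name"]].append(record)

# Turn each (name, members) group into a count, like the map() over grouped_records
job_counts = [{"name": name, "job_count": len(members)} for name, members in groups.items()]
print(job_counts)
```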

Exercises

  1. Use pyspark.RDD.groupBy to group executives by job title, then prepare records with the job title and the count of the number of executives with that job.

Map vs FlatMap

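We need to understand the difference between the map and flatMap operators. map produces exactly one output element per input record, so splitting each line yields one list per line. flatMap lets each record produce zero or more elements and flattens them all into a single RDD of items.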
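The difference can be seen without Spark at all. This plain-Python sketch mimics the two operators on two of the CSV lines. A list comprehension plays the role of map, and itertools.chain.from_iterable plays the role of flatMap.

```python
from itertools import chain

lines = ["Steve Jobs,Apple,CEO", "Don Brown,Rocana,CIO"]

# map: exactly one output per input, so each line becomes one list
mapped = [line.split(",") for line in lines]

# flatMap: each input may yield zero or more outputs, and the results are concatenated
flat_mapped = list(chain.from_iterable(line.split(",") for line in lines))

print(mapped)
print(flat_mapped)
```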


In [6]:
# Compute a relation of words by line
words_by_line = csv_lines\
  .map(lambda line: line.split(","))

words_by_line.collect()


Out[6]:
[['Russell Jurney', 'Relato', 'CEO'],
 ['Florian Liebert', 'Mesosphere', 'CEO'],
 ['Don Brown', 'Rocana', 'CIO'],
 ['Steve Jobs', 'Apple', 'CEO'],
 ['Donald Trump', 'The Trump Organization', 'CEO'],
 ['Russell Jurney', 'Data Syndrome', 'Principal Consultant']]

In [7]:
# Compute a relation of words
flattened_words = csv_lines\
  .map(lambda line: line.split(","))\
  .flatMap(lambda x: x)
flattened_words.collect()


Out[7]:
['Russell Jurney',
 'Relato',
 'CEO',
 'Florian Liebert',
 'Mesosphere',
 'CEO',
 'Don Brown',
 'Rocana',
 'CIO',
 'Steve Jobs',
 'Apple',
 'CEO',
 'Donald Trump',
 'The Trump Organization',
 'CEO',
 'Russell Jurney',
 'Data Syndrome',
 'Principal Consultant']

In [8]:
lengths = flattened_words.map(lambda x: len(x))
lengths.collect()


Out[8]:
[14, 6, 3, 15, 10, 3, 9, 6, 3, 10, 5, 3, 12, 22, 3, 14, 13, 20]

In [9]:
lengths.sum() / lengths.count()


Out[9]:
9.5
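The same average can be checked by hand. The 18 field lengths sum to 171, and 171 / 18 = 9.5. PySpark's RDD.mean() would compute this with a single action (lengths.mean()), rather than the two separate jobs that sum() and count() launch. Here is a plain-Python check over the same fields:

```python
fields = [
    "Russell Jurney", "Relato", "CEO",
    "Florian Liebert", "Mesosphere", "CEO",
    "Don Brown", "Rocana", "CIO",
    "Steve Jobs", "Apple", "CEO",
    "Donald Trump", "The Trump Organization", "CEO",
    "Russell Jurney", "Data Syndrome", "Principal Consultant",
]

# Same pipeline as above: length of each field, then the mean
lengths = [len(field) for field in fields]
average = sum(lengths) / len(lengths)
print(sum(lengths), len(lengths), average)
```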

Creating Rows

We can create pyspark.sql.Row objects from Python objects, and an RDD of Rows can be turned into a pyspark.sql.DataFrame. This is desirable because once we have DataFrames we can run Spark SQL on our data.


In [11]:
from pyspark.sql import Row

# Convert the CSV into a pyspark.sql.Row
def csv_to_row(line):
  parts = line.split(",")
  row = Row(
    name=parts[0],
    company=parts[1],
    title=parts[2]
  )
  return row

# Apply the function to get rows in an RDD
rows = csv_lines.map(csv_to_row)
rows.first()


Out[11]:
Row(company='Relato', name='Russell Jurney', title='CEO')
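The fields in the output above appear in alphabetical order (company, name, title), not in the order they were passed. In Spark 2.x, Row sorted keyword-argument fields by name. From Spark 3.0 on, with Python 3.6+, the order in which the fields are passed is kept, so the same code may print differently depending on your Spark version.

A pyspark.sql.Row behaves much like a named tuple: fields can be read by attribute or by position, and asDict() converts a Row back to a dict. The sketch below uses collections.namedtuple purely as a stand-in to illustrate that behavior. It is not how Row is implemented, and the names Executive and csv_to_executive are hypothetical.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row with fixed field names (illustration only)
Executive = namedtuple("Executive", ["name", "company", "title"])

def csv_to_executive(line):
    """Split a CSV line into an Executive tuple (hypothetical helper)."""
    name, company, title = line.split(",")
    return Executive(name=name, company=company, title=title)

row = csv_to_executive("Russell Jurney,Relato,CEO")
print(row.name, row[1])   # attribute and positional access, as with Row
print(row._asdict())      # analogous to Row.asDict()
```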

Exercises

  1. First count the number of companies for each executive, then create a pyspark.sql.Row for this result.

In [14]:
split_lines = csv_lines.map(lambda line: line.split(','))

# Group by executive name, count the rows per name, and wrap each result in a Row
groups = split_lines.groupBy(lambda x: x[0])
counts = groups.map(lambda x: (x[0], len(x[1])))
new_rows = counts.map(lambda x: Row(name=x[0], total=x[1]))
new_rows.collect()


Out[14]:
[Row(name='Florian Liebert', total=1),
 Row(name='Russell Jurney', total=2),
 Row(name='Don Brown', total=1),
 Row(name='Steve Jobs', total=1),
 Row(name='Donald Trump', total=1)]

In [15]:
new_rows.toDF().select("name","total").show()


+---------------+-----+
|           name|total|
+---------------+-----+
|Florian Liebert|    1|
| Russell Jurney|    2|
|      Don Brown|    1|
|     Steve Jobs|    1|
|   Donald Trump|    1|
+---------------+-----+

Creating DataFrames from RDDs

Using the RDD.toDF() method to create a dataframe, registering the DataFrame as a temporary table with Spark SQL, and counting the jobs per person using Spark SQL.


In [16]:
# Convert to a pyspark.sql.DataFrame
rows_df = rows.toDF()

rows_df.show()

# Register the DataFrame for Spark SQL
rows_df.createOrReplaceTempView("executives")

# Generate a new DataFrame with SQL using the SparkSession
job_counts = spark.sql("""
SELECT
  name,
  COUNT(*) AS total
  FROM executives
  GROUP BY name
""")
job_counts.show()

# Go back to an RDD
job_counts.rdd.map(lambda x: x.asDict()).collect()


+--------------------+---------------+--------------------+
|             company|           name|               title|
+--------------------+---------------+--------------------+
|              Relato| Russell Jurney|                 CEO|
|          Mesosphere|Florian Liebert|                 CEO|
|              Rocana|      Don Brown|                 CIO|
|               Apple|     Steve Jobs|                 CEO|
|The Trump Organiz...|   Donald Trump|                 CEO|
|       Data Syndrome| Russell Jurney|Principal Consultant|
+--------------------+---------------+--------------------+

+---------------+-----+
|           name|total|
+---------------+-----+
|   Donald Trump|    1|
|Florian Liebert|    1|
|      Don Brown|    1|
| Russell Jurney|    2|
|     Steve Jobs|    1|
+---------------+-----+

Out[16]:
[{'name': 'Donald Trump', 'total': 1},
 {'name': 'Florian Liebert', 'total': 1},
 {'name': 'Don Brown', 'total': 1},
 {'name': 'Russell Jurney', 'total': 2},
 {'name': 'Steve Jobs', 'total': 1}]
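Spark SQL's GROUP BY/COUNT follows standard SQL semantics, so the same query can be tried locally against Python's built-in sqlite3 module. Treat this only as an analogy for checking the logic on small data: sqlite3 is not Spark, and the table and column names simply mirror the executives table above. Row order in a GROUP BY result is not guaranteed in either engine (note how the orders above differ between runs), so the query adds ORDER BY.

```python
import sqlite3

rows = [
    ("Russell Jurney", "Relato", "CEO"),
    ("Florian Liebert", "Mesosphere", "CEO"),
    ("Don Brown", "Rocana", "CIO"),
    ("Steve Jobs", "Apple", "CEO"),
    ("Donald Trump", "The Trump Organization", "CEO"),
    ("Russell Jurney", "Data Syndrome", "Principal Consultant"),
]

# An in-memory database plays the role of the registered temporary view
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE executives (name TEXT, company TEXT, title TEXT)")
conn.executemany("INSERT INTO executives VALUES (?, ?, ?)", rows)

job_counts = conn.execute(
    "SELECT name, COUNT(*) AS total FROM executives GROUP BY name ORDER BY name"
).fetchall()
print(job_counts)
conn.close()
```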

SparkContext.parallelize()

The opposite of pyspark.RDD.collect() is SparkContext.parallelize(). Whereas collect pulls data from the Spark cluster into the driver's local RAM, parallelize distributes data from the driver's local memory across the cluster.

You can use it like this:


In [17]:
my_rdd = sc.parallelize([1,2,3,4,5])
my_rdd.first()


Out[17]:
1
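parallelize() also takes an optional numSlices argument that controls how many partitions the local collection is split into. glom() lets you inspect those partitions, for example with sc.parallelize([1, 2, 3, 4, 5], 2).glom().collect(). The plain-Python function below is a simplified sketch of contiguous, roughly even slicing. It is an assumption for illustration only: Spark's real slicing also depends on how PySpark batches data for serialization, so actual partition boundaries may differ. The name slice_into_partitions is hypothetical.

```python
def slice_into_partitions(data, num_slices):
    """Split a list into num_slices contiguous, roughly equal chunks (simplified sketch)."""
    if num_slices < 1:
        raise ValueError("num_slices must be at least 1")
    n = len(data)
    # Chunk i covers indices [i*n//num_slices, (i+1)*n//num_slices)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]

print(slice_into_partitions([1, 2, 3, 4, 5], 2))
```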

Exercises

  1. Create your own RDD of dict elements with named fields using sc.parallelize. Make it at least 5 records long.
  2. Convert this RDD of dicts into an RDD of pyspark.sql.Row elements.
  3. Convert this RDD of pyspark.sql.Rows into a pyspark.sql.DataFrame.
  4. Run a SQL GROUP BY/COUNT on your new DataFrame.

In [18]:
my_data = [
    {"name": "Russell Jurney", "interest": "Ancient Greece"},
    {"name": "Chris Jurney", "interest": "Virtual Reality"},
    {"name": "Bill Jurney", "interest": "Sports"},
    {"name": "Ruth Jurney", "interest": "Wildlife"},
    {"name": "Bob Smith", "interest": "Sports"}
]
my_rdd = sc.parallelize(my_data)
my_rows = my_rdd.map(lambda x: Row(name=x["name"], interest=x["interest"]))

my_df = my_rows.toDF()
my_df.show()

my_df.createOrReplaceTempView("people")
spark.sql("""SELECT interest, COUNT(*) as total FROM people GROUP BY interest""").show()


+---------------+--------------+
|       interest|          name|
+---------------+--------------+
| Ancient Greece|Russell Jurney|
|Virtual Reality|  Chris Jurney|
|         Sports|   Bill Jurney|
|       Wildlife|   Ruth Jurney|
|         Sports|     Bob Smith|
+---------------+--------------+

+---------------+-----+
|       interest|total|
+---------------+-----+
|Virtual Reality|    1|
|         Sports|    2|
|       Wildlife|    1|
| Ancient Greece|    1|
+---------------+-----+

Creating RDDs from DataFrames

We can easily convert back from a DataFrame to an RDD using the pyspark.sql.DataFrame.rdd property, along with pyspark.sql.Row.asDict() if we want each record as a Python dict.


In [19]:
job_counts.rdd.map(lambda x: x.asDict()).collect()


Out[19]:
[{'name': 'Donald Trump', 'total': 1},
 {'name': 'Florian Liebert', 'total': 1},
 {'name': 'Don Brown', 'total': 1},
 {'name': 'Russell Jurney', 'total': 2},
 {'name': 'Steve Jobs', 'total': 1}]

Exercises

  1. Using the data from item 4 in the exercise above, convert the data back to its original form, a local collection of dict elements.

In [20]:
my_df.rdd.map(lambda x: x.asDict()).collect()


Out[20]:
[{'interest': 'Ancient Greece', 'name': 'Russell Jurney'},
 {'interest': 'Virtual Reality', 'name': 'Chris Jurney'},
 {'interest': 'Sports', 'name': 'Bill Jurney'},
 {'interest': 'Wildlife', 'name': 'Ruth Jurney'},
 {'interest': 'Sports', 'name': 'Bob Smith'}]

Loading and Inspecting Parquet Files

Using the SparkSession to load files as DataFrames and inspecting their contents...


In [22]:
# Load the parquet file containing flight delay records
on_time_dataframe = spark.read.parquet('../data/on_time_performance.parquet')

# Register the data for Spark SQL
on_time_dataframe.createOrReplaceTempView("on_time_performance")

# Check out the columns
on_time_dataframe.columns


Out[22]:
['Year',
 'Quarter',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'FlightDate',
 'Carrier',
 'TailNum',
 'FlightNum',
 'Origin',
 'OriginCityName',
 'OriginState',
 'Dest',
 'DestCityName',
 'DestState',
 'DepTime',
 'DepDelay',
 'DepDelayMinutes',
 'TaxiOut',
 'TaxiIn',
 'WheelsOff',
 'WheelsOn',
 'ArrTime',
 'ArrDelay',
 'ArrDelayMinutes',
 'Cancelled',
 'Diverted',
 'ActualElapsedTime',
 'AirTime',
 'Flights',
 'Distance',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay',
 'CRSDepTime',
 'CRSArrTime']

In [23]:
# Trim the fields and keep the result
trimmed_on_time = on_time_dataframe\
  .select(
    "FlightDate",
    "TailNum",
    "Origin",
    "Dest",
    "Carrier",
    "DepDelay",
    "ArrDelay"
  )

# Sample 0.01% of the data and show
trimmed_on_time.sample(False, 0.0001).show(10)

sampled_ten_percent = trimmed_on_time.sample(False, 0.1)
sampled_ten_percent.show(10)


+----------+-------+------+----+-------+--------+--------+
|FlightDate|TailNum|Origin|Dest|Carrier|DepDelay|ArrDelay|
+----------+-------+------+----+-------+--------+--------+
|2015-01-01| N919DE|   IND| MSP|     DL|    -9.0|   -20.0|
|2015-01-10| N28478|   IAH| PHL|     UA|    33.0|    22.0|
|2015-01-10| N75410|   SLC| IAH|     UA|    30.0|    29.0|
|2015-01-12| N548AS|   LAX| PDX|     AS|    -6.0|     3.0|
|2015-01-12| N608SK|   MSP| FSD|     OO|    -3.0|   -12.0|
|2015-01-12| N804SK|   LAX| OAK|     OO|    -5.0|    -7.0|
|2015-01-12| N7707C|   OAK| PDX|     WN|    11.0|    13.0|
|2015-01-14| N630MQ|   ORD| FAR|     MQ|    75.0|    62.0|
|2015-01-15| N133EV|   HPN| ATL|     EV|    -4.0|     4.0|
|2015-01-16| N530AS|   SEA| LAX|     AS|    10.0|     7.0|
+----------+-------+------+----+-------+--------+--------+
only showing top 10 rows

+----------+-------+------+----+-------+--------+--------+
|FlightDate|TailNum|Origin|Dest|Carrier|DepDelay|ArrDelay|
+----------+-------+------+----+-------+--------+--------+
|2015-01-01| N003AA|   HDN| DFW|     AA|   332.0|   336.0|
|2015-01-01| N012AA|   MIA| TPA|     AA|    null|    null|
|2015-01-01| N020AA|   IAD| MIA|     AA|    -6.0|     1.0|
|2015-01-01| N023AA|   STL| MIA|     AA|   109.0|   106.0|
|2015-01-01| N200AA|   DFW| DEN|     AA|    47.0|    40.0|
|2015-01-01| N3AAAA|   RDU| MIA|     AA|    -8.0|    10.0|
|2015-01-01| N3ACAA|   DFW| MIA|     AA|    21.0|    22.0|
|2015-01-01| N3AKAA|   ORD| RSW|     AA|     5.0|    10.0|
|2015-01-01| N3ALAA|   DFW| CLT|     AA|     8.0|    -9.0|
|2015-01-01| N3AMAA|   LAX| IAD|     AA|    45.0|    39.0|
+----------+-------+------+----+-------+--------+--------+
only showing top 10 rows
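DataFrame.sample(withReplacement, fraction) does not return an exact number of rows. Without replacement, each row is kept independently with probability fraction, so the sample size varies around fraction × row count, and reruns return different rows unless a seed is passed (sample also accepts a seed argument). The plain-Python sketch below illustrates this per-row Bernoulli sampling with a fixed seed. bernoulli_sample is a hypothetical helper, not Spark's code.

```python
import random

def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction` (sampling without replacement)."""
    rng = random.Random(seed)  # a seeded generator makes the sample reproducible
    return [row for row in rows if rng.random() < fraction]

rows = list(range(100_000))
sample = bernoulli_sample(rows, 0.1, seed=42)
print(len(sample))  # close to, but usually not exactly, 10,000
```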

DataFrame Workflow: Calculating Speed in Dataflow and SQL

We can go back and forth between dataflow programming and SQL programming using pyspark.sql.DataFrames, which gives us the best of both APIs. For example, if we want to group records and get a total count for each group, a SQL SELECT/GROUP BY/COUNT is the most direct way to do it. On the other hand, if we want to filter data, a dataflow API call like DataFrame.filter() is the cleanest way. Much of this comes down to personal preference, and in time you will develop your own style of working.

Dataflow Programming

If we look at AirTime along with Distance, we can work out how fast the airplanes were going: distance divided by time in the air gives average speed. Pretty cool! Let's do this using Dataflows first.

Trimming Our Data

First, let's select just the two columns of interest: AirTime and Distance. We can always go back and select more columns if we want to extend our analysis, but trimming unneeded fields helps performance right away. Parquet stores data by column, so Spark only has to read the columns we select.


In [24]:
fd = on_time_dataframe.select("AirTime", "Distance")
fd.show(6)


+-------+--------+
|AirTime|Distance|
+-------+--------+
|   59.0|   432.0|
|   77.0|   432.0|
|  129.0|   802.0|
|   93.0|   731.0|
|  111.0|   769.0|
|  108.0|   769.0|
+-------+--------+
only showing top 6 rows

From Minutes to Hours

Now let's convert our AirTime from minutes to hours by dividing by 60.


In [25]:
hourly_fd = fd.select((fd.AirTime / 60).alias('Hours'), "Distance")
hourly_fd.show(5)


+------------------+--------+
|             Hours|Distance|
+------------------+--------+
|0.9833333333333333|   432.0|
|1.2833333333333334|   432.0|
|              2.15|   802.0|
|              1.55|   731.0|
|              1.85|   769.0|
+------------------+--------+
only showing top 5 rows

Raw Calculation

Now let's calculate miles per hour by dividing Distance by Hours!
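As a sanity check of the formula, take the first row: 432 miles in 59 minutes is 432 / (59 / 60) ≈ 439.32 mph, which matches the first Mph value in the output that follows. In Spark, arithmetic on a null column value yields null, so any row with a missing AirTime produces a null speed. The plain-Python sketch below mirrors that rule, using None for null. miles_per_hour is a hypothetical helper, and returning None for zero air time is this sketch's own assumption.

```python
def miles_per_hour(distance_miles, air_time_minutes):
    """Average speed in mph; None (null) if either input is missing."""
    if distance_miles is None or air_time_minutes is None:
        return None  # null in, null out, as with Spark column arithmetic
    if air_time_minutes == 0:
        return None  # treat zero air time as an undefined speed (an assumption here)
    hours = air_time_minutes / 60
    return distance_miles / hours

print(miles_per_hour(432.0, 59.0))
print(miles_per_hour(1007.0, None))
```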


In [26]:
miles_per_hour = hourly_fd.select(
    (hourly_fd.Distance / hourly_fd.Hours).alias('Mph')
)
miles_per_hour.show(10)


+------------------+
|               Mph|
+------------------+
| 439.3220338983051|
| 336.6233766233766|
| 373.0232558139535|
|471.61290322580646|
| 415.6756756756757|
|427.22222222222223|
| 430.2739726027398|
|              null|
|              null|
|              null|
+------------------+
only showing top 10 rows

Investigating nulls

It looks like some records produce null in our calculation, perhaps because of missing fields. Let's bring back the AirTime field, alongside Hours and Distance, to see where the problem is coming from.


In [27]:
fd.select(
    "AirTime", 
    (fd.AirTime / 60).alias('Hours'), 
    "Distance"
).show()


+-------+------------------+--------+
|AirTime|             Hours|Distance|
+-------+------------------+--------+
|   59.0|0.9833333333333333|   432.0|
|   77.0|1.2833333333333334|   432.0|
|  129.0|              2.15|   802.0|
|   93.0|              1.55|   731.0|
|  111.0|              1.85|   769.0|
|  108.0|               1.8|   769.0|
|  146.0| 2.433333333333333|  1047.0|
|   null|              null|  1007.0|
|   null|              null|  1007.0|
|   null|              null|   802.0|
|   null|              null|   731.0|
|  122.0| 2.033333333333333|   731.0|
|   94.0|1.5666666666666667|   731.0|
|   91.0|1.5166666666666666|   731.0|
|  115.0|1.9166666666666667|   731.0|
|   89.0|1.4833333333333334|   731.0|
|  106.0|1.7666666666666666|   721.0|
|   94.0|1.5666666666666667|   748.0|
|   null|              null|   733.0|
|   null|              null|   733.0|
+-------+------------------+--------+
only showing top 20 rows

Filtering nulls

Now that we know some records are missing AirTime values, we can filter out those records using pyspark.sql.DataFrame.filter(). Starting from the beginning, let's recalculate our values.


In [28]:
fd = on_time_dataframe.select("AirTime", "Distance")
filled_fd = fd.filter(fd.AirTime.isNotNull())
hourly_fd = filled_fd.select(
    "AirTime", 
    (filled_fd.AirTime / 60).alias('Hours'), 
    "Distance"
)
mph = hourly_fd.select((hourly_fd.Distance / hourly_fd.Hours).alias('Mph'))
mph.show(10)


+------------------+
|               Mph|
+------------------+
| 439.3220338983051|
| 336.6233766233766|
| 373.0232558139535|
|471.61290322580646|
| 415.6756756756757|
|427.22222222222223|
| 430.2739726027398|
| 359.5081967213115|
|466.59574468085106|
|  481.978021978022|
+------------------+
only showing top 10 rows

Averaging Speed

How fast does the fleet travel overall? Let's compute the average speed for the entire fleet.


In [29]:
from pyspark.sql.functions import avg

# Use the imported avg() directly; the bare pyspark module may not be in scope
mph.select(
    avg(mph.Mph)
).show()


+------------------+
|          avg(Mph)|
+------------------+
|408.72370268242895|
+------------------+

It looks like the average speed of the fleet is about 408.7 mph. Note how along the way we checked the data for sanity, which gives us confidence in our answer. SQL, by contrast, can hide the internals of a query, and a problem we never looked at might have skewed our average significantly!
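One detail matters when comparing the two approaches. SQL aggregate functions such as AVG skip NULL inputs, and Spark's avg behaves the same way, whereas COUNT(*) counts every row. Null speeds left in the data therefore do not change AVG itself, but they would inflate a row count used as a denominator by hand. Here is a quick check with Python's built-in sqlite3, used only as a stand-in for Spark SQL; the table name speeds is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE speeds (mph REAL)")
# One of the three rows has a NULL speed, like a flight with no AirTime
conn.executemany("INSERT INTO speeds VALUES (?)", [(400.0,), (None,), (500.0,)])

avg_mph, all_rows, non_null = conn.execute(
    "SELECT AVG(mph), COUNT(*), COUNT(mph) FROM speeds"
).fetchone()
print(avg_mph, all_rows, non_null)  # 450.0 3 2
conn.close()
```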

SQL-Based Speed Calculation

Now let's work out the same thing in SQL, starting from the top:


In [30]:
on_time_dataframe.createOrReplaceTempView("on_time_performance")

mph = spark.sql("""
SELECT ( Distance / ( AirTime/60 ) ) AS Mph 
FROM on_time_performance 
WHERE AirTime IS NOT NULL
ORDER BY AirTime
""")
mph.show(10)

mph.createOrReplaceTempView("mph")
spark.sql("SELECT AVG(Mph) from mph").show()


+-----------------+
|              Mph|
+-----------------+
|265.7142857142857|
|265.7142857142857|
|265.7142857142857|
|265.7142857142857|
|265.7142857142857|
|265.7142857142857|
|265.7142857142857|
|            232.5|
|            232.5|
|            232.5|
+-----------------+
only showing top 10 rows

+-----------------+
|         avg(Mph)|
+-----------------+
|408.7237026822692|
+-----------------+

Evaluating SQL

The SQL-based solution seems better in this case, because we can express the whole calculation in one place. As complexity grows, however, it is best to break a single query into multiple stages, using SQL or Dataflow programming to massage the data into shape.

Calculating Histograms

Having computed the speed in miles per hour and the overall average speed of passenger jets in the US, let's dig deeper by using the RDD API's histogram method to calculate histogram buckets and counts, which we will then use to visualize the data.
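When RDD.histogram is given an integer n, it spaces n + 1 boundaries evenly between the minimum and maximum value and returns (boundaries, counts). Every bucket is half-open [a, b) except the last, which also includes the maximum. The plain-Python sketch below reproduces that bucketing for small lists so the output format is easier to read. even_histogram is a hypothetical helper, not PySpark's implementation.

```python
def even_histogram(values, num_buckets):
    """Evenly spaced histogram: returns (boundaries, counts); the last bucket is closed."""
    lo, hi = min(values), max(values)
    width = (hi - lo) / num_buckets
    boundaries = [lo + i * width for i in range(num_buckets)] + [hi]
    counts = [0] * num_buckets
    for v in values:
        if width == 0:
            index = 0  # all values identical: the first bucket holds everything
        else:
            # The maximum would land one past the end, so clamp it into the last bucket
            index = min(int((v - lo) / width), num_buckets - 1)
        counts[index] += 1
    return boundaries, counts

print(even_histogram([1, 2, 3, 4, 5], 2))
```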


In [31]:
# Compute a histogram of flight speeds (Mph) with 10 evenly spaced buckets
mph\
  .select("Mph")\
  .rdd\
  .flatMap(lambda x: x)\
  .histogram(10)


Out[31]:
([32.34375,
  107.89626024590164,
  183.4487704918033,
  259.0012807377049,
  334.5537909836066,
  410.10630122950823,
  485.65881147540983,
  561.2113217213115,
  636.7638319672132,
  712.3163422131148,
  787.8688524590165],
 [174, 14255, 148314, 643901, 1861699, 2357002, 664609, 23997, 32, 25])

Visualizing Histograms

The output above, a list of bucket boundaries followed by a list of counts, is hard to interpret. A visualization makes it much easier to read. We can use matplotlib inline in a Jupyter Notebook to plot this distribution and see how airplane speeds spread around the mean of roughly 408 mph.


In [32]:
%matplotlib inline

import numpy as np
import matplotlib.pyplot as plt

# Function to plot a histogram using pyplot
def create_hist(rdd_histogram_data):
  """Given the (buckets, counts) pair returned by RDD.histogram, plot a pyplot bar chart"""
  heights = np.array(rdd_histogram_data[1])
  full_bins = rdd_histogram_data[0]
  # Each bar starts at its bucket's left boundary
  left_edges = full_bins[:-1]
  widths = [abs(i - j) for i, j in zip(full_bins[:-1], full_bins[1:])]
  # align='edge' puts each bar's left side on its boundary instead of centering it there
  bar = plt.bar(left_edges, heights, width=widths, align='edge', color='b')
  return bar

# Compute a histogram of flight speeds (Mph)
mph_histogram = mph\
  .select("Mph")\
  .rdd\
  .flatMap(lambda x: x)\
  .histogram(10)

create_hist(mph_histogram)


Out[32]:
<BarContainer object of 10 artists>
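matplotlib's plt.bar centers each bar on its x value by default (align='center'). When x holds the left boundaries returned by RDD.histogram, either pass align='edge' or shift x to the bucket midpoints. Otherwise every bar is drawn half a bucket too far to the left. The helper below, under the hypothetical name bar_geometry, computes left edges, widths and midpoints from the (boundaries, counts) pair without any plotting dependency.

```python
def bar_geometry(histogram):
    """Given (boundaries, counts) from RDD.histogram, return left edges, widths, midpoints."""
    boundaries, counts = histogram
    if len(boundaries) != len(counts) + 1:
        raise ValueError("expected exactly one more boundary than counts")
    lefts = boundaries[:-1]
    widths = [right - left for left, right in zip(boundaries[:-1], boundaries[1:])]
    midpoints = [left + width / 2 for left, width in zip(lefts, widths)]
    return lefts, widths, midpoints

lefts, widths, midpoints = bar_geometry(([0.0, 100.0, 300.0], [5, 7]))
print(lefts, widths, midpoints)
# With matplotlib and counts = histogram[1]:
#   plt.bar(midpoints, counts, width=widths)  or  plt.bar(lefts, counts, width=widths, align="edge")
```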

Iterating on a Histogram

That looks interesting, but the bars seem too wide to really see what is going on. Let's double the number of buckets from 10 to 20. We can reuse the create_hist() function to do so.


In [33]:
# Compute a histogram of flight speeds (Mph) with 20 buckets
mph_histogram = mph\
  .select("Mph")\
  .rdd\
  .flatMap(lambda x: x)\
  .histogram(20)

create_hist(mph_histogram)


Out[33]:
<BarContainer object of 20 artists>
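Instead of an integer, RDD.histogram also accepts a sorted list of boundaries, for example list(range(0, 850, 50)). This gives round-number buckets that are easier to read and to compare between runs. In that form, values below the first boundary or above the last are left out of the counts. Below is a plain-Python sketch of that bucketing rule using bisect. fixed_histogram is a hypothetical helper.

```python
from bisect import bisect_right

def fixed_histogram(values, boundaries):
    """Count values into [b0, b1), [b1, b2), ..., [b_{n-1}, b_n]; skip out-of-range values."""
    if len(boundaries) < 2 or any(a >= b for a, b in zip(boundaries, boundaries[1:])):
        raise ValueError("boundaries must be strictly increasing with at least two elements")
    counts = [0] * (len(boundaries) - 1)
    for v in values:
        if v < boundaries[0] or v > boundaries[-1]:
            continue  # outside the histogram range
        # Bucket whose left edge is the last boundary <= v; clamp the maximum into the last bucket
        index = min(bisect_right(boundaries, v) - 1, len(counts) - 1)
        counts[index] += 1
    return counts

speed_counts = fixed_histogram([10, 49.9, 50, 120, 800, 900], list(range(0, 850, 50)))
print(speed_counts)
```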

Speed Summary

You've now seen how to calculate different values in both SQL and Dataflow style, how to switch between the two methods, how to switch between the pyspark.RDD and pyspark.sql.DataFrame APIs and you're starting to build a proficiency in PySpark!

Counting Airplanes in the US Fleet

Let's convert our on_time_dataframe (a DataFrame) into an RDD to calculate the total number of airplanes in the US fleet, counting each unique tail number (TailNum) in the data once.


In [34]:
# Dump the unneeded fields
tail_numbers = on_time_dataframe.rdd.map(lambda x: x.TailNum)
tail_numbers = tail_numbers.filter(lambda x: x != '' and x is not None)

# distinct() gets us unique tail numbers
unique_tail_numbers = tail_numbers.distinct()

# now we need a count() of unique tail numbers
airplane_count = unique_tail_numbers.count()
print("Total airplanes: {}".format(airplane_count))


Total airplanes: 4897
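The same count can be sketched in plain Python: drop missing or empty tail numbers, then count the distinct values that remain, just as the filter/distinct/count chain above does. With the DataFrame API, on_time_dataframe.select("TailNum").distinct().count() would keep null as one distinct value. Both that route and pyspark.sql.functions.countDistinct would count an empty string as a value, so the filtering step matters with either API. count_unique_tail_numbers and the sample list are hypothetical.

```python
def count_unique_tail_numbers(tail_numbers):
    """Count distinct, non-empty tail numbers (None and '' are treated as missing)."""
    present = (t for t in tail_numbers if t is not None and t != "")
    return len(set(present))

sample_tails = ["N919DE", "N28478", "", None, "N919DE", "N75410"]
print(count_unique_tail_numbers(sample_tails))
```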

Exercise 1: Characterizing Airports

Using the techniques we demonstrated above, calculate any 3 of the following 4 things, using both the SQL and the Dataflow methods for each one. That is: implement each calculation twice, once in SQL and once using Dataflows. Try to use both the RDD and DataFrame APIs as you work.

  1. How many airports are there in the United States?
  2. What is the average flight time for flights arriving in San Francisco (SFO)? What does the distribution of this value look like? Plot a histogram using the create_hist function shown above.
  3. Which American airport has the fastest outbound speeds? What does the distribution of the flight speeds at this one airport look like? Plot a histogram using the create_hist function shown above.
  4. What were the worst travel dates in terms of overall delayed flights in the US in 2015?

In [35]:
origin_hour_dist = on_time_dataframe.filter(
    on_time_dataframe.AirTime.isNotNull()
).select(
    "Origin", 
    (on_time_dataframe.AirTime/60).alias("Hours"), 
    "Distance"
)
mph_origins = origin_hour_dist.select(
    "Origin", 
    (origin_hour_dist.Distance / origin_hour_dist.Hours).alias("Mph")
)
mph_origins.createOrReplaceTempView("mph_origins")

# Average speed per origin airport; the result column is named avg(Mph)
avg_speeds = mph_origins.groupBy("Origin").agg({"Mph": "avg"})
avg_speeds.show()


+------+------------------+
|Origin|          avg(Mph)|
+------+------------------+
|   BGM| 346.1853538898441|
|   PSE| 452.6117680000053|
|   INL|286.26851316932107|
|   DLG|395.85893026325215|
|   MSY| 417.8806617127785|
|   PPG| 504.2422916767315|
|   GEG|  402.285589290541|
|   SNA| 417.1687699400066|
|   BUR|381.54113589717645|
|   GRB| 304.8341806425415|
|   GTF| 425.3390308053375|
|   IDA|352.21033421102914|
|   GRR| 345.7402330340281|
|   JLN|333.89806917791054|
|   PSG| 218.6399366568302|
|   EUG|408.36973646170964|
|   PVD|  377.284933612292|
|   GSO| 363.1599876379004|
|   MYR|392.41545245530034|
|   OAK| 411.1720814918298|
+------+------------------+
only showing top 20 rows



Calculating with DataFrame.groupBy

We can use Spark SQL to calculate things using DataFrames, but we can also group data and calculate as we did with RDDs. For a full list of methods you can apply to grouped DataFrames, see the documentation for pyspark.sql.GroupedData. Below we will demonstrate some of these methods.


In [36]:
# Calculate average of every numeric field
on_time_dataframe.groupBy("Origin").avg().show(1)

# Calculate verage AirTime per origin city
on_time_dataframe.groupBy("Origin").agg({"AirTime": "mean"}).show(1)

# Get the count of flights from each origin
on_time_dataframe.groupBy("Origin").count().show(1)

# Get the maximum airtime for flights leaving each city
on_time_dataframe.groupBy("Origin").agg({"AirTime": "max"}).show(1)

# Get the maximum of all numeric columns for flights leaving each city
on_time_dataframe.groupBy("Origin").max().show(1)

# Get the shortest flight for each origin airport
on_time_dataframe.groupBy("Origin").agg({"AirTime": "min"}).show(1)

# Total minutes flown from each airport
on_time_dataframe.groupBy("Origin").agg({"AirTime": "sum"}).show(1)


+------+----------------+--------------------+------------------+---------------+-----------------+--------------------+--------------------+-------------+----------------------+-----------------+------------+-------------+-----------------+-----------------+------------------+--------------------+----------------------+
|Origin|   avg(DepDelay)|avg(DepDelayMinutes)|      avg(TaxiOut)|    avg(TaxiIn)|    avg(ArrDelay)|avg(ArrDelayMinutes)|      avg(Cancelled)|avg(Diverted)|avg(ActualElapsedTime)|     avg(AirTime)|avg(Flights)|avg(Distance)|avg(CarrierDelay)|avg(WeatherDelay)|     avg(NASDelay)|  avg(SecurityDelay)|avg(LateAircraftDelay)|
+------+----------------+--------------------+------------------+---------------+-----------------+--------------------+--------------------+-------------+----------------------+-----------------+------------+-------------+-----------------+-----------------+------------------+--------------------+----------------------+
|   BGM|9.42056074766355|  15.177570093457945|16.193146417445483|6.2398753894081|6.218068535825545|   16.32398753894081|0.009259259259259259|          0.0|      88.4423676012461|66.00934579439253|         1.0|        378.0|58.67307692307692|              0.0|10.711538461538462|0.057692307692307696|    26.903846153846153|
+------+----------------+--------------------+------------------+---------------+-----------------+--------------------+--------------------+-------------+----------------------+-----------------+------------+-------------+-----------------+-----------------+------------------+--------------------+----------------------+
only showing top 1 row

+------+-----------------+
|Origin|     avg(AirTime)|
+------+-----------------+
|   BGM|66.00934579439253|
+------+-----------------+
only showing top 1 row

+------+-----+
|Origin|count|
+------+-----+
|   BGM|  324|
+------+-----+
only showing top 1 row

+------+------------+
|Origin|max(AirTime)|
+------+------------+
|   BGM|        91.0|
+------+------------+
only showing top 1 row

+------+-------------+--------------------+------------+-----------+-------------+--------------------+--------------+-------------+----------------------+------------+------------+-------------+-----------------+-----------------+-------------+------------------+----------------------+
|Origin|max(DepDelay)|max(DepDelayMinutes)|max(TaxiOut)|max(TaxiIn)|max(ArrDelay)|max(ArrDelayMinutes)|max(Cancelled)|max(Diverted)|max(ActualElapsedTime)|max(AirTime)|max(Flights)|max(Distance)|max(CarrierDelay)|max(WeatherDelay)|max(NASDelay)|max(SecurityDelay)|max(LateAircraftDelay)|
+------+-------------+--------------------+------------+-----------+-------------+--------------------+--------------+-------------+----------------------+------------+------------+-------------+-----------------+-----------------+-------------+------------------+----------------------+
|   BGM|        993.0|                 993|       100.0|       30.0|       1005.0|              1005.0|             1|            0|                 163.0|        91.0|           1|        378.0|            993.0|              0.0|         74.0|               3.0|                 364.0|
+------+-------------+--------------------+------------+-----------+-------------+--------------------+--------------+-------------+----------------------+------------+------------+-------------+-----------------+-----------------+-------------+------------------+----------------------+
only showing top 1 row

+------+------------+
|Origin|min(AirTime)|
+------+------------+
|   BGM|        56.0|
+------+------------+
only showing top 1 row

+------+------------+
|Origin|sum(AirTime)|
+------+------------+
|   BGM|     21189.0|
+------+------------+
only showing top 1 row
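Each of these calls computes one summary per group. For intuition, the plain-Python sketch below computes the same five summaries (count, average, minimum, maximum, sum) of AirTime per origin over a handful of hypothetical rows. It skips nulls the way Spark's aggregate functions do, while count here counts all rows, like GroupedData.count(). In Spark, several summaries can also be requested in one aggregation, for example on_time_dataframe.groupBy("Origin").agg(F.count("*"), F.avg("AirTime"), F.max("AirTime")), where F is pyspark.sql.functions.

```python
from collections import defaultdict

# Hypothetical (Origin, AirTime) rows; None stands in for a null AirTime
rows = [("BGM", 56.0), ("BGM", 91.0), ("BGM", None), ("ATL", 120.0), ("ATL", 30.0)]

row_counts = defaultdict(int)   # every row counts, like GroupedData.count()
air_times = defaultdict(list)   # only non-null values, like avg/min/max/sum
for origin, air_time in rows:
    row_counts[origin] += 1
    if air_time is not None:
        air_times[origin].append(air_time)

def summarize(values):
    """Null-skipping summaries; all None when a group has no non-null values."""
    if not values:
        return {"avg": None, "min": None, "max": None, "sum": None}
    return {"avg": sum(values) / len(values), "min": min(values),
            "max": max(values), "sum": sum(values)}

summary = {origin: {"count": n, **summarize(air_times[origin])}
           for origin, n in row_counts.items()}
print(summary)
```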

Pivoting DataFrames

One useful function of DataFrames is pivot, which computes pivot tables from data. Let's use pivot to calculate the average flight times between Atlanta (ATL) and other airports.


In [37]:
on_time_dataframe\
    .filter("Origin == 'ATL'")\
    .groupBy("Origin")\
    .pivot("Dest")\
    .avg("AirTime")\
    .rdd\
    .map(lambda x: x.asDict())\
    .collect()[0]


Out[37]:
{'Origin': 'ATL',
 'ABE': 96.59897959183674,
 'ABQ': 172.1290684624018,
 'ABY': 29.252365930599368,
 'ACY': 90.23557692307692,
 'AEX': 78.87462235649546,
 'AGS': 29.076743280093496,
 'ALB': 108.65161923454367,
 'ANC': 426.3529411764706,
 'ASE': 198.09574468085106,
 'ATW': 105.11235955056179,
 'AUS': 117.13764946048411,
 'AVL': 33.510327455919395,
 'AVP': 93.94900849858357,
 'AZO': 86.6842105263158,
 'BDL': 109.44785031847134,
 'BHM': 28.848588537211292,
 'BMI': 78.14963119072708,
 'BNA': 38.880419382261266,
 'BOS': 120.84425582313462,
 'BQK': 41.63483735571878,
 'BTR': 70.18819188191883,
 'BTV': 119.43465045592706,
 'BUF': 93.706432748538,
 'BWI': 81.02321908071394,
 'BZN': 220.15625,
 'CAE': 33.79744210162461,
 'CAK': 72.89645776566758,
 'CHA': 24.022727272727273,
 'CHO': 66.9613478691774,
 'CHS': 42.49556255367879,
 'CID': 101.25268817204301,
 'CLE': 77.47449470644851,
 'CLT': 41.581799591002046,
 'CMH': 66.05350605463249,
 'COS': 161.14626865671642,
 'CRW': 57.365990202939116,
 'CSG': 20.638461538461538,
 'CVG': 58.29219530949635,
 'DAB': 54.343106674984405,
 'DAL': 107.51469163706813,
 'DAY': 63.95127118644068,
 'DCA': 77.81844338944038,
 'DEN': 168.22181282240237,
 'DFW': 111.94913462808826,
 'DHN': 33.231617647058826,
 'DSM': 105.68818272095332,
 'DTW': 87.08653664078773,
 'ECP': 41.33196534427724,
 'EGE': 188.8181818181818,
 'ELM': 87.84615384615384,
 'ELP': 171.9362244897959,
 'EVV': 56.99230769230769,
 'EWN': 61.98867313915858,
 'EWR': 101.52093279471171,
 'EYW': 87.044921875,
 'FAR': 151.3846153846154,
 'FAY': 51.07640067911715,
 'FCA': 242.04,
 'FLL': 84.61670182404765,
 'FNT': 89.2876447876448,
 'FSD': 132.47368421052633,
 'FSM': 88.34068965517241,
 'FWA': 75.94344473007712,
 'GJT': None,
 'GNV': 48.145907473309606,
 'GPT': 56.39078947368421,
 'GRB': 107.31177829099308,
 'GRK': 114.30221518987342,
 'GRR': 87.19142857142857,
 'GSO': 49.15724137931034,
 'GSP': 30.053024645257654,
 'GTR': 44.07607607607608,
 'HDN': 191.74736842105264,
 'HNL': 546.5,
 'HOU': 105.17070383912248,
 'HPN': 109.62556165751373,
 'HSV': 31.34819769602378,
 'IAD': 76.36581920903954,
 'IAH': 102.31285988483685,
 'ICT': 111.22064393939394,
 'ILM': 55.823125357756155,
 'IND': 65.04984348663616,
 'JAC': 212.4757281553398,
 'JAN': 54.522334004024145,
 'JAX': 43.31659192825112,
 'JFK': 104.3519768563163,
 'LAN': 90.23076923076923,
 'LAS': 232.89032258064515,
 'LAX': 256.5899419729207,
 'LEX': 48.87252924566357,
 'LFT': 78.40828856485035,
 'LGA': 102.68517445230091,
 'LIT': 69.1775396085741,
 'LNK': 119.29885057471265,
 'MBS': 98.26666666666667,
 'MCI': 103.08773784355179,
 'MCO': 60.877189043556356,
 'MDT': 84.41851494696239,
 'MDW': 88.65857142857143,
 'MEM': 56.11916026020106,
 'MGM': 31.892723492723494,
 'MHT': 119.974930362117,
 'MIA': 85.72141706924316,
 'MKE': 94.42390670553935,
 'MLB': 64.01593901593901,
 'MLI': 90.73167848699764,
 'MLU': 71.84965304548959,
 'MOB': 52.496899545266636,
 'MSN': 98.24233128834356,
 'MSO': 240.26666666666668,
 'MSP': 129.69842810013586,
 'MSY': 67.13189171343677,
 'MTJ': 195.62857142857143,
 'MYR': 47.62226512226512,
 'OAJ': 58.33685064935065,
 'OAK': 295.140625,
 'OKC': 108.98932219127205,
 'OMA': 118.01823361823362,
 'ONT': 252.33333333333334,
 'ORD': 93.76928500916655,
 'ORF': 68.94117647058823,
 'PBI': 80.84571553802323,
 'PDX': 283.55982905982904,
 'PHF': 72.09481481481481,
 'PHL': 93.60806835691274,
 'PHX': 216.42130445731553,
 'PIA': 85.57802874743327,
 'PIT': 72.58607132827858,
 'PNS': 47.029735234215885,
 'PVD': 114.56133464180569,
 'PWM': 126.81884057971014,
 'RAP': 163.13333333333333,
 'RDU': 54.620066109274745,
 'RIC': 65.85651612903226,
 'ROA': 57.06195244055069,
 'ROC': 96.48102466793169,
 'RST': 118.55202312138728,
 'RSW': 74.23462897526501,
 'SAN': 247.43282236248874,
 'SAT': 124.2874558870709,
 'SAV': 37.248983149331785,
 'SBN': 85.35600335852224,
 'SCE': 87.58974358974359,
 'SDF': 53.38678295873868,
 'SEA': 289.20415814587597,
 'SFO': 279.49426976197475,
 'SGF': 87.09448818897638,
 'SHV': 84.34255898366607,
 'SJC': 277.67333333333335,
 'SJU': 184.041015625,
 'SLC': 213.2850971922246,
 'SMF': 272.2068965517241,
 'SNA': 251.88867745004757,
 'SRQ': 65.03754752851711,
 'STL': 75.68986187125358,
 'STT': 189.22590361445782,
 'STX': 196.3783783783784,
 'SYR': 101.62786259541984,
 'TLH': 39.01092179991262,
 'TPA': 60.845393024466425,
 'TRI': 39.92660550458716,
 'TTN': 95.16193181818181,
 'TUL': 97.27774408732566,
 'TUS': 204.35355029585799,
 'TVC': 103.75757575757575,
 'TYS': 30.09027777777778,
 'VLD': 37.780971258671954,
 'VPS': 43.24717691342534,
 'XNA': 89.38354577056779}

Plotting Scatterplots

Another type of visualization that is of interest to data scientists is the scatterplot. A scatterplot plots one variable against another so that we can see how the two are related. For example, we can examine the relationship between the Distance from Origin to Dest and the Mph speed figure we calculated earlier. Are longer flights generally faster, or not?

To prepare a scatterplot we'll use matplotlib again, so we need to look at what its scatterplot API expects. matplotlib.pyplot.scatter takes two equal-length sequences of values, one for x and one for y, so we must compute them for Distance and Mph.
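Spark hands back rows, while `plt.scatter` wants two parallel columns. As a minimal sketch of that reshaping in plain Python (the three rows are copied from the `mph.show()` output that follows, and the helper name `rows_to_xy` is hypothetical), `zip(*rows)` transposes a list of `(x, y)` pairs into two sequences whose i-th elements still belong to the same flight:

```python
# Hypothetical (Distance, Mph) rows, standing in for rows collected from Spark
rows = [
    (432.0, 439.3220338983051),
    (432.0, 336.6233766233766),
    (802.0, 373.0232558139535),
]

def rows_to_xy(rows):
    """Transpose a list of (x, y) pairs into two equal-length lists."""
    if not rows:
        return [], []
    # zip(*rows) groups all first elements together, then all second elements
    xs, ys = zip(*rows)
    return list(xs), list(ys)

distance, speed = rows_to_xy(rows)
print(distance)  # [432.0, 432.0, 802.0]
print(speed)
```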


In [38]:
mph = spark.sql("""
SELECT 
    Distance, 
    ( Distance / ( AirTime/60 ) ) AS Mph 
FROM on_time_performance 
WHERE AirTime IS NOT NULL
""")
mph.show(10)


+--------+------------------+
|Distance|               Mph|
+--------+------------------+
|   432.0| 439.3220338983051|
|   432.0| 336.6233766233766|
|   802.0| 373.0232558139535|
|   731.0|471.61290322580646|
|   769.0| 415.6756756756757|
|   769.0|427.22222222222223|
|  1047.0| 430.2739726027398|
|   731.0| 359.5081967213115|
|   731.0|466.59574468085106|
|   731.0|  481.978021978022|
+--------+------------------+
only showing top 10 rows

Collecting Data

Note that we have to move our data out of our Spark cluster's memory and into our local computer's memory, where matplotlib runs. The collect() action does this, returning the values to the driver as a Python list.
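In the cells below, `mph.select("Distance").rdd` is an RDD of one-field `Row` objects, and `flatMap(lambda x: x)` iterates over each `Row` (which behaves like a tuple) to emit its single value. A local sketch of the same flattening, with one-element tuples standing in for `Row`s:

```python
from itertools import chain

# Stand-ins for Rows from mph.select("Distance"): each row is a one-field tuple
distance_rows = [(432.0,), (432.0,), (802.0,), (731.0,)]

# flatMap(lambda x: x) emits every field of every row, flattening one level;
# chain.from_iterable does the same thing on a local list
distance = list(chain.from_iterable(distance_rows))
print(distance)  # [432.0, 432.0, 802.0, 731.0]

# An equivalent, more explicit alternative: take the single field of each row
explicit = [row[0] for row in distance_rows]
print(explicit == distance)  # True
```

Within PySpark, a list comprehension over collected rows, such as `[row.Distance for row in mph.select("Distance").collect()]`, should produce the same list.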


In [39]:
distance = mph.select("Distance").rdd.flatMap(lambda x: x)
distance = distance.collect()
distance[0:10]


Out[39]:
[432.0, 432.0, 802.0, 731.0, 769.0, 769.0, 1047.0, 731.0, 731.0, 731.0]

In [40]:
speed = mph.select("Mph").rdd.flatMap(lambda x: x)
speed = speed.collect()
speed[0:10]


Out[40]:
[439.3220338983051,
 336.6233766233766,
 373.0232558139535,
 471.61290322580646,
 415.6756756756757,
 427.22222222222223,
 430.2739726027398,
 359.5081967213115,
 466.59574468085106,
 481.978021978022]

Sampling Data

When I tried to plot this data, it took a very long time to draw. Why? Well, how many values are there for each variable? Let's see.


In [41]:
print("Total distances: {:,}".format(len(distance)))
print("Total speeds: {:,}".format(len(speed)))


Total distances: 5,714,008
Total speeds: 5,714,008

It is hard to plot 5.7 million dots on a scatterplot that fits on a computer screen, so let's sample our data using the PySpark DataFrame's sample method. Let's take a 0.1% random sample without replacement, which should leave us with roughly 5,714 data points (the exact count varies from run to run): something we can manage much more easily.
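`sample` does not return an exact number of rows. When sampling without replacement, Spark keeps each row independently with probability `fraction`, so the sample size follows a binomial distribution. A quick check of the expected size and its spread, using the row count printed above:

```python
import math

total_rows = 5_714_008   # row count printed above
fraction = 0.001         # a 0.1% sample

# Each row is kept independently with probability `fraction`,
# so the sample size is Binomial(total_rows, fraction)
expected = total_rows * fraction
std_dev = math.sqrt(total_rows * fraction * (1 - fraction))

print("Expected sample size: {:,.0f}".format(expected))   # 5,714
print("Standard deviation:   {:,.1f}".format(std_dev))    # 75.6
```

The 5,865 rows returned below are about two standard deviations above the expected 5,714: a little high, but plausible for a single draw. Passing a `seed`, as in `mph.sample(False, 0.001, seed=42)`, should make the sample reproducible, provided the input data and its partitioning do not change.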


In [42]:
sample = mph.sample(False, 0.001)
sample.count()


Out[42]:
5865

Note that we need to sample once and then split the sampled rows into separate lists. If we sampled each variable separately, the values for a single observation would be scrambled across variables, and all our scatterplots would show no relationship at all. We don't want that!
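The pairing problem is easy to see in a small plain-Python sketch. The data is hypothetical: speed is an exact function of distance, so any mismatched pair is detectable.

```python
import random

# Hypothetical (distance, mph) pairs in which speed grows with distance
rows = [(d, 300 + d / 10) for d in range(100, 3100, 100)]

rng = random.Random(0)

# Right: sample the rows once, then split the sampled rows into two columns
sampled = [row for row in rows if rng.random() < 0.5]
distance = [d for d, _ in sampled]
speed = [s for _, s in sampled]

# Wrong: sample each column independently; the i-th distance and the i-th
# speed now usually come from different observations
bad_distance = [d for d, _ in rows if rng.random() < 0.5]
bad_speed = [s for _, s in rows if rng.random() < 0.5]

# Every pair built from the single sample is a genuine observation
print(all((d, s) in rows for d, s in zip(distance, speed)))  # True
genuine = sum(s == 300 + d / 10 for d, s in zip(bad_distance, bad_speed))
print(genuine, "of", min(len(bad_distance), len(bad_speed)), "independently sampled pairs are genuine")
```

The same idea applies in Spark. In [43] below calls `collect()` twice, and each call re-evaluates the sampled DataFrame. The counts agree here, but collecting the rows once, e.g. `rows = sample.collect()` followed by `[r.Distance for r in rows]` and `[r.Mph for r in rows]`, guarantees that both lists come from the same rows.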


In [43]:
speed = sample.select("Mph").rdd.flatMap(lambda x: x).collect()
distance = sample.select("Distance").rdd.flatMap(lambda x: x).collect()
print("{:,} x {:,} records!".format(
    len(speed),
    len(distance)
))


5,865 x 5,865 records!

Fun with matplotlib.pyplot.scatter

Now we feed the scatter API distance as x and speed as y, and give the plot a title and x and y axis labels. Note that we also specify the figure size in inches via the figure.figsize rcParam.


In [44]:
%matplotlib inline

import numpy as np
import matplotlib.pyplot as plt

plt.rcParams["figure.figsize"] = (18,12)

plt.scatter(
    distance, 
    speed, 
    alpha=0.5
)
plt.title("Distance x Speed")
plt.xlabel("Distance (miles)")
plt.ylabel("Speed (mph)")
plt.show()


Interpreting Our Scatterplot

We can see pretty clearly that average speed rises rapidly as distance increases, and then levels off for longer flights.
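To check the "rises, then levels off" reading numerically rather than by eye, one option (not part of the original notebook) is to average speed within fixed-width distance bins. The sketch below works on local `distance` and `speed` lists; the helper name, the 500-mile bin width and the toy data are all hypothetical.

```python
from collections import defaultdict

def mean_speed_by_distance_bin(distance, speed, bin_width=500):
    """Average speed within fixed-width distance bins, keyed by each bin's lower edge."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for d, s in zip(distance, speed):
        if s is None:  # skip missing speeds
            continue
        lower = int(d // bin_width) * bin_width
        totals[lower] += s
        counts[lower] += 1
    return {lower: totals[lower] / counts[lower] for lower in sorted(totals)}

# Hypothetical toy data, not the real sample
distance = [100, 200, 600, 700, 1200, 1300, 2600]
speed = [250, 270, 400, 410, 450, 455, 470]
print(mean_speed_by_distance_bin(distance, speed))
# {0: 260.0, 500: 405.0, 1000: 452.5, 2500: 470.0}
```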

Exercises

  1. Query the on_time_dataframe to focus on two numeric fields.
  2. Plot a histogram of one of these fields.
  3. Plot a scatterplot of both of these fields.

Predicting Speed Given Distance

It is often the case that once we characterize a distribution, we want to create a function to predict one variable given the other. Let's take this example further by fitting a polynomial regression to describe our data. We use sklearn.pipeline.Pipeline to chain a sklearn.preprocessing.PolynomialFeatures to a sklearn.linear_model.LinearRegression. Other than that, we simply define x and y and fit a model to those values. Finally, we compute cross-validation scores to see how well the model performs. We'll see this pattern again when we use large data tools in Spark MLlib.
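For intuition about what the pipeline computes: `PolynomialFeatures(degree=3)` expands each distance `x` into the columns `1, x, x**2, x**3` (its default `include_bias=True` supplies the column of ones, which is why the notebook sets `fit_intercept=False`), and `LinearRegression` then solves an ordinary least-squares problem. The NumPy sketch below reproduces that fit directly with `np.vander` and `np.linalg.lstsq`, and computes R², the score `cross_val_score` reports by default for regressors. It is an illustration on hypothetical noiseless data, not a replacement for the scikit-learn code.

```python
import numpy as np

def fit_cubic(x, y):
    """Least-squares cubic fit, mirroring PolynomialFeatures(degree=3) feeding
    LinearRegression(fit_intercept=False): the column of ones acts as the intercept."""
    X = np.vander(x, N=4, increasing=True)  # columns: 1, x, x**2, x**3
    coef, *_ = np.linalg.lstsq(X, y, rcond=None)
    return coef

def r_squared(y_true, y_pred):
    """Coefficient of determination: 1 - residual sum of squares / total sum of squares."""
    ss_res = np.sum((y_true - y_pred) ** 2)
    ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
    return 1 - ss_res / ss_tot

# Hypothetical noiseless data generated from a known cubic
x = np.linspace(0.0, 3.0, 20)
y = 1.0 + 2.0 * x - 0.5 * x**2 + 0.1 * x**3

coef = fit_cubic(x, y)
y_hat = np.vander(x, N=4, increasing=True) @ coef
print(np.round(coef, 6))     # close to 1, 2, -0.5, 0.1
print(r_squared(y, y_hat))   # close to 1.0
```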


In [45]:
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score
import numpy as np

# scikit-learn expects a 2D feature matrix: one row per sample, one column per feature
x = np.array(distance).reshape(-1, 1)
y = np.array(speed)
x_test = np.arange(0, 5000, 100)

# PolynomialFeatures adds a bias column of ones, so the regression needs no separate intercept
model = Pipeline([
    ('poly', PolynomialFeatures(degree=3)),
    ('linear', LinearRegression(fit_intercept=False))
])
model = model.fit(x, y)
coefficients = model.named_steps['linear'].coef_
y_out = model.predict(x_test.reshape(-1, 1))

# R^2 for each of 3 folds; passing cv explicitly avoids scikit-learn's FutureWarning about the default
cross_val_score(model, x, y, cv=3)

Out[45]:
array([0.62454561, 0.70210257, 0.6544795 ])

Visualizing Polynomial Fit

Because we are running a polynomial regression, we get to decide the degree of the polynomial. To help decide, let's plot the fitted polynomial curve over the data using matplotlib.
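Plotting is one way to pick the degree; comparing held-out scores across several degrees is another. The sketch below swaps in `np.polyfit` for the scikit-learn pipeline and uses shuffled folds (unlike `cross_val_score`'s default, which does not shuffle), on hypothetical data that rises quickly and then levels off. The helper `kfold_r2` is a name made up for this example.

```python
import numpy as np

def kfold_r2(x, y, degree, k=3, seed=0):
    """Mean held-out R^2 of a least-squares polynomial of the given degree,
    averaged over k shuffled folds."""
    rng = np.random.default_rng(seed)
    order = rng.permutation(len(x))
    scores = []
    for test_idx in np.array_split(order, k):
        train_mask = np.ones(len(x), dtype=bool)
        train_mask[test_idx] = False
        coef = np.polyfit(x[train_mask], y[train_mask], degree)
        pred = np.polyval(coef, x[test_idx])
        y_test = y[test_idx]
        ss_res = np.sum((y_test - pred) ** 2)
        ss_tot = np.sum((y_test - y_test.mean()) ** 2)
        scores.append(1.0 - ss_res / ss_tot)
    return float(np.mean(scores))

# Hypothetical data that rises quickly and then levels off, plus noise
rng = np.random.default_rng(42)
x = rng.uniform(100, 3000, size=300)
y = 500 * (1 - np.exp(-x / 500)) + rng.normal(0, 10, size=x.size)

scores = {degree: kfold_r2(x, y, degree) for degree in (1, 2, 3)}
for degree, score in scores.items():
    print("degree {}: mean held-out R^2 = {:.3f}".format(degree, score))
```

If raising the degree barely improves the held-out score, the simpler curve is usually the safer choice, since high-degree polynomials tend to swing wildly near the ends of the data.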


In [46]:
%matplotlib inline

import numpy as np
import matplotlib.pyplot as plt

plt.rcParams["figure.figsize"] = (18,12)

plt.scatter(
    distance, 
    speed, 
    alpha=0.5
)
plt.plot(
    x_test,
    y_out, 
    color='orange', 
    linewidth=3
)

plt.title("Distance x Speed")
plt.xlabel("Distance (miles)")
plt.ylabel("Speed (mph)")
plt.show()


Joining Data in PySpark

Next we're going to learn how to join datasets using PySpark. We'll pick up an example we work through in chapter 6 and explore it more deeply. To begin, we prepare a list of distinct TailNum values (tail numbers) from the on-time performance flight records. A tail number uniquely identifies an individual airplane, so it tells us which airplane flew each flight.

Unique Tail Numbers


In [47]:
tail_numbers = on_time_dataframe.select("TailNum").distinct()
tail_numbers.show(6)


+-------+
|TailNum|
+-------+
| N396AA|
| N3CWAA|
| N499AA|
| N4YUAA|
| N567AA|
| N442AS|
+-------+
only showing top 6 rows

FAA Airplane Records

We will trim the FAA records down to just the TailNum, Model and Engine_Model fields. Note that we rename the TailNum field to FAATailNum using the pyspark.sql.Column.alias() method. This avoids having two fields with the same name once we perform our joins.


In [48]:
faa_tail_number_inquiry = spark.read.json('../data/faa_tail_number_inquiry.jsonl')
airplane_records = faa_tail_number_inquiry.select(
    faa_tail_number_inquiry.TailNum.alias("FAATailNum"), 
    "Model",
    "Engine_Model"
)
airplane_records.show(6)


+----------+-----------+------------+
|FAATailNum|      Model|Engine_Model|
+----------+-----------+------------+
|    N933EV|CL-600-2B19| CF34 SERIES|
|    N917WN|    737-7H4|  CFM56-7B24|
|    N438WN|    737-7H4|CFM56 SERIES|
|    N283VA|   A320-214| CFM56-5B4/3|
|    N473UA|   A320-232| AL-25SERIES|
|    N637JB|   A320-232|    V2527-A5|
+----------+-----------+------------+
only showing top 6 rows

Inner Joins

You may be familiar with an inner join from SQL. An inner join combines records from two datasets wherever their join keys match. Records whose key does not appear in the other dataset are dropped from the output.
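Spark chooses its own physical strategy for a join (for example, a broadcast hash join or a sort-merge join). The pure-Python hash join below only illustrates the inner-join semantics, using hypothetical miniature versions of the two datasets. Note that merging dicts with `{**left_row, **right_row}` would silently overwrite a column present on both sides, a loose analogue of the name clash the FAATailNum rename avoids in Spark.

```python
def inner_join(left, right, left_key, right_key):
    """Hash inner join of two lists of dicts: one merged record per matching pair."""
    # Index the right-hand side by its join key (a key may map to several records)
    index = {}
    for right_row in right:
        index.setdefault(right_row[right_key], []).append(right_row)
    joined = []
    for left_row in left:
        # A left record with no match produces no output at all
        for right_row in index.get(left_row[left_key], []):
            joined.append({**left_row, **right_row})
    return joined

# Hypothetical miniature versions of tail_numbers and airplane_records
tail_numbers = [{"TailNum": "N396AA"}, {"TailNum": "N3CWAA"}, {"TailNum": "N499AA"}]
airplane_records = [
    {"FAATailNum": "N396AA", "Model": "B300"},
    {"FAATailNum": "N499AA", "Model": "DC-9-82(MD-82)"},
]

result = inner_join(tail_numbers, airplane_records, "TailNum", "FAATailNum")
for row in result:
    print(row)
```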


In [49]:
# INNER JOIN
print(
    "FAA tail numbers: {:,}".format(
        tail_numbers.count()
    )
)
print(
    "Airplane records: {:,}".format(
        airplane_records.count()
    )
)

inner_joined = tail_numbers.join(
    airplane_records, 
    tail_numbers.TailNum == airplane_records.FAATailNum, 
    'inner'
)

print(
    "Joined records:   {:,}".format(
        inner_joined.count()
    )
)


FAA tail numbers: 4,898
Airplane records: 3,988
Joined records:   3,988

Inner Join Results

Note that the output has exactly as many records as the FAA airplane dataset. Because the tail numbers on the left side are distinct, each FAA record can match at most once, so this means every tail number in the FAA dataset appears in the on-time performance flight records. Let's take a look at the records themselves.


In [50]:
inner_joined.show(6)


+-------+----------+--------------+------------+
|TailNum|FAATailNum|         Model|Engine_Model|
+-------+----------+--------------+------------+
| N396AA|    N396AA|          B300|    PT6A SER|
| N499AA|    N499AA|DC-9-82(MD-82)| JT8D SERIES|
| N442AS|    N442AS|     737-990ER| CFM56-7B27E|
| N369NB|    N369NB|      A319-114|CFM56 SERIES|
| N388DA|    N388DA|       737-832|CFM56 SERIES|
|  N6700|     N6700|       757-232|      PW2037|
+-------+----------+--------------+------------+
only showing top 6 rows

Note how convenient it is that we renamed one of the keys to FAATailNum. If we hadn't, we'd now have two columns with the same name and would have trouble referring to one or the other.

Left Outer Join

Another type of join is the left outer join. It guarantees that every record from the left side of the join appears in the output at least once. If a match on the join key is found, the fields from the matching record on the right are filled in. If no match is found, they are null.
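A pure-Python sketch of the left outer join semantics, with hypothetical miniature data (Spark fills in `null` where this sketch uses `None`):

```python
def left_outer_join(left, right, left_key, right_key, right_fields):
    """Keep every left record; fill the right-hand fields with None when no key matches."""
    index = {}
    for right_row in right:
        index.setdefault(right_row[right_key], []).append(right_row)
    empty = {field: None for field in right_fields}  # stands in for Spark's nulls
    joined = []
    for left_row in left:
        matches = index.get(left_row[left_key])
        if matches:
            for right_row in matches:
                joined.append({**left_row, **right_row})
        else:
            joined.append({**left_row, **empty})
    return joined

# Hypothetical miniature versions of tail_numbers and airplane_records
tail_numbers = [{"TailNum": "N396AA"}, {"TailNum": "N3CWAA"}, {"TailNum": "N499AA"}]
airplane_records = [
    {"FAATailNum": "N396AA", "Model": "B300"},
    {"FAATailNum": "N499AA", "Model": "DC-9-82(MD-82)"},
]

result = left_outer_join(
    tail_numbers, airplane_records, "TailNum", "FAATailNum", ["FAATailNum", "Model"]
)
for row in result:
    print(row)
```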

Let's look at how this works with our two datasets.


In [51]:
# LEFT OUTER JOIN
print(
    "FAA tail numbers: {:,}".format(
        tail_numbers.count()
    )
)
print(
    "Airplane records: {:,}".format(
        airplane_records.count()
    )
)

left_outer_joined = tail_numbers.join(
    airplane_records, 
    tail_numbers.TailNum == airplane_records.FAATailNum, 
    'left_outer'
)

print(
    "Joined records:   {:,}".format(
        left_outer_joined.count()
    )
)


FAA tail numbers: 4,898
Airplane records: 3,988
Joined records:   4,898

Left Outer Join Result

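Note that there were 4,898 records on the left side of our join, and there are the same number in the output. Since 3,988 of them matched in the inner join, 4,898 - 3,988 = 910 tail numbers in the flight records have no matching FAA record. Let's take a look at what both matched and unmatched records look like: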


In [52]:
left_outer_joined.show(6)


+-------+----------+--------------+------------+
|TailNum|FAATailNum|         Model|Engine_Model|
+-------+----------+--------------+------------+
| N396AA|    N396AA|          B300|    PT6A SER|
| N3CWAA|      null|          null|        null|
| N499AA|    N499AA|DC-9-82(MD-82)| JT8D SERIES|
| N4YUAA|      null|          null|        null|
| N567AA|      null|          null|        null|
| N442AS|    N442AS|     737-990ER| CFM56-7B27E|
+-------+----------+--------------+------------+
only showing top 6 rows

Note that matched records have their FAA fields filled in, while unmatched records have null in every column that comes from the right side of the join.

Right Outer Join

Another type of join is the right outer join, the mirror image of a left outer join: the output preserves each and every record from the right side of the join, filling the left-hand fields with null where no match is found. Use the right_outer join type to perform this kind of join.
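A pure-Python sketch of the right outer join semantics, with hypothetical miniature data. `N000XX` is a made-up FAA record with no flights, and `None` stands in for Spark's `null`:

```python
def right_outer_join(left, right, left_key, right_key, left_fields):
    """Keep every right record; fill the left-hand fields with None when no key matches."""
    index = {}
    for left_row in left:
        index.setdefault(left_row[left_key], []).append(left_row)
    empty = {field: None for field in left_fields}  # stands in for Spark's nulls
    joined = []
    for right_row in right:
        # Unmatched right records pair with an all-None left record
        for left_row in index.get(right_row[right_key]) or [empty]:
            joined.append({**left_row, **right_row})
    return joined

# Hypothetical miniature data: N3CWAA has no FAA record, N000XX has no flights
tail_numbers = [{"TailNum": "N396AA"}, {"TailNum": "N3CWAA"}]
airplane_records = [
    {"FAATailNum": "N396AA", "Model": "B300"},
    {"FAATailNum": "N000XX", "Model": "HYPOTHETICAL"},
]

result = right_outer_join(tail_numbers, airplane_records, "TailNum", "FAATailNum", ["TailNum"])
for row in result:
    print(row)
```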

Exercises

  1. Go back and perform a right outer join on the preceding two datasets. Is the distinct() call on the on-time performance records still needed? Why or why not?

Using RDDs and Map/Reduce to Prepare a Complex Record
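The cell below keys each flight by its tail number, concatenates the per-key lists with `reduceByKey`, and then sorts each airplane's flights. As a local, Spark-free sketch of the same map, reduce-by-key, map shape, using a few hypothetical flight tuples in the notebook's (Carrier, FlightDate, FlightNum, Origin, Dest, TailNum) layout:

```python
# Hypothetical flights: (Carrier, FlightDate, FlightNum, Origin, Dest, TailNum)
flights = [
    ("DL", "2015-01-03", "966", "PHX", "SLC", "N382DA"),
    ("DL", "2015-01-02", "1664", "MSP", "DCA", "N382DA"),
    ("AA", "2015-01-02", "100", "JFK", "LAX", "N396AA"),
    ("DL", "2015-01-03", "2071", "SLC", "PHX", "N382DA"),
]

# map: (TailNum, [flight fields]) pairs
pairs = [(f[5], [f[0:5]]) for f in flights]

# reduceByKey(lambda a, b: a + b): concatenate the lists that share a key
grouped = {}
for key, value in pairs:
    grouped[key] = grouped[key] + value if key in grouped else value

# map: one record per airplane, flights sorted by date, flight number, origin, dest
flights_per_airplane = [
    {"TailNum": key, "Flights": sorted(value, key=lambda x: (x[1], x[2], x[3], x[4]))}
    for key, value in grouped.items()
]
print(flights_per_airplane[0])
```

Because FlightNum is a string, '2071' sorts before '966' on the same date, which matches the order in the output below. Sorting on `int(x[2])` instead would give numeric order, though flight number is still not the same as departure time.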


In [53]:
# Filter down to the fields we need to identify and link to a flight
flights = on_time_dataframe.rdd.map(
    lambda x: (x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum)
)

# Group flights by tail number, sorted by date, then flight number (a string), then origin/dest
flights_per_airplane = flights\
  .map(lambda flight: (flight[5], [flight[0:5]]))\
  .reduceByKey(lambda a, b: a + b)\
  .map(lambda pair:
      {
        'TailNum': pair[0],
        'Flights': sorted(pair[1], key=lambda x: (x[1], x[2], x[3], x[4]))
      }
    )
flights_per_airplane.first()


Out[53]:
{'TailNum': 'N382DA',
 'Flights': [('DL', '2015-01-02', '1664', 'MSP', 'DCA'),
  ('DL', '2015-01-02', '1968', 'LAX', 'MSP'),
  ('DL', '2015-01-03', '1303', 'SLC', 'BOS'),
  ('DL', '2015-01-03', '1374', 'OAK', 'SLC'),
  ('DL', '2015-01-03', '1374', 'SLC', 'OAK'),
  ('DL', '2015-01-03', '1944', 'DCA', 'SLC'),
  ('DL', '2015-01-03', '2071', 'SLC', 'PHX'),
  ('DL', '2015-01-03', '966', 'PHX', 'SLC'),
  ('DL', '2015-01-04', '1207', 'DTW', 'SLC'),
  ('DL', '2015-01-04', '2079', 'BOS', 'DTW'),
  ('DL', '2015-01-05', '2324', 'SLC', 'ATL'),
  ('DL', '2015-01-05', '2352', 'ATL', 'JFK'),
  ('DL', '2015-01-06', '332', 'SJU', 'JFK'),
  ('DL', '2015-01-06', '425', 'JFK', 'SJU'),
  ('DL', '2015-01-08', '421', 'JFK', 'ATL'),
  ('DL', '2015-01-10', '2264', 'LAX', 'MEM'),
  ('DL', '2015-01-11', '2020', 'MEM', 'LAX'),
  ('DL', '2015-01-11', '2534', 'LAX', 'BOS'),
  ('DL', '2015-01-11', '2549', 'BOS', 'MSP'),
  ('DL', '2015-01-12', '2193', 'MSP', 'SEA'),
  ('DL', '2015-01-12', '2424', 'SEA', 'DTW'),
  ('DL', '2015-01-12', '921', 'MKE', 'MSP'),
  ('DL', '2015-01-12', '921', 'MSP', 'MKE'),
  ('DL', '2015-01-13', '1511', 'DTW', 'DEN'),
  ('DL', '2015-01-13', '410', 'DEN', 'JFK'),
  ('DL', '2015-01-13', '433', 'JFK', 'DEN'),
  ('DL', '2015-01-14', '1610', 'DEN', 'DTW'),
  ('DL', '2015-01-14', '2335', 'DTW', 'SLC'),
  ('DL', '2015-01-14', '952', 'SLC', 'DFW'),
  ('DL', '2015-01-15', '1657', 'SLC', 'LAS'),
  ('DL', '2015-01-15', '2056', 'DFW', 'SLC'),
  ('DL', '2015-01-15', '466', 'SLC', 'JFK'),
  ('DL', '2015-01-15', '490', 'JFK', 'SLC'),
  ('DL', '2015-01-16', '1552', 'LAS', 'LAX'),
  ('DL', '2015-01-16', '1577', 'LAX', 'MSY'),
  ('DL', '2015-01-16', '1603', 'MSY', 'MSP'),
  ('DL', '2015-01-16', '2336', 'MSP', 'MKE'),
  ('DL', '2015-01-17', '1071', 'MKE', 'MSP'),
  ('DL', '2015-01-18', '2320', 'JFK', 'SJU'),
  ('DL', '2015-01-18', '2576', 'JFK', 'SJU'),
  ('DL', '2015-01-18', '42', 'SJU', 'JFK'),
  ('DL', '2015-01-19', '326', 'SJU', 'JFK'),
  ('DL', '2015-01-19', '332', 'SJU', 'JFK'),
  ('DL', '2015-01-19', '425', 'JFK', 'SJU'),
  ('DL', '2015-01-20', '2658', 'LAS', 'JFK'),
  ('DL', '2015-01-20', '480', 'JFK', 'LAS'),
  ('DL', '2015-01-22', '1728', 'LAS', 'JFK'),
  ('DL', '2015-01-22', '1883', 'SLC', 'LAS'),
  ('DL', '2015-01-22', '436', 'JFK', 'SLC'),
  ('DL', '2015-01-23', '2623', 'JFK', 'SAN'),
  ('DL', '2015-01-24', '2066', 'SLC', 'MCO'),
  ('DL', '2015-01-24', '978', 'SAN', 'SLC'),
  ('DL', '2015-01-25', '1574', 'MCO', 'MSP'),
  ('DL', '2015-01-25', '2193', 'MSP', 'SEA'),
  ('DL', '2015-01-25', '2424', 'SEA', 'DTW'),
  ('DL', '2015-01-26', '1511', 'DTW', 'DEN'),
  ('DL', '2015-01-26', '2160', 'JFK', 'DEN'),
  ('DL', '2015-01-26', '410', 'DEN', 'JFK'),
  ('DL', '2015-01-27', '1557', 'SLC', 'SEA'),
  ('DL', '2015-01-27', '1610', 'DEN', 'DTW'),
  ('DL', '2015-01-27', '2335', 'DTW', 'SLC'),
  ('DL', '2015-01-28', '2340', 'SEA', 'DTW'),
  ('DL', '2015-01-28', '582', 'DTW', 'BOS'),
  ('DL', '2015-01-29', '1168', 'LAX', 'MIA'),
  ('DL', '2015-01-29', '1325', 'LAX', 'MSY'),
  ('DL', '2015-01-29', '1577', 'MSY', 'LAX'),
  ('DL', '2015-01-29', '2531', 'BOS', 'LAX'),
  ('DL', '2015-01-30', '1169', 'MIA', 'LAX'),
  ('DL', '2015-01-30', '1566', 'LAX', 'RDU'),
  ('DL', '2015-02-01', '1097', 'RDU', 'LAX'),
  ('DL', '2015-02-01', '1566', 'LAX', 'RDU'),
  ('DL', '2015-02-02', '1097', 'RDU', 'LAX'),
  ('DL', '2015-02-02', '1882', 'SLC', 'DTW'),
  ('DL', '2015-02-02', '2170', 'LAX', 'SLC'),
  ('DL', '2015-02-03', '1542', 'SEA', 'JFK'),
  ('DL', '2015-02-03', '2188', 'ANC', 'SEA'),
  ('DL', '2015-02-03', '2436', 'DTW', 'SEA'),
  ('DL', '2015-02-03', '2436', 'SEA', 'ANC'),
  ('DL', '2015-02-06', '2658', 'LAS', 'JFK'),
  ('DL', '2015-02-06', '420', 'JFK', 'LAS'),
  ('DL', '2015-02-08', '1740', 'LAS', 'SLC'),
  ('DL', '2015-02-08', '2360', 'SLC', 'DCA'),
  ('DL', '2015-02-08', '404', 'JFK', 'LAS'),
  ('DL', '2015-02-09', '872', 'DCA', 'ATL'),
  ('DL', '2015-02-10', '776', 'ATL', 'SRQ'),
  ('DL', '2015-02-11', '1684', 'ATL', 'SJC'),
  ('DL', '2015-02-11', '1684', 'SJC', 'ATL'),
  ('DL', '2015-02-11', '2561', 'SRQ', 'ATL'),
  ('DL', '2015-02-12', '1123', 'ATL', 'SLC'),
  ('DL', '2015-02-12', '1830', 'SLC', 'JFK'),
  ('DL', '2015-02-12', '2457', 'ATL', 'BDL'),
  ('DL', '2015-02-12', '2457', 'BDL', 'ATL'),
  ('DL', '2015-02-14', '482', 'JFK', 'LAS'),
  ('DL', '2015-02-15', '1795', 'LAS', 'JFK'),
  ('DL', '2015-02-16', '1552', 'LAS', 'LAX'),
  ('DL', '2015-02-16', '2215', 'MSP', 'FSD'),
  ('DL', '2015-02-17', '896', 'FSD', 'MSP'),
  ('DL', '2015-02-18', '1760', 'SEA', 'ANC'),
  ('DL', '2015-02-18', '2253', 'MSP', 'SEA'),
  ('DL', '2015-02-19', '142', 'ANC', 'SEA'),
  ('DL', '2015-02-19', '1473', 'SEA', 'JFK'),
  ('DL', '2015-02-19', '492', 'JFK', 'SJU'),
  ('DL', '2015-02-20', '326', 'SJU', 'JFK'),
  ('DL', '2015-02-20', '436', 'JFK', 'LAS'),
  ('DL', '2015-02-21', '1795', 'LAS', 'JFK'),
  ('DL', '2015-02-21', '348', 'SJU', 'JFK'),
  ('DL', '2015-02-21', '409', 'JFK', 'SJU'),
  ('DL', '2015-02-22', '332', 'SJU', 'JFK'),
  ('DL', '2015-02-22', '465', 'JFK', 'SJU'),
  ('DL', '2015-02-25', '2048', 'PDX', 'JFK'),
  ('DL', '2015-02-25', '498', 'JFK', 'PDX'),
  ('DL', '2015-02-26', '1684', 'ATL', 'SJC'),
  ('DL', '2015-02-26', '1684', 'SJC', 'ATL'),
  ('DL', '2015-02-27', '503', 'ATL', 'PDX'),
  ('DL', '2015-02-27', '503', 'PDX', 'ATL'),
  ('DL', '2015-02-28', '1709', 'ATL', 'BNA'),
  ('DL', '2015-03-01', '1341', 'ATL', 'GSP'),
  ('DL', '2015-03-01', '1341', 'GSP', 'ATL'),
  ('DL', '2015-03-01', '2357', 'JFK', 'ATL'),
  ('DL', '2015-03-01', '2589', 'BNA', 'ATL'),
  ('DL', '2015-03-01', '478', 'ATL', 'JFK'),
  ('DL', '2015-03-02', '1684', 'ATL', 'SJC'),
  ('DL', '2015-03-02', '1684', 'SJC', 'ATL'),
  ('DL', '2015-03-02', '2172', 'ATL', 'TPA'),
  ('DL', '2015-03-02', '2172', 'TPA', 'ATL'),
  ('DL', '2015-03-03', '1668', 'LAX', 'MSP'),
  ('DL', '2015-03-03', '2203', 'ATL', 'LAS'),
  ('DL', '2015-03-03', '2215', 'MSP', 'FSD'),
  ('DL', '2015-03-03', '2565', 'LAS', 'SLC'),
  ('DL', '2015-03-03', '637', 'SLC', 'LAX'),
  ('DL', '2015-03-04', '896', 'FSD', 'MSP'),
  ('DL', '2015-03-05', '1287', 'DTW', 'BNA'),
  ('DL', '2015-03-05', '1959', 'DTW', 'TPA'),
  ('DL', '2015-03-05', '1959', 'TPA', 'DTW'),
  ('DL', '2015-03-05', '2268', 'LAX', 'SLC'),
  ('DL', '2015-03-05', '2572', 'BNA', 'LAX'),
  ('DL', '2015-03-05', '971', 'SLC', 'MCO'),
  ('DL', '2015-03-06', '1683', 'SLC', 'LAS'),
  ('DL', '2015-03-06', '2540', 'LAS', 'JFK'),
  ('DL', '2015-03-06', '2610', 'MCO', 'SLC'),
  ('DL', '2015-03-07', '1949', 'ATL', 'MIA'),
  ('DL', '2015-03-08', '503', 'ATL', 'PDX'),
  ('DL', '2015-03-08', '700', 'PDX', 'ATL'),
  ('DL', '2015-03-08', '891', 'MIA', 'ATL'),
  ('DL', '2015-03-09', '1755', 'ATL', 'LAS'),
  ('DL', '2015-03-09', '2554', 'LAS', 'JFK'),
  ('DL', '2015-03-10', '1979', 'LAS', 'DTW'),
  ('DL', '2015-03-10', '2474', 'JFK', 'SJU'),
  ('DL', '2015-03-10', '409', 'JFK', 'LAS'),
  ('DL', '2015-03-10', '42', 'SJU', 'JFK'),
  ('DL', '2015-03-11', '1338', 'ATL', 'BNA'),
  ('DL', '2015-03-11', '1640', 'DTW', 'ATL'),
  ('DL', '2015-03-12', '1324', 'BNA', 'ATL'),
  ('DL', '2015-03-13', '1684', 'ATL', 'SJC'),
  ('DL', '2015-03-13', '1684', 'SJC', 'ATL'),
  ('DL', '2015-03-14', '1268', 'ATL', 'LAS'),
  ('DL', '2015-03-14', '1723', 'SLC', 'BIL'),
  ('DL', '2015-03-14', '1740', 'LAS', 'SLC'),
  ('DL', '2015-03-15', '1635', 'BIL', 'SLC'),
  ('DL', '2015-03-15', '1684', 'ATL', 'SJC'),
  ('DL', '2015-03-15', '1684', 'SJC', 'ATL'),
  ('DL', '2015-03-15', '1857', 'SLC', 'SEA'),
  ('DL', '2015-03-15', '2485', 'SEA', 'ATL'),
  ('DL', '2015-03-16', '1837', 'ATL', 'AUS'),
  ('DL', '2015-03-17', '1555', 'ATL', 'LAX'),
  ('DL', '2015-03-17', '1557', 'SLC', 'SEA'),
  ('DL', '2015-03-17', '1820', 'LAX', 'SEA'),
  ('DL', '2015-03-17', '1984', 'AUS', 'ATL'),
  ('DL', '2015-03-17', '2150', 'SEA', 'SLC'),
  ('DL', '2015-03-18', '1560', 'SEA', 'DTW'),
  ('DL', '2015-03-19', '1055', 'LAX', 'CMH'),
  ('DL', '2015-03-19', '2020', 'LAX', 'MEM'),
  ('DL', '2015-03-19', '2020', 'MEM', 'LAX'),
  ('DL', '2015-03-20', '1327', 'CMH', 'LAX'),
  ('DL', '2015-03-20', '2592', 'LAX', 'MSY'),
  ('DL', '2015-03-21', '1325', 'MSY', 'LAX'),
  ('DL', '2015-03-21', '2534', 'LAX', 'BOS'),
  ('DL', '2015-03-22', '1659', 'BOS', 'LAX'),
  ('DL', '2015-03-22', '729', 'LAX', 'MSY'),
  ('DL', '2015-03-22', '729', 'MSY', 'LAX'),
  ('DL', '2015-03-22', '877', 'LAX', 'IND'),
  ('DL', '2015-03-23', '1550', 'IND', 'LAX'),
  ('DL', '2015-03-23', '2592', 'LAX', 'MSY'),
  ('DL', '2015-03-24', '1325', 'MSY', 'LAX'),
  ('DL', '2015-03-24', '2341', 'MSP', 'OMA'),
  ('DL', '2015-03-24', '2534', 'LAX', 'BOS'),
  ('DL', '2015-03-24', '2549', 'BOS', 'MSP'),
  ('DL', '2015-03-25', '1620', 'OMA', 'MSP'),
  ('DL', '2015-03-25', '2595', 'DTW', 'RDU'),
  ('DL', '2015-03-26', '1097', 'RDU', 'LAX'),
  ('DL', '2015-03-26', '1979', 'LAX', 'RDU'),
  ('DL', '2015-03-27', '1055', 'LAX', 'CMH'),
  ('DL', '2015-03-27', '1137', 'RDU', 'ATL'),
  ('DL', '2015-03-27', '1545', 'SEA', 'LAX'),
  ('DL', '2015-03-27', '1555', 'ATL', 'LAX'),
  ('DL', '2015-03-27', '1820', 'LAX', 'SEA'),
  ('DL', '2015-03-28', '1327', 'CMH', 'LAX'),
  ('DL', '2015-03-28', '2592', 'LAX', 'MSY'),
  ('DL', '2015-03-29', '1325', 'MSY', 'LAX'),
  ('DL', '2015-03-29', '2341', 'MSP', 'OMA'),
  ('DL', '2015-03-29', '2534', 'LAX', 'BOS'),
  ('DL', '2015-03-29', '2549', 'BOS', 'MSP'),
  ('DL', '2015-03-30', '1620', 'OMA', 'MSP'),
  ('DL', '2015-03-31', '1640', 'DTW', 'ATL'),
  ('DL', '2015-04-01', '1949', 'ATL', 'MIA'),
  ('DL', '2015-04-02', '891', 'MIA', 'ATL'),
  ('DL', '2015-04-03', '420', 'JFK', 'DEN'),
  ('DL', '2015-04-04', '1610', 'DEN', 'DTW'),
  ('DL', '2015-04-05', '1683', 'SLC', 'LAS'),
  ('DL', '2015-04-05', '1740', 'LAS', 'SLC'),
  ('DL', '2015-04-05', '1809', 'DTW', 'SLC'),
  ('DL', '2015-04-05', '2360', 'SLC', 'DCA'),
  ('DL', '2015-04-06', '1264', 'SLC', 'JFK'),
  ('DL', '2015-04-06', '2332', 'LAX', 'SLC'),
  ('DL', '2015-04-06', '2359', 'DCA', 'SLC'),
  ('DL', '2015-04-06', '2378', 'SLC', 'SAN'),
  ('DL', '2015-04-06', '637', 'SLC', 'LAX'),
  ('DL', '2015-04-06', '89', 'SAN', 'SLC'),
  ('DL', '2015-04-07', '2643', 'LAX', 'LAS'),
  ('DL', '2015-04-07', '498', 'JFK', 'ATL'),
  ('DL', '2015-04-08', '2374', 'LAS', 'ATL'),
  ('DL', '2015-04-08', '889', 'ATL', 'RDU'),
  ('DL', '2015-04-09', '1736', 'ATL', 'CMH'),
  ('DL', '2015-04-09', '2250', 'RDU', 'ATL'),
  ('DL', '2015-04-10', '1783', 'SLC', 'LAS'),
  ('DL', '2015-04-10', '2214', 'LAS', 'ATL'),
  ('DL', '2015-04-10', '2419', 'CMH', 'ATL'),
  ('DL', '2015-04-10', '983', 'ATL', 'SLC'),
  ('DL', '2015-04-11', '2559', 'ATL', 'PHX'),
  ('DL', '2015-04-11', '779', 'PHX', 'ATL'),
  ('DL', '2015-04-12', '796', 'ATL', 'IND'),
  ('DL', '2015-04-12', '796', 'IND', 'ATL'),
  ('DL', '2015-04-12', '881', 'ATL', 'AUS'),
  ('DL', '2015-04-12', '881', 'AUS', 'ATL'),
  ('DL', '2015-04-13', '1077', 'ATL', 'RDU'),
  ('DL', '2015-04-13', '1077', 'RDU', 'ATL'),
  ('DL', '2015-04-13', '753', 'ATL', 'DTW'),
  ('DL', '2015-04-14', '188', 'DTW', 'BOS'),
  ('DL', '2015-04-14', '2051', 'DTW', 'RSW'),
  ('DL', '2015-04-14', '2051', 'RSW', 'DTW'),
  ('DL', '2015-04-14', '2549', 'BOS', 'MSP'),
  ('DL', '2015-04-14', '653', 'MSP', 'LAX'),
  ('DL', '2015-04-15', '1109', 'SEA', 'JFK'),
  ('DL', '2015-04-15', '1352', 'LAX', 'SEA'),
  ('DL', '2015-04-15', '2020', 'MEM', 'LAX'),
  ('DL', '2015-04-15', '2264', 'LAX', 'MEM'),
  ('DL', '2015-04-16', '2474', 'JFK', 'SJU'),
  ('DL', '2015-04-16', '2634', 'JFK', 'DEN'),
  ('DL', '2015-04-16', '499', 'SJU', 'JFK'),
  ('DL', '2015-04-17', '1610', 'DEN', 'DTW'),
  ('DL', '2015-04-17', '1847', 'SEA', 'ANC'),
  ('DL', '2015-04-17', '281', 'DTW', 'SEA'),
  ('DL', '2015-04-18', '822', 'ANC', 'SEA'),
  ('DL', '2015-04-18', '822', 'SEA', 'DTW'),
  ('DL', '2015-04-19', '1511', 'DTW', 'DEN'),
  ('DL', '2015-04-19', '1934', 'SLC', 'BIL'),
  ('DL', '2015-04-19', '2182', 'DEN', 'JFK'),
  ('DL', '2015-04-19', '470', 'JFK', 'SLC'),
  ('DL', '2015-04-20', '1471', 'DTW', 'AUS'),
  ('DL', '2015-04-20', '1912', 'BIL', 'SLC'),
  ('DL', '2015-04-20', '2312', 'JFK', 'DTW'),
  ('DL', '2015-04-20', '466', 'SLC', 'JFK'),
  ('DL', '2015-04-21', '1984', 'AUS', 'ATL'),
  ('DL', '2015-04-22', '439', 'JFK', 'SAN'),
  ('DL', '2015-04-23', '1211', 'SLC', 'LAX'),
  ('DL', '2015-04-23', '1936', 'LAX', 'CVG'),
  ('DL', '2015-04-23', '2404', 'SAN', 'JFK'),
  ('DL', '2015-04-23', '460', 'JFK', 'SLC'),
  ('DL', '2015-04-24', '2315', 'CVG', 'ATL'),
  ('DL', '2015-04-24', '421', 'JFK', 'AUS'),
  ('DL', '2015-04-25', '1972', 'AUS', 'JFK'),
  ('DL', '2015-04-26', '1830', 'SLC', 'JFK'),
  ('DL', '2015-04-26', '492', 'JFK', 'SLC'),
  ('DL', '2015-04-26', '622', 'PDX', 'SLC'),
  ('DL', '2015-04-26', '976', 'SLC', 'PDX'),
  ('DL', '2015-04-28', '1077', 'ATL', 'RDU'),
  ('DL', '2015-04-28', '1077', 'RDU', 'ATL'),
  ('DL', '2015-04-29', '1165', 'ATL', 'CHS'),
  ('DL', '2015-04-29', '1165', 'CHS', 'ATL'),
  ('DL', '2015-04-29', '1684', 'ATL', 'SJC'),
  ('DL', '2015-04-29', '1684', 'SJC', 'ATL'),
  ('DL', '2015-04-30', '421', 'JFK', 'AUS'),
  ('DL', '2015-05-01', '1302', 'ATL', 'SDF'),
  ('DL', '2015-05-01', '1302', 'SDF', 'ATL'),
  ('DL', '2015-05-01', '1713', 'ATL', 'RSW'),
  ('DL', '2015-05-01', '1984', 'AUS', 'ATL'),
  ('DL', '2015-05-01', '2524', 'ATL', 'VPS'),
  ('DL', '2015-05-01', '2524', 'VPS', 'ATL'),
  ('DL', '2015-05-02', '2199', 'RSW', 'DTW'),
  ('DL', '2015-05-03', '1576', 'LAX', 'TPA'),
  ('DL', '2015-05-03', '159', 'BOS', 'DTW'),
  ('DL', '2015-05-03', '1621', 'DTW', 'TPA'),
  ('DL', '2015-05-03', '2052', 'TPA', 'LAX'),
  ('DL', '2015-05-03', '2523', 'DTW', 'BOS'),
  ('DL', '2015-05-04', '1061', 'LAX', 'MCO'),
  ('DL', '2015-05-04', '1061', 'MCO', 'LAX'),
  ('DL', '2015-05-04', '1168', 'LAX', 'MIA'),
  ('DL', '2015-05-04', '1819', 'TPA', 'LAX'),
  ('DL', '2015-05-05', '1169', 'MIA', 'LAX'),
  ('DL', '2015-05-05', '2345', 'LAX', 'CMH'),
  ('DL', '2015-05-06', '2002', 'ATL', 'LAS'),
  ('DL', '2015-05-06', '2583', 'CMH', 'ATL'),
  ('DL', '2015-05-06', '766', 'LAS', 'DTW'),
  ('DL', '2015-05-06', '891', 'DTW', 'LAS'),
  ('DL', '2015-05-07', '2059', 'JFK', 'LAS'),
  ('DL', '2015-05-07', '2155', 'LAS', 'JFK'),
  ('DL', '2015-05-08', '1167', 'SLC', 'BZN'),
  ('DL', '2015-05-08', '2140', 'LAS', 'SLC'),
  ('DL', '2015-05-08', '2155', 'LAS', 'JFK'),
  ('DL', '2015-05-08', '443', 'JFK', 'LAS'),
  ('DL', '2015-05-09', '1467', 'LAS', 'DTW'),
  ('DL', '2015-05-09', '2526', 'BZN', 'SLC'),
  ('DL', '2015-05-09', '437', 'JFK', 'LAS'),
  ('DL', '2015-05-09', '466', 'SLC', 'JFK'),
  ('DL', '2015-05-10', '198', 'SEA', 'LAX'),
  ('DL', '2015-05-10', '2592', 'LAX', 'MSY'),
  ('DL', '2015-05-10', '281', 'DTW', 'SEA'),
  ('DL', '2015-05-11', '1325', 'MSY', 'LAX'),
  ('DL', '2015-05-11', '2215', 'MSP', 'FSD'),
  ('DL', '2015-05-11', '2534', 'LAX', 'BOS'),
  ('DL', '2015-05-11', '2549', 'BOS', 'MSP'),
  ('DL', '2015-05-12', '1957', 'SLC', 'SEA'),
  ('DL', '2015-05-12', '2004', 'LAX', 'SLC'),
  ('DL', '2015-05-12', '2092', 'MSP', 'MSY'),
  ('DL', '2015-05-12', '896', 'FSD', 'MSP'),
  ('DL', '2015-05-12', '984', 'MSY', 'LAX'),
  ('DL', '2015-05-13', '1168', 'LAX', 'MIA'),
  ('DL', '2015-05-13', '129', 'SLC', 'SEA'),
  ('DL', '2015-05-13', '1634', 'SEA', 'SLC'),
  ('DL', '2015-05-13', '1877', 'SEA', 'LAX'),
  ('DL', '2015-05-13', '2122', 'LAX', 'SLC'),
  ('DL', '2015-05-13', '2170', 'SLC', 'LAX'),
  ('DL', '2015-05-14', '1169', 'MIA', 'LAX'),
  ('DL', '2015-05-14', '2345', 'LAX', 'CMH'),
  ('DL', '2015-05-15', '2583', 'CMH', 'ATL'),
  ('DL', '2015-05-16', '1557', 'SLC', 'SEA'),
  ('DL', '2015-05-16', '1755', 'ATL', 'LAX'),
  ('DL', '2015-05-16', '2004', 'LAX', 'SLC'),
  ('DL', '2015-05-17', '1358', 'SEA', 'LAX'),
  ('DL', '2015-05-18', '1063', 'ATL', 'BHM'),
  ('DL', '2015-05-18', '1423', 'ATL', 'STL'),
  ('DL', '2015-05-18', '1423', 'STL', 'ATL'),
  ('DL', '2015-05-18', '1800', 'ATL', 'BOS'),
  ('DL', '2015-05-18', '2482', 'BOS', 'ATL'),
  ('DL', '2015-05-19', '2002', 'ATL', 'LAS'),
  ('DL', '2015-05-19', '667', 'BHM', 'ATL'),
  ('DL', '2015-05-19', '766', 'LAS', 'DTW'),
  ('DL', '2015-05-20', '1621', 'DTW', 'TPA'),
  ('DL', '2015-05-21', '442', 'JFK', 'AUS'),
  ('DL', '2015-05-22', '1835', 'ATL', 'SLC'),
  ('DL', '2015-05-22', '1984', 'AUS', 'ATL'),
  ('DL', '2015-05-23', '1417', 'ATL', 'CVG'),
  ('DL', '2015-05-23', '2324', 'SLC', 'ATL'),
  ('DL', '2015-05-24', '1581', 'LAX', 'CVG'),
  ('DL', '2015-05-24', '1933', 'CVG', 'LAX'),
  ('DL', '2015-05-25', '1405', 'LAX', 'MEM'),
  ('DL', '2015-05-25', '1502', 'CVG', 'TPA'),
  ('DL', '2015-05-25', '1502', 'TPA', 'CVG'),
  ('DL', '2015-05-25', '1833', 'CVG', 'LAX'),
  ('DL', '2015-05-26', '1806', 'LAX', 'DTW'),
  ('DL', '2015-05-26', '2020', 'MEM', 'LAX'),
  ('DL', '2015-05-26', '2335', 'DTW', 'DEN'),
  ('DL', '2015-05-27', '1610', 'DEN', 'DTW'),
  ('DL', '2015-05-27', '1728', 'LAS', 'JFK'),
  ('DL', '2015-05-27', '343', 'DTW', 'LAS'),
  ('DL', '2015-05-28', '2217', 'JFK', 'PHX'),
  ('DL', '2015-05-28', '2405', 'PHX', 'JFK'),
  ('DL', '2015-05-28', '2435', 'JFK', 'PHX'),
  ('DL', '2015-05-28', '2492', 'PHX', 'DTW'),
  ('DL', '2015-05-29', '1167', 'SLC', 'BZN'),
  ('DL', '2015-05-29', '1461', 'ORD', 'SLC'),
  ('DL', '2015-05-29', '1461', 'SLC', 'ORD'),
  ('DL', '2015-05-29', '2034', 'LAS', 'SLC'),
  ('DL', '2015-05-29', '2316', 'DTW', 'LAS'),
  ('DL', '2015-05-30', '1327', 'PHX', 'ATL'),
  ('DL', '2015-05-30', '2526', 'BZN', 'SLC'),
  ('DL', '2015-05-30', '2569', 'JFK', 'PHX'),
  ('DL', '2015-05-30', '466', 'SLC', 'JFK'),
  ('DL', '2015-05-31', '1237', 'DTW', 'BOS'),
  ('DL', '2015-05-31', '2002', 'ATL', 'LAS'),
  ('DL', '2015-05-31', '766', 'LAS', 'DTW'),
  ('DL', '2015-06-01', '1217', 'DTW', 'LAS'),
  ('DL', '2015-06-01', '866', 'BOS', 'DTW'),
  ('DL', '2015-06-02', '2155', 'LAS', 'JFK'),
  ('DL', '2015-06-03', '443', 'JFK', 'LAS'),
  ('DL', '2015-06-04', '1602', 'LAS', 'ATL'),
  ('DL', '2015-06-06', '1693', 'LAX', 'TPA'),
  ('DL', '2015-06-06', '2849', 'BNA', 'LAX'),
  ('DL', '2015-06-06', '2849', 'LAX', 'BNA'),
  ('DL', '2015-06-07', '2137', 'TPA', 'ATL'),
  ('DL', '2015-06-07', '843', 'ATL', 'JFK'),
  ('DL', '2015-06-08', '458', 'JFK', 'AUS'),
  ('DL', '2015-06-09', '1972', 'AUS', 'JFK'),
  ('DL', '2015-06-10', '2835', 'JFK', 'SAT'),
  ('DL', '2015-06-10', '2836', 'JFK', 'AUS'),
  ('DL', '2015-06-10', '401', 'AUS', 'JFK'),
  ('DL', '2015-06-11', '1532', 'SAT', 'JFK'),
  ('DL', '2015-06-11', '1901', 'MIA', 'LAX'),
  ('DL', '2015-06-11', '495', 'JFK', 'MIA'),
  ('DL', '2015-06-12', '1432', 'LAX', 'SEA'),
  ('DL', '2015-06-12', '2820', 'SEA', 'LAX'),
  ('DL', '2015-06-12', '530', 'ANC', 'SEA'),
  ('DL', '2015-06-12', '530', 'SEA', 'ANC'),
  ('DL', '2015-06-13', '2188', 'ATL', 'SEA'),
  ('DL', '2015-06-13', '2188', 'SEA', 'JNU'),
  ('DL', '2015-06-14', '1781', 'JNU', 'SEA'),
  ('DL', '2015-06-14', '2179', 'SLC', 'SMF'),
  ('DL', '2015-06-14', '707', 'ORD', 'SLC'),
  ('DL', '2015-06-14', '707', 'SLC', 'ORD'),
  ('DL', '2015-06-14', '786', 'SEA', 'SLC'),
  ('DL', '2015-06-15', '1536', 'BWI', 'DTW'),
  ('DL', '2015-06-15', '1536', 'DTW', 'BWI'),
  ('DL', '2015-06-15', '188', 'DTW', 'BOS'),
  ('DL', '2015-06-15', '1936', 'SMF', 'MSP'),
  ('DL', '2015-06-15', '2264', 'LAX', 'MEM'),
  ('DL', '2015-06-15', '2532', 'BOS', 'LAX'),
  ('DL', '2015-06-15', '557', 'MSP', 'DTW'),
  ('DL', '2015-06-16', '1543', 'MSY', 'MSP'),
  ('DL', '2015-06-16', '1577', 'LAX', 'MSY'),
  ('DL', '2015-06-16', '524', 'MEM', 'LAX'),
  ('DL', '2015-06-17', '2051', 'MSP', 'LAS'),
  ('DL', '2015-06-17', '2812', 'LAS', 'SLC'),
  ('DL', '2015-06-17', '779', 'SLC', 'LAS'),
  ('DL', '2015-06-18', '1602', 'LAS', 'ATL'),
  ('DL', '2015-06-18', '2084', 'ATL', 'STL'),
  ('DL', '2015-06-18', '2084', 'STL', 'ATL'),
  ('DL', '2015-06-19', '2239', 'ATL', 'SLC'),
  ('DL', '2015-06-19', '835', 'SLC', 'ATL'),
  ('DL', '2015-06-21', '198', 'SEA', 'LAX'),
  ('DL', '2015-06-21', '2592', 'LAX', 'MSY'),
  ('DL', '2015-06-21', '759', 'LAX', 'SEA'),
  ('DL', '2015-06-22', '2497', 'DEN', 'DTW'),
  ('DL', '2015-06-22', '2842', 'LAX', 'SLC'),
  ('DL', '2015-06-22', '300', 'SLC', 'DEN'),
  ('DL', '2015-06-22', '770', 'MSY', 'LAX'),
  ('DL', '2015-06-23', '1737', 'BWI', 'DTW'),
  ('DL', '2015-06-23', '1737', 'DTW', 'BWI'),
  ('DL', '2015-06-23', '1795', 'DTW', 'PHX'),
  ('DL', '2015-06-24', '1600', 'SLC', 'BZN'),
  ('DL', '2015-06-24', '1994', 'PHX', 'JFK'),
  ('DL', '2015-06-24', '2140', 'LAS', 'SLC'),
  ('DL', '2015-06-24', '443', 'JFK', 'LAS'),
  ('DL', '2015-06-25', '1217', 'DTW', 'LAS'),
  ('DL', '2015-06-25', '134', 'PHX', 'DTW'),
  ('DL', '2015-06-25', '2067', 'SLC', 'PHX'),
  ('DL', '2015-06-25', '2526', 'BZN', 'SLC'),
  ('DL', '2015-06-25', '2796', 'LAS', 'JFK'),
  ('DL', '2015-06-26', '1901', 'MIA', 'LAX'),
  ('DL', '2015-06-26', '454', 'JFK', 'ATL'),
  ('DL', '2015-06-26', '495', 'JFK', 'MIA'),
  ('DL', '2015-06-26', '843', 'ATL', 'JFK'),
  ('DL', '2015-06-27', '2849', 'BNA', 'LAX'),
  ('DL', '2015-06-27', '2849', 'LAX', 'BNA'),
  ('DL', '2015-06-28', '1432', 'LAX', 'SEA'),
  ('DL', '2015-06-28', '1653', 'SEA', 'FAI'),
  ('DL', '2015-06-28', '943', 'ANC', 'SEA'),
  ('DL', '2015-06-28', '943', 'SEA', 'ANC'),
  ('DL', '2015-06-29', '1578', 'SEA', 'LAX'),
  ('DL', '2015-06-29', '1611', 'FAI', 'SEA'),
  ('DL', '2015-06-29', '523', 'LAX', 'TPA'),
  ('DL', '2015-06-30', '1558', 'TPA', 'LAX'),
  ('DL', '2015-06-30', '1845', 'BOS', 'MSP'),
  ('DL', '2015-06-30', '2534', 'LAX', 'BOS'),
  ('DL', '2015-07-01', '1357', 'MSP', 'DTW'),
  ('DL', '2015-07-02', '1231', 'DCA', 'DTW'),
  ('DL', '2015-07-02', '1232', 'DTW', 'STL'),
  ('DL', '2015-07-02', '1232', 'STL', 'DTW'),
  ('DL', '2015-07-02', '1737', 'BWI', 'DTW'),
  ('DL', '2015-07-02', '1737', 'DTW', 'BWI'),
  ('DL', '2015-07-02', '2144', 'DTW', 'DCA'),
  ('DL', '2015-07-03', '1749', 'DTW', 'PHL'),
  ('DL', '2015-07-04', '1067', 'DTW', 'PDX'),
  ('DL', '2015-07-04', '1238', 'DTW', 'MSY'),
  ('DL', '2015-07-04', '1238', 'MSY', 'DTW'),
  ('DL', '2015-07-04', '1322', 'PHL', 'DTW'),
  ('DL', '2015-07-05', '1156', 'PDX', 'ATL'),
  ('DL', '2015-07-05', '1791', 'ATL', 'CHS'),
  ('DL', '2015-07-05', '1791', 'CHS', 'ATL'),
  ('DL', '2015-07-05', '503', 'ATL', 'PDX'),
  ('DL', '2015-07-05', '503', 'PDX', 'ATL'),
  ('DL', '2015-07-06', '1061', 'LAX', 'MCO'),
  ('DL', '2015-07-06', '1061', 'MCO', 'LAX'),
  ('DL', '2015-07-06', '1255', 'ATL', 'LAX'),
  ('DL', '2015-07-08', '1905', 'LAX', 'SEA'),
  ('DL', '2015-07-08', '198', 'SEA', 'LAX'),
  ('DL', '2015-07-08', '2592', 'LAX', 'MSY'),
  ('DL', '2015-07-09', '1805', 'TPA', 'LAX'),
  ('DL', '2015-07-09', '301', 'LAX', 'TPA'),
  ('DL', '2015-07-09', '545', 'LAX', 'CVG'),
  ('DL', '2015-07-09', '984', 'MSY', 'LAX'),
  ('DL', '2015-07-10', '1807', 'CVG', 'SEA'),
  ('DL', '2015-07-10', '2258', 'SEA', 'FAI'),
  ('DL', '2015-07-10', '943', 'ANC', 'SEA'),
  ('DL', '2015-07-10', '943', 'SEA', 'ANC'),
  ('DL', '2015-07-11', '1304', 'SLC', 'ATL'),
  ('DL', '2015-07-11', '1611', 'FAI', 'SEA'),
  ('DL', '2015-07-11', '1682', 'CVG', 'SLC'),
  ('DL', '2015-07-11', '1974', 'SEA', 'LAX'),
  ('DL', '2015-07-11', '2640', 'LAX', 'CVG'),
  ('DL', '2015-07-12', '2188', 'SEA', 'JNU'),
  ('DL', '2015-07-12', '2841', 'LAX', 'SEA'),
  ('DL', '2015-07-13', '1470', 'SEA', 'LAX'),
  ('DL', '2015-07-13', '1781', 'JNU', 'SEA'),
  ('DL', '2015-07-13', '530', 'ANC', 'SEA'),
  ('DL', '2015-07-13', '530', 'SEA', 'ANC'),
  ('DL', '2015-07-14', '1794', 'PDX', 'MSP'),
  ('DL', '2015-07-14', '773', 'ATL', 'PDX'),
  ('DL', '2015-07-15', '1975', 'MSY', 'LAX'),
  ('DL', '2015-07-15', '2406', 'MSP', 'MSY'),
  ('DL', '2015-07-15', '2530', 'LAX', 'DTW'),
  ('DL', '2015-07-16', '1737', 'BWI', 'DTW'),
  ('DL', '2015-07-16', '1737', 'DTW', 'BWI'),
  ('DL', '2015-07-16', '1821', 'DTW', 'PHX'),
  ('DL', '2015-07-16', '2201', 'PHX', 'JFK'),
  ('DL', '2015-07-17', '488', 'JFK', 'PHX'),
  ('DL', '2015-07-18', '1994', 'PHX', 'JFK'),
  ('DL', '2015-07-19', '451', 'JFK', 'SJU'),
  ('DL', '2015-07-20', '2609', 'JFK', 'MIA'),
  ('DL', '2015-07-20', '326', 'SJU', 'JFK'),
  ('DL', '2015-07-20', '42', 'MIA', 'JFK'),
  ('DL', '2015-07-22', '1841', 'SJU', 'JFK'),
  ('DL', '2015-07-22', '2474', 'JFK', 'SJU'),
  ('DL', '2015-07-25', '772', 'ATL', 'LAS'),
  ('DL', '2015-07-26', '1060', 'SLC', 'DFW'),
  ('DL', '2015-07-26', '1795', 'LAS', 'JFK'),
  ('DL', '2015-07-26', '2190', 'JFK', 'SLC'),
  ('DL', '2015-07-27', '1217', 'DTW', 'LAS'),
  ('DL', '2015-07-27', '134', 'PHX', 'DTW'),
  ('DL', '2015-07-27', '1728', 'LAS', 'JFK'),
  ('DL', '2015-07-27', '2067', 'SLC', 'PHX'),
  ('DL', '2015-07-27', '857', 'DFW', 'SLC'),
  ('DL', '2015-07-28', '2435', 'JFK', 'AUS'),
  ('DL', '2015-07-28', '2791', 'JFK', 'LAS'),
  ('DL', '2015-07-28', '401', 'AUS', 'JFK'),
  ('DL', '2015-07-29', '1217', 'DTW', 'LAS'),
  ('DL', '2015-07-29', '2576', 'LAS', 'DTW'),
  ('DL', '2015-07-30', '1602', 'LAS', 'ATL'),
  ('DL', '2015-07-30', '2046', 'ATL', 'PHX'),
  ('DL', '2015-07-31', '199', 'LAX', 'SEA'),
  ('DL', '2015-07-31', '2202', 'PHX', 'LAX'),
  ('DL', '2015-07-31', '2220', 'LAX', 'ATL'),
  ('DL', '2015-07-31', '848', 'SEA', 'LAX'),
  ('DL', '2015-08-01', '459', 'JFK', 'BOS'),
  ('DL', '2015-08-02', '1627', 'PDX', 'JFK'),
  ('DL', '2015-08-02', '2391', 'BOS', 'MSP'),
  ('DL', '2015-08-02', '710', 'MSP', 'PDX'),
  ('DL', '2015-08-03', '2842', 'ATL', 'COS'),
  ('DL', '2015-08-04', '1789', 'ATL', 'JAX'),
  ('DL', '2015-08-04', '1789', 'JAX', 'ATL'),
  ('DL', '2015-08-04', '2114', 'COS', 'ATL'),
  ('DL', '2015-08-04', '347', 'ATL', 'ORD'),
  ('DL', '2015-08-04', '347', 'ORD', 'ATL'),
  ('DL', '2015-08-04', '772', 'ATL', 'LAS'),
  ('DL', '2015-08-05', '1280', 'SLC', 'MSP'),
  ('DL', '2015-08-05', '1391', 'LAS', 'ATL'),
  ('DL', '2015-08-05', '1450', 'LAS', 'MSP'),
  ('DL', '2015-08-05', '2830', 'MSP', 'LAS'),
  ('DL', '2015-08-05', '787', 'MSP', 'SLC'),
  ('DL', '2015-08-06', '1061', 'LAX', 'MCO'),
  ('DL', '2015-08-06', '1061', 'MCO', 'LAX'),
  ('DL', '2015-08-06', '1255', 'ATL', 'LAX'),
  ('DL', '2015-08-06', '545', 'LAX', 'CVG'),
  ('DL', '2015-08-07', '1807', 'CVG', 'SEA'),
  ('DL', '2015-08-07', '2258', 'SEA', 'FAI'),
  ('DL', '2015-08-07', '943', 'ANC', 'SEA'),
  ('DL', '2015-08-07', '943', 'SEA', 'ANC'),
  ('DL', '2015-08-08', '1611', 'FAI', 'SEA'),
  ('DL', '2015-08-08', '1682', 'CVG', 'SLC'),
  ('DL', '2015-08-08', '1974', 'SEA', 'LAX'),
  ('DL', '2015-08-08', '2640', 'LAX', 'CVG'),
  ('DL', '2015-08-09', '2579', 'JFK', 'SAN'),
  ('DL', '2015-08-09', '2606', 'SAN', 'JFK'),
  ('DL', '2015-08-09', '466', 'SLC', 'JFK'),
  ('DL', '2015-08-10', '2836', 'JFK', 'PDX'),
  ('DL', '2015-08-11', '1909', 'SLC', 'SAN'),
  ('DL', '2015-08-11', '2140', 'LAS', 'SLC'),
  ('DL', '2015-08-11', '400', 'PDX', 'JFK'),
  ('DL', '2015-08-11', '443', 'JFK', 'LAS'),
  ('DL', '2015-08-12', '1189', 'BWI', 'SLC'),
  ('DL', '2015-08-12', '1189', 'SLC', 'BWI'),
  ('DL', '2015-08-12', '1337', 'SLC', 'LAX'),
  ('DL', '2015-08-12', '2020', 'LAX', 'MEM'),
  ('DL', '2015-08-12', '978', 'SAN', 'SLC'),
  ('DL', '2015-08-13', '1577', 'LAX', 'MSY'),
  ('DL', '2015-08-13', '1627', 'PDX', 'JFK'),
  ('DL', '2015-08-13', '710', 'MSP', 'PDX'),
  ('DL', '2015-08-13', '713', 'MSY', 'MSP'),
  ('DL', '2015-08-13', '838', 'MEM', 'LAX'),
  ('DL', '2015-08-14', '2842', 'ATL', 'COS'),
  ('DL', '2015-08-15', '1949', 'ATL', 'MIA'),
  ('DL', '2015-08-15', '2114', 'COS', 'ATL'),
  ('DL', '2015-08-16', '1728', 'LAS', 'JFK'),
  ('DL', '2015-08-16', '1883', 'SLC', 'LAS'),
  ('DL', '2015-08-16', '459', 'JFK', 'SLC'),
  ('DL', '2015-08-16', '782', 'MIA', 'JFK'),
  ('DL', '2015-08-17', '445', 'JFK', 'AUS'),
  ('DL', '2015-08-18', '1972', 'AUS', 'JFK'),
  ('DL', '2015-08-19', '953', 'ATL', 'SAT'),
  ('DL', '2015-08-20', '873', 'SAT', 'JFK'),
  ('DL', '2015-08-21', '1841', 'SJU', 'JFK'),
  ('DL', '2015-08-21', '2571', 'JFK', 'SAT'),
  ('DL', '2015-08-21', '447', 'JFK', 'SJU'),
  ('DL', '2015-08-22', '873', 'SAT', 'JFK'),
  ('DL', '2015-08-23', '1841', 'SJU', 'JFK'),
  ('DL', '2015-08-23', '2784', 'JFK', 'SAT'),
  ('DL', '2015-08-23', '447', 'JFK', 'SJU'),
  ('DL', '2015-08-24', '1448', 'ATL', 'DTW'),
  ('DL', '2015-08-24', '774', 'SAT', 'ATL'),
  ('DL', '2015-08-25', '2066', 'MSP', 'LAS'),
  ('DL', '2015-08-25', '2391', 'BOS', 'MSP'),
  ('DL', '2015-08-25', '2437', 'DTW', 'BOS'),
  ('DL', '2015-08-26', '1795', 'LAS', 'JFK'),
  ('DL', '2015-08-26', '485', 'JFK', 'ATL'),
  ('DL', '2015-08-28', '1357', 'MSP', 'DTW'),
  ('DL', '2015-08-28', '2223', 'MSP', 'ORD'),
  ('DL', '2015-08-28', '2223', 'ORD', 'MSP'),
  ('DL', '2015-08-29', '1056', 'SEA', 'CVG'),
  ('DL', '2015-08-29', '2045', 'ANC', 'SEA'),
  ('DL', '2015-08-29', '2045', 'SEA', 'ANC'),
  ('DL', '2015-08-29', '281', 'DTW', 'SEA'),
  ('DL', '2015-08-30', '1807', 'CVG', 'SEA'),
  ('DL', '2015-08-30', '2188', 'SEA', 'JNU'),
  ('DL', '2015-08-30', '943', 'ANC', 'SEA'),
  ('DL', '2015-08-30', '943', 'SEA', 'ANC'),
  ('DL', '2015-08-31', '1781', 'JNU', 'SEA'),
  ('DL', '2015-08-31', '384', 'SEA', 'LAX'),
  ('DL', '2015-08-31', '530', 'ANC', 'SEA'),
  ('DL', '2015-08-31', '530', 'SEA', 'ANC'),
  ('DL', '2015-09-01', '1553', 'ATL', 'DFW'),
  ('DL', '2015-09-01', '30', 'DFW', 'ATL'),
  ('DL', '2015-09-28', '1534', 'IND', 'ATL'),
  ('DL', '2015-09-29', '1402', 'ATL', 'LAS'),
  ('DL', '2015-09-29', '1529', 'LAS', 'JFK'),
  ('DL', '2015-09-30', '1151', 'MSP', 'SFO'),
  ('DL', '2015-09-30', '1998', 'SFO', 'SLC'),
  ('DL', '2015-09-30', '2175', 'JFK', 'MSP'),
  ('DL', '2015-09-30', '2619', 'LAS', 'JFK'),
  ('DL', '2015-09-30', '779', 'SLC', 'LAS'),
  ('DL', '2015-10-01', '2833', 'JFK', 'SAN'),
  ('DL', '2015-10-01', '403', 'SAN', 'JFK'),
  ('DL', '2015-10-02', '1210', 'DTW', 'RDU'),
  ('DL', '2015-10-02', '2122', 'PHX', 'DTW'),
  ('DL', '2015-10-02', '425', 'JFK', 'PHX'),
  ('DL', '2015-10-03', '1124', 'LAX', 'PHX'),
  ('DL', '2015-10-03', '2618', 'RDU', 'LAX'),
  ('DL', '2015-10-04', '1128', 'PHX', 'ATL'),
  ('DL', '2015-10-04', '2330', 'ATL', 'SLC'),
  ('DL', '2015-10-04', '662', 'SLC', 'LAX'),
  ('DL', '2015-10-04', '739', 'LAX', 'PHX'),
  ('DL', '2015-10-05', '1429', 'LAS', 'JFK'),
  ('DL', '2015-10-05', '1994', 'PHX', 'JFK'),
  ('DL', '2015-10-05', '439', 'JFK', 'LAS'),
  ('DL', '2015-10-06', '403', 'SAN', 'JFK'),
  ('DL', '2015-10-06', '437', 'JFK', 'SAN'),
  ('DL', '2015-10-07', '29', 'ATL', 'DFW'),
  ('DL', '2015-10-08', '1890', 'DFW', 'ATL'),
  ('DL', '2015-10-08', '2556', 'ATL', 'RDU'),
  ('DL', '2015-10-09', '2196', 'SLC', 'MCO'),
  ('DL', '2015-10-09', '2332', 'LAX', 'SLC'),
  ('DL', '2015-10-09', '2618', 'RDU', 'LAX'),
  ('DL', '2015-10-10', '1332', 'CVG', 'LAS'),
  ('DL', '2015-10-10', '842', 'SLC', 'CVG'),
  ('DL', '2015-10-10', '995', 'MCO', 'SLC'),
  ('DL', '2015-10-11', '1490', 'DTW', 'LAS'),
  ('DL', '2015-10-11', '1932', 'LAS', 'DTW'),
  ('DL', '2015-10-11', '780', 'LAS', 'MSP'),
  ('DL', '2015-10-12', '1280', 'SLC', 'MSP'),
  ('DL', '2015-10-12', '1728', 'LAS', 'JFK'),
  ('DL', '2015-10-12', '2051', 'MSP', 'LAS'),
  ('DL', '2015-10-12', '787', 'MSP', 'SLC'),
  ('DL', '2015-10-13', '2153', 'JFK', 'PHX'),
  ('DL', '2015-10-13', '2217', 'JFK', 'SJU'),
  ('DL', '2015-10-13', '511', 'SJU', 'JFK'),
  ('DL', '2015-10-14', '1529', 'LAS', 'CVG'),
  ('DL', '2015-10-14', '1994', 'PHX', 'JFK'),
  ('DL', '2015-10-14', '439', 'JFK', 'LAS'),
  ('DL', '2015-10-15', '2315', 'CVG', 'ATL'),
  ('DL', '2015-10-19', '1740', 'LAS', 'SLC'),
  ('DL', '2015-10-19', '1830', 'SLC', 'JFK'),
  ('DL', '2015-10-19', '421', 'JFK', 'LAS'),
  ('DL', '2015-10-20', '1740', 'LAS', 'SLC'),
  ('DL', '2015-10-20', '421', 'JFK', 'LAS'),
  ('DL', '2015-10-20', '662', 'SLC', 'LAX'),
  ('DL', '2015-10-20', '739', 'LAX', 'PHX'),
  ('DL', '2015-10-21', '1128', 'PHX', 'ATL'),
  ('DL', '2015-10-21', '1705', 'ATL', 'SFO'),
  ('DL', '2015-10-21', '1705', 'SFO', 'ATL'),
  ('DL', '2015-10-22', '1351', 'ATL', 'AUS'),
  ('DL', '2015-10-22', '1351', 'AUS', 'ATL'),
  ('DL', '2015-10-22', '2241', 'ATL', 'ORD'),
  ('DL', '2015-10-23', '977', 'ORD', 'ATL'),
  ('DL', '2015-10-27', '1374', 'OAK', 'SLC'),
  ('DL', '2015-10-27', '1374', 'SLC', 'OAK'),
  ('DL', '2015-10-27', '2436', 'SLC', 'DEN'),
  ('DL', '2015-10-27', '453', 'JFK', 'SLC'),
  ('DL', '2015-10-28', '1072', 'SFO', 'SLC'),
  ('DL', '2015-10-28', '1151', 'MSP', 'SFO'),
  ('DL', '2015-10-28', '1429', 'LAS', 'JFK'),
  ('DL', '2015-10-28', '1716', 'DEN', 'MSP'),
  ('DL', '2015-10-28', '779', 'SLC', 'LAS'),
  ('DL', '2015-10-29', '1841', 'SJU', 'JFK'),
  ('DL', '2015-10-29', '442', 'JFK', 'SJU'),
  ('DL', '2015-10-30', '1374', 'OAK', 'SLC'),
  ('DL', '2015-10-30', '1374', 'SLC', 'OAK'),
  ('DL', '2015-10-30', '453', 'JFK', 'SLC'),
  ('DL', '2015-10-30', '652', 'SLC', 'BZN'),
  ('DL', '2015-10-31', '1158', 'SLC', 'MCO'),
  ('DL', '2015-10-31', '2526', 'BZN', 'SLC'),
  ('DL', '2015-11-01', '185', 'MCO', 'LAX'),
  ('DL', '2015-11-01', '2171', 'LAX', 'MIA'),
  ('DL', '2015-11-02', '1843', 'SLC', 'BOS'),
  ('DL', '2015-11-02', '2005', 'ATL', 'DTW'),
  ('DL', '2015-11-02', '2317', 'DTW', 'LAS'),
  ('DL', '2015-11-02', '89', 'LAS', 'SLC'),
  ('DL', '2015-11-03', '1542', 'SEA', 'JFK'),
  ('DL', '2015-11-03', '2083', 'BOS', 'SLC'),
  ('DL', '2015-11-03', '444', 'SLC', 'JFK'),
  ('DL', '2015-11-03', '460', 'JFK', 'SEA'),
  ('DL', '2015-11-04', '549', 'ATL', 'SJU'),
  ('DL', '2015-11-05', '147', 'SLC', 'ATL'),
  ('DL', '2015-11-05', '1507', 'ATL', 'SLC'),
  ('DL', '2015-11-05', '2831', 'ATL', 'MCI'),
  ('DL', '2015-11-05', '922', 'SJU', 'ATL'),
  ('DL', '2015-11-06', '1630', 'ATL', 'SAT'),
  ('DL', '2015-11-06', '2165', 'MCI', 'ATL'),
  ('DL', '2015-11-07', '303', 'ATL', 'RDU'),
  ('DL', '2015-11-07', '774', 'SAT', 'ATL'),
  ('DL', '2015-11-08', '1482', 'RDU', 'ATL'),
  ('DL', '2015-11-09', '882', 'ATL', 'MCI'),
  ('DL', '2015-11-10', '2165', 'MCI', 'ATL'),
  ('DL', '2015-11-11', '1786', 'ATL', 'LGA'),
  ('DL', '2015-11-11', '847', 'LGA', 'ATL'),
  ('DL', '2015-11-12', '1211', 'SLC', 'LAX'),
  ('DL', '2015-11-12', '2004', 'LAX', 'SLC'),
  ('DL', '2015-11-13', '1457', 'SLC', 'SEA'),
  ('DL', '2015-11-13', '1585', 'SLC', 'LAX'),
  ('DL', '2015-11-13', '2157', 'SEA', 'SLC'),
  ('DL', '2015-11-13', '2377', 'LAX', 'SLC'),
  ('DL', '2015-11-15', '303', 'ATL', 'RDU'),
  ('DL', '2015-11-16', '1137', 'RDU', 'ATL'),
  ('DL', '2015-11-16', '1402', 'ATL', 'LAS'),
  ('DL', '2015-11-16', '2508', 'LAS', 'JFK'),
  ('DL', '2015-11-17', '2474', 'JFK', 'SJU'),
  ('DL', '2015-11-17', '2838', 'JFK', 'SJU'),
  ('DL', '2015-11-17', '42', 'SJU', 'JFK'),
  ('DL', '2015-11-18', '326', 'SJU', 'JFK'),
  ('DL', '2015-11-18', '332', 'SJU', 'JFK'),
  ('DL', '2015-11-18', '446', 'JFK', 'SJU'),
  ('DL', '2015-11-20', '1389', 'LAS', 'DTW'),
  ('DL', '2015-11-20', '459', 'JFK', 'LAS'),
  ('DL', '2015-11-21', '36', 'SEA', 'ANC'),
  ('DL', '2015-11-21', '935', 'DTW', 'SEA'),
  ('DL', '2015-11-22', '2303', 'JFK', 'PDX'),
  ('DL', '2015-11-22', '2397', 'SEA', 'SLC'),
  ('DL', '2015-11-22', '444', 'SLC', 'JFK'),
  ('DL', '2015-11-22', '822', 'ANC', 'SEA'),
  ('DL', '2015-11-23', '1736', 'PDX', 'JFK'),
  ('DL', '2015-11-23', '483', 'JFK', 'SEA'),
  ('DL', '2015-11-24', '1972', 'SEA', 'JFK'),
  ('DL', '2015-11-24', '405', 'JFK', 'LAS'),
  ('DL', '2015-11-24', '761', 'LAS', 'CVG'),
  ('DL', '2015-11-25', '1741', 'CVG', 'SLC'),
  ('DL', '2015-11-25', '2303', 'JFK', 'PDX'),
  ('DL', '2015-11-25', '444', 'SLC', 'JFK'),
  ('DL', '2015-11-26', '1831', 'PDX', 'SLC'),
  ('DL', '2015-11-26', '1859', 'SLC', 'LAX'),
  ('DL', '2015-11-26', '2170', 'SLC', 'LAX'),
  ('DL', '2015-11-26', '2825', 'LAX', 'SLC'),
  ('DL', '2015-11-27', '2200', 'LAX', 'MIA'),
  ('DL', '2015-11-28', '1169', 'MIA', 'LAX'),
  ('DL', '2015-11-28', '1566', 'LAX', 'RDU'),
  ('DL', '2015-11-28', '2787', 'BNA', 'LAX'),
  ('DL', '2015-11-28', '2787', 'LAX', 'BNA'),
  ('DL', '2015-11-29', '1170', 'LAX', 'CMH'),
  ('DL', '2015-11-29', '2618', 'RDU', 'LAX'),
  ('DL', '2015-11-30', '1327', 'CMH', 'LAX'),
  ('DL', '2015-11-30', '2034', 'MIA', 'JFK'),
  ('DL', '2015-11-30', '2200', 'LAX', 'MIA'),
  ('DL', '2015-12-01', '1473', 'SEA', 'JFK'),
  ('DL', '2015-12-01', '2303', 'JFK', 'PDX'),
  ('DL', '2015-12-01', '428', 'JFK', 'SEA'),
  ('DL', '2015-12-02', '1504', 'PDX', 'JFK'),
  ('DL', '2015-12-02', '483', 'JFK', 'SEA'),
  ('DL', '2015-12-03', '2397', 'SEA', 'SLC'),
  ('DL', '2015-12-03', '444', 'SLC', 'JFK'),
  ('DL', '2015-12-03', '446', 'JFK', 'SAN'),
  ('DL', '2015-12-04', '1260', 'SLC', 'PDX'),
  ('DL', '2015-12-04', '2404', 'SAN', 'JFK'),
  ('DL', '2015-12-04', '431', 'JFK', 'SLC'),
  ('DL', '2015-12-05', '1074', 'SLC', 'MSP'),
  ('DL', '2015-12-05', '1683', 'SLC', 'LAS'),
  ('DL', '2015-12-05', '1980', 'PDX', 'SLC'),
  ('DL', '2015-12-05', '89', 'LAS', 'SLC'),
  ('DL', '2015-12-06', '1454', 'LAX', 'ATL'),
  ('DL', '2015-12-06', '1577', 'MSY', 'LAX'),
  ('DL', '2015-12-06', '2839', 'MSP', 'MSY'),
  ('DL', '2015-12-07', '1204', 'ATL', 'LAX'),
  ('DL', '2015-12-07', '2428', 'LAX', 'TPA'),
  ('DL', '2015-12-08', '1061', 'LAX', 'MCO'),
  ('DL', '2015-12-08', '1061', 'MCO', 'LAX'),
  ('DL', '2015-12-08', '1558', 'TPA', 'LAX'),
  ('DL', '2015-12-09', '1654', 'LAX', 'ATL'),
  ('DL', '2015-12-10', '1204', 'ATL', 'LAX'),
  ('DL', '2015-12-10', '2428', 'LAX', 'TPA'),
  ('DL', '2015-12-11', '1061', 'LAX', 'MCO'),
  ('DL', '2015-12-11', '1061', 'MCO', 'LAX'),
  ('DL', '2015-12-11', '1558', 'TPA', 'LAX'),
  ('DL', '2015-12-12', '1554', 'LAX', 'ATL'),
  ('DL', '2015-12-13', '1061', 'LAX', 'MCO'),
  ('DL', '2015-12-13', '1061', 'MCO', 'LAX'),
  ('DL', '2015-12-13', '1204', 'ATL', 'LAX'),
  ('DL', '2015-12-13', '186', 'LAX', 'MCO'),
  ('DL', '2015-12-14', '185', 'MCO', 'LAX'),
  ('DL', '2015-12-14', '2787', 'BNA', 'LAX'),
  ('DL', '2015-12-14', '2787', 'LAX', 'BNA'),
  ('DL', '2015-12-15', '2534', 'LAX', 'BOS'),
  ('DL', '2015-12-16', '2145', 'SLC', 'SEA'),
  ('DL', '2015-12-16', '2531', 'BOS', 'LAX'),
  ('DL', '2015-12-16', '36', 'SEA', 'ANC'),
  ('DL', '2015-12-16', '671', 'LAX', 'SLC'),
  ('DL', '2015-12-17', '1384', 'LAX', 'SEA'),
  ('DL', '2015-12-17', '142', 'ANC', 'SEA'),
  ('DL', '2015-12-17', '1549', 'SEA', 'JNU'),
  ('DL', '2015-12-17', '2019', 'SEA', 'LAX'),
  ('DL', '2015-12-18', '1487', 'DTW', 'MEM'),
  ('DL', '2015-12-18', '2073', 'SEA', 'DTW'),
  ('DL', '2015-12-18', '2230', 'JNU', 'SEA'),
  ('DL', '2015-12-19', '1678', 'MSP', 'LAX'),
  ('DL', '2015-12-19', '2020', 'MEM', 'LAX'),
  ('DL', '2015-12-19', '2534', 'LAX', 'BOS'),
  ('DL', '2015-12-19', '2549', 'BOS', 'MSP'),
  ('DL', '2015-12-20', '1549', 'SEA', 'JNU'),
  ('DL', '2015-12-20', '1727', 'LAX', 'SEA'),
  ('DL', '2015-12-21', '2230', 'JNU', 'SEA'),
  ('DL', '2015-12-21', '2413', 'SEA', 'LAX'),
  ('DL', '2015-12-22', '1321', 'MSP', 'BOI'),
  ('DL', '2015-12-22', '1668', 'LAX', 'MSP'),
  ('DL', '2015-12-23', '1316', 'BOI', 'MSP'),
  ('DL', '2015-12-23', '1454', 'LAX', 'ATL'),
  ('DL', '2015-12-23', '1577', 'MSY', 'LAX'),
  ('DL', '2015-12-23', '2839', 'MSP', 'MSY'),
  ('DL', '2015-12-24', '2414', 'ATL', 'SEA'),
  ('DL', '2015-12-25', '1549', 'SEA', 'JNU'),
  ('DL', '2015-12-25', '2791', 'SEA', 'DTW'),
  ('DL', '2015-12-25', '662', 'DTW', 'SEA'),
  ('DL', '2015-12-26', '1554', 'LAX', 'ATL'),
  ('DL', '2015-12-26', '2230', 'JNU', 'SEA'),
  ('DL', '2015-12-26', '2413', 'SEA', 'LAX'),
  ('DL', '2015-12-27', '2117', 'ATL', 'COS'),
  ('DL', '2015-12-28', '1613', 'ATL', 'IND'),
  ('DL', '2015-12-28', '1613', 'IND', 'ATL'),
  ('DL', '2015-12-28', '2114', 'COS', 'ATL'),
  ('DL', '2015-12-29', '1241', 'SLC', 'BZN'),
  ('DL', '2015-12-29', '1782', 'MSP', 'SLC'),
  ('DL', '2015-12-29', '721', 'ATL', 'MSP'),
  ('DL', '2015-12-30', '1170', 'LAX', 'CMH'),
  ('DL', '2015-12-30', '1208', 'BZN', 'SLC'),
  ('DL', '2015-12-30', '1223', 'DTW', 'LAX'),
  ('DL', '2015-12-30', '1483', 'SLC', 'LAS'),
  ('DL', '2015-12-30', '1545', 'LAS', 'DTW'),
  ('DL', '2015-12-31', '1327', 'CMH', 'LAX'),
  ('DL', '2015-12-31', '1431', 'MSY', 'MSP'),
  ('DL', '2015-12-31', '2340', 'LAX', 'MSY')]}

Counting Late Flights


In [56]:
total_flights = on_time_dataframe.count()

# Flights that were late leaving...
late_departures = on_time_dataframe.filter(
  on_time_dataframe.DepDelayMinutes > 0
)
total_late_departures = late_departures.count()
print(f'{total_late_departures:,}')

# Flights that were late arriving...
late_arrivals = on_time_dataframe.filter(
  on_time_dataframe.ArrDelayMinutes > 0
)
total_late_arrivals = late_arrivals.count()
print(f'{total_late_arrivals:,}')

# Get the percentage of flights that arrived late, rounded to 1 decimal place
pct_late = round((total_late_arrivals / (total_flights * 1.0)) * 100, 1)
pct_late


2,125,618
2,086,896
Out[56]:
35.9
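As a sanity check on the arithmetic, the same percentage can be recomputed from the printed counts without Spark. The `pct` helper below is a hypothetical name used only for this illustration. The same ratio also gives the share of late departures.

```python
# Counts printed by the cells in this notebook
total_flights = 5819079
total_late_departures = 2125618
total_late_arrivals = 2086896

def pct(part, whole):
    """Percentage of whole, rounded to one decimal place."""
    # Python 3's / is true division, so integer counts divide correctly
    return round(part / whole * 100, 1)

print(pct(total_late_arrivals, total_flights))    # 35.9
print(pct(total_late_departures, total_flights))  # 36.5
```

So roughly 36.5% of flights leave late and 35.9% arrive late.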

Counting Flights with Hero Captains

"Hero Captains" are those that depart late but make up time in the air and arrive on time or early.


In [60]:
# Flights that left late but made up time to arrive on time...
on_time_heros = on_time_dataframe.filter(
  (on_time_dataframe.DepDelayMinutes > 0)
  &
  (on_time_dataframe.ArrDelayMinutes <= 0)
)
total_on_time_heros = on_time_heros.count()
print(f'{total_on_time_heros:,}')


606,902
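The filter above is a per-row predicate. The sketch below restates it in plain Python. `is_hero` is a hypothetical helper name. The sketch then expresses the hero count as a share of late departures, using the counts printed in this notebook.

```python
def is_hero(dep_delay_minutes, arr_delay_minutes):
    """Departed late but arrived on time or early (same predicate as the filter above)."""
    return dep_delay_minutes > 0 and arr_delay_minutes <= 0

# Counts printed by the cells above
total_late_departures = 2125618
total_on_time_heros = 606902

# Share of late departures that made up the time in the air
pct_recovered = round(total_on_time_heros / total_late_departures * 100, 1)
print(pct_recovered)  # 28.6
```

In other words, a little over a quarter of late departures still arrive on time.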

Printing Our Results


In [61]:
print("Total flights:   {:,}".format(total_flights))
print("Late departures: {:,}".format(total_late_departures))
print("Late arrivals:   {:,}".format(total_late_arrivals))
print("Recoveries:      {:,}".format(total_on_time_heros))
print("Percentage Late: {}%".format(pct_late))


Total flights:   5,819,079
Late departures: 2,125,618
Late arrivals:   2,086,896
Recoveries:      606,902
Percentage Late: 35.9%

Computing the Average Lateness Per Flight


In [62]:
# Get the average minutes late departing and arriving
spark.sql("""
SELECT
  ROUND(AVG(DepDelay),1) AS AvgDepDelay,
  ROUND(AVG(ArrDelay),1) AS AvgArrDelay
FROM on_time_performance
"""
).show()


+-----------+-----------+
|AvgDepDelay|AvgArrDelay|
+-----------+-----------+
|        9.4|        4.4|
+-----------+-----------+
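This query averages the signed `DepDelay` and `ArrDelay` columns, not the `*DelayMinutes` columns used for counting. The signed columns record early departures and arrivals as negative values, which is why `ArrDelay` shows negative values such as -21.0 in the bucketing output later on. The `*Minutes` variants set early values to 0, a convention we assume from the BTS field definitions. Early flights therefore pull these averages down. SQL `AVG` also skips NULLs. The sketch below uses hypothetical values to show both effects.

```python
def sql_avg(values):
    """Mimic SQL AVG: NULLs (None) are ignored; an all-NULL input gives None."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else None

# Hypothetical signed arrival delays in minutes (negative = early,
# None = missing, for example a cancelled flight)
arr_delay = [-10.0, -4.0, 0.0, 26.0, None]

# A clamped variant in the style of ArrDelayMinutes: early arrivals become 0
arr_delay_minutes = [None if v is None else max(v, 0.0) for v in arr_delay]

print(sql_avg(arr_delay))          # 3.0
print(sql_avg(arr_delay_minutes))  # 6.5
```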

Inspecting Late Flights


In [63]:
# Why are flights late? Let's look at some delayed flights and the delay causes
late_flights = spark.sql("""
SELECT
  ArrDelayMinutes,
  WeatherDelay,
  CarrierDelay,
  NASDelay,
  SecurityDelay,
  LateAircraftDelay
FROM
  on_time_performance
WHERE
  WeatherDelay IS NOT NULL
  OR
  CarrierDelay IS NOT NULL
  OR
  NASDelay IS NOT NULL
  OR
  SecurityDelay IS NOT NULL
  OR
  LateAircraftDelay IS NOT NULL
ORDER BY
  FlightDate
""")
late_flights.sample(False, 0.01).show()


+---------------+------------+------------+--------+-------------+-----------------+
|ArrDelayMinutes|WeatherDelay|CarrierDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---------------+------------+------------+--------+-------------+-----------------+
|           31.0|         0.0|        31.0|     0.0|          0.0|              0.0|
|           61.0|         0.0|         0.0|     0.0|          0.0|             61.0|
|           17.0|         0.0|         0.0|    17.0|          0.0|              0.0|
|          310.0|         0.0|        75.0|     0.0|          0.0|            235.0|
|           17.0|         0.0|         0.0|     2.0|          0.0|             15.0|
|           56.0|         0.0|        30.0|     0.0|          0.0|             26.0|
|           17.0|         0.0|        14.0|     3.0|          0.0|              0.0|
|          126.0|         0.0|         0.0|    21.0|          0.0|            105.0|
|           29.0|         0.0|         0.0|     2.0|          0.0|             27.0|
|           60.0|         0.0|        20.0|    36.0|          0.0|              4.0|
|           20.0|         0.0|         0.0|    20.0|          0.0|              0.0|
|           23.0|         0.0|        23.0|     0.0|          0.0|              0.0|
|           88.0|         0.0|         0.0|     0.0|          0.0|             88.0|
|           76.0|         0.0|         2.0|     0.0|          0.0|             74.0|
|           25.0|         0.0|         6.0|    19.0|          0.0|              0.0|
|           32.0|         0.0|        21.0|    11.0|          0.0|              0.0|
|           69.0|         0.0|         1.0|    68.0|          0.0|              0.0|
|           29.0|         0.0|         4.0|    25.0|          0.0|              0.0|
|           22.0|         0.0|         8.0|    14.0|          0.0|              0.0|
|           25.0|         0.0|         0.0|     0.0|          0.0|             25.0|
+---------------+------------+------------+--------+-------------+-----------------+
only showing top 20 rows
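In every row of this sample, the five cause columns add up exactly to `ArrDelayMinutes`. The cause columns therefore split a flight's arrival delay into parts. The check below uses a few rows copied from the output above. `causes_account_for_delay` is a hypothetical helper name.

```python
# Rows copied from the sample above:
# (ArrDelayMinutes, WeatherDelay, CarrierDelay, NASDelay, SecurityDelay, LateAircraftDelay)
rows = [
    (310.0, 0.0, 75.0, 0.0, 0.0, 235.0),
    (60.0, 0.0, 20.0, 36.0, 0.0, 4.0),
    (17.0, 0.0, 14.0, 3.0, 0.0, 0.0),
    (76.0, 0.0, 2.0, 0.0, 0.0, 74.0),
]

def causes_account_for_delay(row):
    """True if the five cause columns sum to the arrival delay."""
    arr_delay_minutes, *causes = row
    return sum(causes) == arr_delay_minutes

print(all(causes_account_for_delay(r) for r in rows))  # True
```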

Determining Why Flights Are Late


In [64]:
# Calculate the percentage contribution to delay for each source
total_delays = spark.sql("""
SELECT
  ROUND(SUM(WeatherDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_weather_delay,
  ROUND(SUM(CarrierDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_carrier_delay,
  ROUND(SUM(NASDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_nas_delay,
  ROUND(SUM(SecurityDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_security_delay,
  ROUND(SUM(LateAircraftDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_late_aircraft_delay
FROM on_time_performance
""")
total_delays.show()


+-----------------+-----------------+-------------+------------------+-----------------------+
|pct_weather_delay|pct_carrier_delay|pct_nas_delay|pct_security_delay|pct_late_aircraft_delay|
+-----------------+-----------------+-------------+------------------+-----------------------+
|              4.5|             29.2|         20.7|               0.1|                   36.1|
+-----------------+-----------------+-------------+------------------+-----------------------+
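The five shares add up to 90.6%, not 100%. Each numerator sums only non-NULL cause values, but the denominator `SUM(ArrDelayMinutes)` also includes minutes from late flights whose cause columns are NULL. We assume this follows the BTS convention of recording causes only for flights that arrive 15 or more minutes late. The smallest delay in the sample above is 17 minutes, which fits that assumption. The toy calculation below uses hypothetical rows to show the same ratio-of-sums effect.

```python
def sql_sum(values):
    """Mimic SQL SUM: NULLs (None) are skipped."""
    return sum(v for v in values if v is not None)

# Hypothetical rows: (ArrDelayMinutes, CarrierDelay, LateAircraftDelay).
# The 10-minute delay has no recorded cause, as if below a reporting threshold.
rows = [(40.0, 10.0, 30.0), (20.0, 20.0, 0.0), (10.0, None, None)]

total_arr = sql_sum(r[0] for r in rows)  # 70.0
pct_carrier = sql_sum(r[1] for r in rows) / total_arr * 100
pct_late_aircraft = sql_sum(r[2] for r in rows) / total_arr * 100

# The shares cover only the attributed minutes, so they sum to less than 100
print(round(pct_carrier + pct_late_aircraft, 1))  # 85.7
```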

Computing a Histogram of Weather Delayed Flights


In [67]:
# First pass: eyeball the distribution to choose our buckets
weather_delay_histogram = on_time_dataframe\
  .select("WeatherDelay")\
  .rdd\
  .flatMap(lambda x: x)\
  .histogram([1, 5, 10, 15, 30, 60, 120, 240, 480, 720, 24*60.0])
print(weather_delay_histogram)


([1, 5, 10, 15, 30, 60, 120, 240, 480, 720, 1440.0], [5436, 7668, 6636, 16007, 13569, 9442, 4598, 1136, 152, 72])

In [69]:
# See above for definition
create_hist(weather_delay_histogram)


Out[69]:
<BarContainer object of 10 artists>
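When `RDD.histogram` receives explicit bucket edges, every bucket is half-open `[lo, hi)` except the last, which also includes its upper edge. Values outside the edges are not counted. In the first histogram, the lowest edge is 1, so flights with a `WeatherDelay` of 0 never appear. The sketch below reproduces that bucketing in plain Python. It is an illustration, not Spark's implementation, and the sample values are hypothetical.

```python
import math
from bisect import bisect_right

def histogram(values, edges):
    """Count values into [e0, e1), [e1, e2), ..., [e_{n-1}, e_n] (last bucket closed)."""
    counts = [0] * (len(edges) - 1)
    for v in values:
        # Skip missing values and NaN
        if v is None or (isinstance(v, float) and math.isnan(v)):
            continue
        # Skip anything outside [edges[0], edges[-1]]
        if v < edges[0] or v > edges[-1]:
            continue
        # Bucket whose left edge is the largest edge <= v; clamp so the
        # top edge itself lands in the last bucket
        i = min(bisect_right(edges, v) - 1, len(counts) - 1)
        counts[i] += 1
    return list(edges), counts

EDGES = [1, 5, 10, 15, 30, 60, 120, 240, 480, 720, 24 * 60.0]

# Hypothetical WeatherDelay values in minutes
sample = [None, 0.0, 1.0, 4.0, 5.0, 15.0, 1440.0]
edges, counts = histogram(sample, EDGES)
print(counts)  # [2, 1, 0, 1, 0, 0, 0, 0, 0, 1]
```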

Preparing a Histogram for Visualization by d3.js


In [73]:
# Transform the data into something easily consumed by d3
def histogram_to_publishable(histogram):
  record = {'key': 1, 'data': []}
  for label, value in zip(histogram[0], histogram[1]):
    record['data'].append(
      {
        'label': label,
        'value': value
      }
    )
  return record

# Recompute the weather histogram, keeping only flights with a weather delay
weather_delay_histogram = on_time_dataframe\
  .filter(
    (on_time_dataframe.WeatherDelay.isNotNull())
    &
    (on_time_dataframe.WeatherDelay > 0)
  )\
  .select("WeatherDelay")\
  .rdd\
  .flatMap(lambda x: x)\
  .histogram([0, 15, 30, 60, 120, 240, 480, 720, 24*60.0])
print(weather_delay_histogram)

record = histogram_to_publishable(weather_delay_histogram)
record


([0, 15, 30, 60, 120, 240, 480, 720, 1440.0], [19740, 16007, 13569, 9442, 4598, 1136, 152, 72])
Out[73]:
{'key': 1,
 'data': [{'label': 0, 'value': 19740},
  {'label': 15, 'value': 16007},
  {'label': 30, 'value': 13569},
  {'label': 60, 'value': 9442},
  {'label': 120, 'value': 4598},
  {'label': 240, 'value': 1136},
  {'label': 480, 'value': 152},
  {'label': 720, 'value': 72}]}
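`histogram_to_publishable` pairs edges with counts using `zip`, which stops at the shorter list. The histogram has one more edge than it has counts, so the final edge (1440.0) is silently dropped, and each label gives only the bucket's lower bound. The variant below is a sketch under a hypothetical name, `histogram_to_ranges`. It keeps both edges in each label, shown here on the histogram printed above.

```python
def histogram_to_ranges(histogram):
    """Like histogram_to_publishable, but label each bucket as 'lo-hi'."""
    edges, counts = histogram
    return {
        'key': 1,
        'data': [
            {'label': f'{lo:g}-{hi:g}', 'value': count}
            for lo, hi, count in zip(edges, edges[1:], counts)
        ],
    }

# The histogram printed by the cell above
weather_delay_histogram = (
    [0, 15, 30, 60, 120, 240, 480, 720, 1440.0],
    [19740, 16007, 13569, 9442, 4598, 1136, 152, 72],
)
record = histogram_to_ranges(weather_delay_histogram)
print(record['data'][0])   # {'label': '0-15', 'value': 19740}
print(record['data'][-1])  # {'label': '720-1440', 'value': 72}
```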

Building a Classifier Model to Predict Flight Delays

Loading Our Data


In [74]:
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DateType, TimestampType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.functions import udf

schema = StructType([
  StructField("ArrDelay", DoubleType(), True),     # "ArrDelay":5.0
  StructField("CRSArrTime", TimestampType(), True),    # "CRSArrTime":"2015-12-31T03:20:00.000-08:00"
  StructField("CRSDepTime", TimestampType(), True),    # "CRSDepTime":"2015-12-31T03:05:00.000-08:00"
  StructField("Carrier", StringType(), True),     # "Carrier":"WN"
  StructField("DayOfMonth", IntegerType(), True), # "DayOfMonth":31
  StructField("DayOfWeek", IntegerType(), True),  # "DayOfWeek":4
  StructField("DayOfYear", IntegerType(), True),  # "DayOfYear":365
  StructField("DepDelay", DoubleType(), True),     # "DepDelay":14.0
  StructField("Dest", StringType(), True),        # "Dest":"SAN"
  StructField("Distance", DoubleType(), True),     # "Distance":368.0
  StructField("FlightDate", DateType(), True),    # "FlightDate":"2015-12-30T16:00:00.000-08:00"
  StructField("FlightNum", StringType(), True),   # "FlightNum":"6109"
  StructField("Origin", StringType(), True),      # "Origin":"TUS"
])

features = spark.read.json(
  "../data/simple_flight_delay_features.jsonl.bz2",
  schema=schema
)
features.first()


Out[74]:
Row(ArrDelay=13.0, CRSArrTime=datetime.datetime(2015, 1, 1, 18, 10), CRSDepTime=datetime.datetime(2015, 1, 1, 15, 30), Carrier='AA', DayOfMonth=1, DayOfWeek=4, DayOfYear=1, DepDelay=14.0, Dest='DFW', Distance=569.0, FlightDate=datetime.date(2014, 12, 31), FlightNum='1024', Origin='ABQ')
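Note that this first row's `CRSDepTime` is 2015-01-01, yet its `FlightDate` reads 2014-12-31. The schema comment shows the same pattern, with `FlightDate` stored as `"2015-12-30T16:00:00.000-08:00"` for a flight scheduled on 2015-12-31. One plausible explanation, which we assume here without confirming it, is that each date was serialized as midnight UTC and then rendered in a UTC-8 time zone. Truncating such a timestamp to a date in that zone lands on the previous day, as the sketch below shows.

```python
from datetime import datetime, timezone

# The FlightDate example from the schema comment above
flight_date = datetime.fromisoformat("2015-12-30T16:00:00.000-08:00")

# Read in its own -08:00 offset, the calendar date is the 30th...
print(flight_date.date())                           # 2015-12-30
# ...but the same instant in UTC is midnight on the 31st
print(flight_date.astimezone(timezone.utc).date())  # 2015-12-31
```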

Check Data for Nulls


In [75]:
#
# Check for nulls in features before using Spark ML
#
null_counts = [(column, features.where(features[column].isNull()).count()) for column in features.columns]
cols_with_nulls = filter(lambda x: x[1] > 0, null_counts)
print(list(cols_with_nulls))


[]
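The list comprehension above launches one Spark job per column. You can gather the same counts in a single pass. In Spark, one way to do this is a single aggregation such as `features.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in features.columns])`, with `F` being `pyspark.sql.functions`. The plain-Python sketch below shows the one-pass idea on hypothetical rows. `count_nulls` is a hypothetical helper name.

```python
def count_nulls(rows, columns):
    """Count None values per column in a single pass over the rows."""
    counts = dict.fromkeys(columns, 0)
    for row in rows:
        for column in columns:
            if row.get(column) is None:
                counts[column] += 1
    return counts

# Hypothetical rows; the second is missing its Carrier
rows = [{"Carrier": "AA", "Dest": "DFW"}, {"Carrier": None, "Dest": "DFW"}]
null_counts = count_nulls(rows, ["Carrier", "Dest"])
cols_with_nulls = [(c, n) for c, n in null_counts.items() if n > 0]
print(cols_with_nulls)  # [('Carrier', 1)]
```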

Add a Route Column

Demonstrating the addition of a feature to our model...


In [76]:
#
# Add a Route variable to replace FlightNum
#
from pyspark.sql.functions import lit, concat

features_with_route = features.withColumn(
  'Route',
  concat(
    features.Origin,
    lit('-'),
    features.Dest
  )
)
features_with_route.select("Origin", "Dest", "Route").show(5)


+------+----+-------+
|Origin|Dest|  Route|
+------+----+-------+
|   ABQ| DFW|ABQ-DFW|
|   ABQ| DFW|ABQ-DFW|
|   ABQ| DFW|ABQ-DFW|
|   ATL| DFW|ATL-DFW|
|   ATL| DFW|ATL-DFW|
+------+----+-------+
only showing top 5 rows
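Spark's `concat` returns null if any of its inputs is null. A missing `Origin` or `Dest` would therefore produce a null `Route`. The null check above rules this out for this dataset. The sketch below mirrors that behavior in plain Python. `make_route` is a hypothetical helper name.

```python
def make_route(origin, dest):
    """Build an 'ORIGIN-DEST' key; None if either side is missing,
    mirroring how concat returns null when any input is null."""
    if origin is None or dest is None:
        return None
    return f"{origin}-{dest}"

print(make_route("ABQ", "DFW"))  # ABQ-DFW
print(make_route("ATL", None))   # None
```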

Bucketizing ArrDelay into ArrDelayBucket


In [77]:
#
# Use pyspark.ml.feature.Bucketizer to bucketize ArrDelay
#
from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]
bucketizer = Bucketizer(
  splits=splits,
  inputCol="ArrDelay",
  outputCol="ArrDelayBucket"
)
ml_bucketized_features = bucketizer.transform(features_with_route)

# Check the buckets out
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show()


+--------+--------------+
|ArrDelay|ArrDelayBucket|
+--------+--------------+
|    13.0|           2.0|
|    17.0|           2.0|
|    36.0|           3.0|
|   -21.0|           0.0|
|   -14.0|           1.0|
|    16.0|           2.0|
|    -7.0|           1.0|
|    13.0|           2.0|
|    25.0|           2.0|
|    58.0|           3.0|
|    14.0|           2.0|
|     1.0|           2.0|
|   -29.0|           0.0|
|   -10.0|           1.0|
|    -3.0|           1.0|
|    -8.0|           1.0|
|    -1.0|           1.0|
|   -14.0|           1.0|
|   -16.0|           0.0|
|    18.0|           2.0|
+--------+--------------+
only showing top 20 rows
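Bucketizer assigns bucket i to values in the half-open interval [splits[i], splits[i+1]), except that the last bucket also includes its upper bound. With the splits above, this yields four classes. Bucket 0.0 is more than 15 minutes early, 1.0 is up to 15 minutes early, 2.0 is on time to under 30 minutes late, and 3.0 is 30 or more minutes late. The sketch below reproduces that rule in plain Python with `bisect`. The `bucketize` name is hypothetical. Spark controls NaN and out-of-range handling through Bucketizer's `handleInvalid` parameter, and this sketch simply raises an error in those cases:

```python
import bisect
import math
from typing import List

SPLITS = [-float("inf"), -15.0, 0.0, 30.0, float("inf")]


def bucketize(value: float, splits: List[float] = SPLITS) -> float:
    """Return the bucket index for value, as a float like Bucketizer's output.

    Bucket i covers [splits[i], splits[i + 1]); the final bucket also
    includes splits[-1].
    """
    if math.isnan(value) or value < splits[0] or value > splits[-1]:
        raise ValueError(f"{value} is outside the splits")
    if value == splits[-1]:
        return float(len(splits) - 2)
    # bisect_right counts the split points <= value; subtract 1 for the bucket
    return float(bisect.bisect_right(splits, value) - 1)


# ArrDelay values taken from the output above, plus the boundary values
for delay in [13.0, 36.0, -21.0, -14.0, -16.0, -15.0, 0.0, 30.0]:
    print(delay, bucketize(delay))
```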

Indexing Our String Fields into Numeric Fields


In [78]:
#
# Extract features with tools in pyspark.ml.feature
#
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Turn each categorical field into a numeric <column>_index field
for column in ["Carrier", "DayOfMonth", "DayOfWeek", "DayOfYear",
               "Origin", "Dest", "Route"]:
  string_indexer = StringIndexer(
    inputCol=column,
    outputCol=column + "_index"
  )
  ml_bucketized_features = string_indexer.fit(ml_bucketized_features)\
                                          .transform(ml_bucketized_features)

# Check out the indexes
ml_bucketized_features.show(6)


+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+-------------+----------------+---------------+---------------+------------+----------+-----------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|ArrDelayBucket|Carrier_index|DayOfMonth_index|DayOfWeek_index|DayOfYear_index|Origin_index|Dest_index|Route_index|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+-------------+----------------+---------------+---------------+------------+----------+-----------+
|    13.0|2015-01-01 18:10:00|2015-01-01 15:30:00|     AA|         1|        4|        1|    14.0| DFW|   569.0|2014-12-31|     1024|   ABQ|ABQ-DFW|           2.0|          2.0|            25.0|            0.0|          320.0|        53.0|       2.0|      938.0|
|    17.0|2015-01-01 10:15:00|2015-01-01 07:25:00|     AA|         1|        4|        1|    14.0| DFW|   569.0|2014-12-31|     1184|   ABQ|ABQ-DFW|           2.0|          2.0|            25.0|            0.0|          320.0|        53.0|       2.0|      938.0|
|    36.0|2015-01-01 11:45:00|2015-01-01 09:00:00|     AA|         1|        4|        1|    -2.0| DFW|   569.0|2014-12-31|      336|   ABQ|ABQ-DFW|           3.0|          2.0|            25.0|            0.0|          320.0|        53.0|       2.0|      938.0|
|   -21.0|2015-01-01 19:30:00|2015-01-01 17:55:00|     AA|         1|        4|        1|    -1.0| DFW|   731.0|2014-12-31|      125|   ATL|ATL-DFW|           0.0|          2.0|            25.0|            0.0|          320.0|         0.0|       2.0|       37.0|
|   -14.0|2015-01-01 10:25:00|2015-01-01 08:55:00|     AA|         1|        4|        1|    -4.0| DFW|   731.0|2014-12-31|     1455|   ATL|ATL-DFW|           1.0|          2.0|            25.0|            0.0|          320.0|         0.0|       2.0|       37.0|
|    16.0|2015-01-01 15:15:00|2015-01-01 13:45:00|     AA|         1|        4|        1|    15.0| DFW|   731.0|2014-12-31|     1473|   ATL|ATL-DFW|           2.0|          2.0|            25.0|            0.0|          320.0|         0.0|       2.0|       37.0|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+-------------+----------------+---------------+---------------+------------+----------+-----------+
only showing top 6 rows
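By default, StringIndexer orders labels by descending frequency, so the most common value receives index 0.0. Numeric inputs such as DayOfMonth are cast to strings before indexing. The plain-Python sketch below builds that mapping with a hypothetical `fit_string_index` helper. Breaking ties alphabetically is an assumption of this sketch, because Spark's tie-breaking has not been identical in every release. The size of each mapping is also the number of categories a tree model must accommodate, and that is what `maxBins` has to cover later on.

```python
from collections import Counter
from typing import Dict, Iterable


def fit_string_index(values: Iterable[object]) -> Dict[str, float]:
    """Map each distinct value to a float index, most frequent first.

    Values are cast to str first, as StringIndexer does for numeric input.
    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(str(v) for v in values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


dest_index = fit_string_index(["DFW", "DFW", "MIA", "ATL", "DFW", "MIA"])
print(dest_index)
print(fit_string_index([4, 4, 5, 1]))
```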

Combining Numeric Fields into a Single Vector


In [79]:
# Combine the continuous numeric fields and the indexed categorical fields into one feature vector
numeric_columns = ["DepDelay", "Distance"]
index_columns = ["Carrier_index", "DayOfMonth_index",
                 "DayOfWeek_index", "DayOfYear_index", "Origin_index",
                 "Dest_index", "Route_index"]
vector_assembler = VectorAssembler(
  inputCols=numeric_columns + index_columns,
  outputCol="Features_vec"
)
final_vectorized_features = vector_assembler.transform(ml_bucketized_features)

# Drop the index columns
for column in index_columns:
  final_vectorized_features = final_vectorized_features.drop(column)

# Check out the features
final_vectorized_features.show()


+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+--------------------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|ArrDelayBucket|        Features_vec|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+--------------------+
|    13.0|2015-01-01 18:10:00|2015-01-01 15:30:00|     AA|         1|        4|        1|    14.0| DFW|   569.0|2014-12-31|     1024|   ABQ|ABQ-DFW|           2.0|[14.0,569.0,2.0,2...|
|    17.0|2015-01-01 10:15:00|2015-01-01 07:25:00|     AA|         1|        4|        1|    14.0| DFW|   569.0|2014-12-31|     1184|   ABQ|ABQ-DFW|           2.0|[14.0,569.0,2.0,2...|
|    36.0|2015-01-01 11:45:00|2015-01-01 09:00:00|     AA|         1|        4|        1|    -2.0| DFW|   569.0|2014-12-31|      336|   ABQ|ABQ-DFW|           3.0|[-2.0,569.0,2.0,2...|
|   -21.0|2015-01-01 19:30:00|2015-01-01 17:55:00|     AA|         1|        4|        1|    -1.0| DFW|   731.0|2014-12-31|      125|   ATL|ATL-DFW|           0.0|[-1.0,731.0,2.0,2...|
|   -14.0|2015-01-01 10:25:00|2015-01-01 08:55:00|     AA|         1|        4|        1|    -4.0| DFW|   731.0|2014-12-31|     1455|   ATL|ATL-DFW|           1.0|[-4.0,731.0,2.0,2...|
|    16.0|2015-01-01 15:15:00|2015-01-01 13:45:00|     AA|         1|        4|        1|    15.0| DFW|   731.0|2014-12-31|     1473|   ATL|ATL-DFW|           2.0|[15.0,731.0,2.0,2...|
|    -7.0|2015-01-01 12:15:00|2015-01-01 10:45:00|     AA|         1|        4|        1|    -2.0| DFW|   731.0|2014-12-31|     1513|   ATL|ATL-DFW|           1.0|[-2.0,731.0,2.0,2...|
|    13.0|2015-01-01 16:50:00|2015-01-01 15:25:00|     AA|         1|        4|        1|     9.0| DFW|   731.0|2014-12-31|      194|   ATL|ATL-DFW|           2.0|[9.0,731.0,2.0,25...|
|    25.0|2015-01-01 20:30:00|2015-01-01 19:00:00|     AA|         1|        4|        1|    -2.0| DFW|   731.0|2014-12-31|      232|   ATL|ATL-DFW|           2.0|[-2.0,731.0,2.0,2...|
|    58.0|2015-01-01 21:40:00|2015-01-01 20:15:00|     AA|         1|        4|        1|    14.0| DFW|   731.0|2014-12-31|      276|   ATL|ATL-DFW|           3.0|[14.0,731.0,2.0,2...|
|    14.0|2015-01-01 13:25:00|2015-01-01 11:55:00|     AA|         1|        4|        1|    15.0| DFW|   731.0|2014-12-31|      314|   ATL|ATL-DFW|           2.0|[15.0,731.0,2.0,2...|
|     1.0|2015-01-01 18:05:00|2015-01-01 16:40:00|     AA|         1|        4|        1|    -5.0| DFW|   731.0|2014-12-31|      356|   ATL|ATL-DFW|           2.0|[-5.0,731.0,2.0,2...|
|   -29.0|2015-01-01 10:12:00|2015-01-01 08:15:00|     AA|         1|        4|        1|    -9.0| MIA|   594.0|2014-12-31|     1652|   ATL|ATL-MIA|           0.0|[-9.0,594.0,2.0,2...|
|   -10.0|2015-01-01 08:52:00|2015-01-01 07:00:00|     AA|         1|        4|        1|    -4.0| MIA|   594.0|2014-12-31|       17|   ATL|ATL-MIA|           1.0|[-4.0,594.0,2.0,2...|
|    -3.0|2015-01-01 23:02:00|2015-01-01 21:10:00|     AA|         1|        4|        1|    -7.0| MIA|   594.0|2014-12-31|      349|   ATL|ATL-MIA|           1.0|[-7.0,594.0,2.0,2...|
|    -8.0|2015-01-01 14:35:00|2015-01-01 13:30:00|     AA|         1|        4|        1|    -2.0| DFW|   190.0|2014-12-31|     1023|   AUS|AUS-DFW|           1.0|[-2.0,190.0,2.0,2...|
|    -1.0|2015-01-01 06:50:00|2015-01-01 05:50:00|     AA|         1|        4|        1|    -2.0| DFW|   190.0|2014-12-31|     1178|   AUS|AUS-DFW|           1.0|[-2.0,190.0,2.0,2...|
|   -14.0|2015-01-01 09:40:00|2015-01-01 08:30:00|     AA|         1|        4|        1|    -6.0| DFW|   190.0|2014-12-31|     1296|   AUS|AUS-DFW|           1.0|[-6.0,190.0,2.0,2...|
|   -16.0|2015-01-01 10:15:00|2015-01-01 09:05:00|     AA|         1|        4|        1|    -4.0| DFW|   190.0|2014-12-31|     1356|   AUS|AUS-DFW|           0.0|[-4.0,190.0,2.0,2...|
|    18.0|2015-01-01 16:55:00|2015-01-01 15:55:00|     AA|         1|        4|        1|     3.0| DFW|   190.0|2014-12-31|     1365|   AUS|AUS-DFW|           2.0|[3.0,190.0,2.0,25...|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+--------------------+
only showing top 20 rows
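VectorAssembler concatenates its input columns, in the order given by `inputCols`, into one vector per row. That is why each `Features_vec` above starts with DepDelay and Distance. The plain-Python sketch below shows the per-row step using a hypothetical `assemble` helper, which returns a list rather than a Spark vector. It rejects repeated column names, because a column listed twice would otherwise appear twice in every vector.

```python
from typing import Dict, List, Sequence


def assemble(row: Dict[str, float], input_cols: Sequence[str]) -> List[float]:
    """Concatenate the named numeric fields of a row, in input_cols order."""
    if len(set(input_cols)) != len(input_cols):
        raise ValueError("input_cols contains a duplicate column name")
    return [float(row[column]) for column in input_cols]


# Values from the first row of the indexed output above
row = {"DepDelay": 14.0, "Distance": 569.0, "Carrier_index": 2.0,
       "DayOfMonth_index": 25.0, "DayOfWeek_index": 0.0}
input_cols = ["DepDelay", "Distance", "Carrier_index",
              "DayOfMonth_index", "DayOfWeek_index"]
print(assemble(row, input_cols))
```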

Training Our Model in an Experimental Setup


In [80]:
#
# Train the classifier and evaluate it on a held-out test split
#

# Test/train split (pass seed= to randomSplit for a reproducible split)
training_data, test_data = final_vectorized_features.randomSplit([0.7, 0.3])

# Instantiate and fit random forest classifier
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(
  featuresCol="Features_vec",
  labelCol="ArrDelayBucket",
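  # maxBins must be at least the number of distinct values in every
  # categorical (indexed) feature; here the largest is Route_index
  maxBins=4657,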
  maxMemoryInMB=1024
)
model = rfc.fit(training_data)

# Evaluate model using test data
predictions = model.transform(test_data)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="ArrDelayBucket", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = {}".format(accuracy))

# Check a small, seeded random sample of the predictions
predictions.sample(withReplacement=False, fraction=0.001, seed=18).orderBy("CRSDepTime").show(6)


Accuracy = 0.5967731197652838
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+--------------------+--------------------+--------------------+----------+
|ArrDelay|         CRSArrTime|         CRSDepTime|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|  Route|ArrDelayBucket|        Features_vec|       rawPrediction|         probability|prediction|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+--------------------+--------------------+--------------------+----------+
|   -10.0|2015-01-02 08:35:00|2015-01-02 07:30:00|     WN|         2|        5|        2|    -4.0| LAX|   236.0|2015-01-01|     2169|   LAS|LAS-LAX|           1.0|[-4.0,236.0,0.0,5...|[4.69368614250692...|[0.23468430712534...|       1.0|
|   200.0|2015-01-02 14:40:00|2015-01-02 11:30:00|     B6|         2|        5|        2|   201.0| MCO|  1091.0|2015-01-01|     1887|   ORH|ORH-MCO|           3.0|[201.0,1091.0,7.0...|[0.38463403281393...|[0.01923170164069...|       3.0|
|   -13.0|2015-01-02 15:34:00|2015-01-02 13:57:00|     EV|         2|        5|        2|    -6.0| IAD|   451.0|2015-01-01|     4757|   SDF|SDF-IAD|           1.0|[-6.0,451.0,4.0,5...|[3.99949769680097...|[0.19997488484004...|       1.0|
|    91.0|2015-01-03 00:36:00|2015-01-02 18:58:00|     OO|         2|        5|        2|    87.0| MSP|  1535.0|2015-01-01|     5520|   LAX|LAX-MSP|           3.0|[87.0,1535.0,3.0,...|[0.24864914309886...|[0.01243245715494...|       3.0|
|    -4.0|2015-01-03 10:30:00|2015-01-03 07:31:00|     B6|         3|        6|        3|    -1.0| PBI|  1035.0|2015-01-02|      461|   LGA|LGA-PBI|           1.0|[-1.0,1035.0,7.0,...|[5.22285454733473...|[0.26114272736673...|       1.0|
|    43.0|2015-01-03 17:34:00|2015-01-03 14:29:00|     UA|         3|        6|        3|     2.0| FLL|  1065.0|2015-01-02|     1164|   EWR|EWR-FLL|           3.0|[2.0,1065.0,5.0,1...|[5.05268842661314...|[0.25263442133065...|       1.0|
+--------+-------------------+-------------------+-------+----------+---------+---------+--------+----+--------+----------+---------+------+-------+--------------+--------------------+--------------------+--------------------+----------+
only showing top 6 rows
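When `metricName="accuracy"` is set, MulticlassClassificationEvaluator reports the fraction of rows whose `prediction` equals the label. On its own, a figure like 0.597 is hard to judge. A common sanity check is to compare it against the accuracy of always predicting the most frequent bucket. The following plain-Python sketch computes both numbers. The helpers are hypothetical, and the labels and predictions are made-up toy values, not this notebook's results.

```python
from collections import Counter
from typing import Sequence


def accuracy(labels: Sequence[float], predictions: Sequence[float]) -> float:
    """Fraction of rows whose prediction equals the label."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


def majority_baseline(labels: Sequence[float]) -> float:
    """Accuracy of always predicting the most frequent label."""
    _, count = Counter(labels).most_common(1)[0]
    return count / len(labels)


# Made-up toy values, for illustration only
labels = [1.0, 3.0, 1.0, 2.0, 1.0, 3.0]
predictions = [1.0, 3.0, 1.0, 1.0, 1.0, 1.0]
print(accuracy(labels, predictions))  # 4 of 6 correct
print(majority_baseline(labels))      # 1.0 appears 3 times out of 6
```

If the model barely beats the majority baseline, the features are contributing little beyond the class distribution.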

