In [1]:
# Initialize PySpark
APP_NAME = "Debugging Prediction Problems"
# If there is no SparkSession, create the environment
try:
    sc and spark
except NameError as e:
    import findspark
    findspark.init()
    import pyspark
    import pyspark.sql

    sc = pyspark.SparkContext()
    spark = pyspark.sql.SparkSession(sc).builder.appName(APP_NAME).getOrCreate()

print("PySpark initiated...")
In [2]:
# Load the text file using the SparkContext
csv_lines = sc.textFile("../data/example.csv")
# Map the data to split the lines into a list
data = csv_lines.map(lambda line: line.split(","))
# Collect the dataset into local RAM
data.collect()
Out[2]:
In [3]:
# Turn the CSV lines into objects
def csv_to_record(line):
    parts = line.split(",")
    record = {
        "name": parts[0],
        "company": parts[1],
        "title": parts[2]
    }
    return record
# Apply the function to every record
records = csv_lines.map(csv_to_record)
# Inspect the first item in the dataset
records.first()
Out[3]:
In [5]:
# Group the records by the name of the person
grouped_records = records.groupBy(lambda x: x["name"])
# Show the first group
print(grouped_records.first())
# Count the groups
job_counts = grouped_records.map(
    lambda x: {
        "name": x[0],
        "job_count": len(x[1])
    }
)
job_counts.collect()
Out[5]:
In [6]:
# Compute a relation of words by line
words_by_line = csv_lines\
.map(lambda line: line.split(","))
words_by_line.collect()
Out[6]:
In [7]:
# Compute a relation of words
flattened_words = csv_lines\
.map(lambda line: line.split(","))\
.flatMap(lambda x: x)
flattened_words.collect()
Out[7]:
In [8]:
lengths = flattened_words.map(lambda x: len(x))
lengths.collect()
Out[8]:
In [9]:
lengths.sum() / lengths.count()
Out[9]:
In [11]:
from pyspark.sql import Row
# Convert the CSV into a pyspark.sql.Row
def csv_to_row(line):
    parts = line.split(",")
    row = Row(
        name=parts[0],
        company=parts[1],
        title=parts[2]
    )
    return row
# Apply the function to get rows in an RDD
rows = csv_lines.map(csv_to_row)
rows.first()
Out[11]:
In [14]:
records = csv_lines.map(lambda line: line.split(','))
records.collect()
groups = records.groupBy(lambda x: x[0])
counts = groups.map(lambda x: (x[0], len(x[1])))
new_rows = counts.map(lambda x: Row(name=x[0], total=x[1]))
new_rows.collect()
Out[14]:
In [15]:
new_rows.toDF().select("name","total").show()
In [16]:
# Convert to a pyspark.sql.DataFrame
rows_df = rows.toDF()
rows_df.show()
# Register the DataFrame for Spark SQL
rows_df.registerTempTable("executives")
# Generate a new DataFrame with SQL using the SparkSession
job_counts = spark.sql("""
SELECT
name,
COUNT(*) AS total
FROM executives
GROUP BY name
""")
job_counts.show()
# Go back to an RDD
job_counts.rdd.map(lambda x: x.asDict()).collect()
Out[16]:
In [17]:
my_rdd = sc.parallelize([1,2,3,4,5])
my_rdd.first()
Out[17]:
Now try the following exercise:

1. Create an RDD of dict elements with named fields using sc.parallelize. Make it at least 5 records long.
2. Convert the dicts into an RDD of pyspark.sql.Row elements.
3. Convert the pyspark.sql.Rows into a pyspark.sql.DataFrame.
4. Perform a GROUP BY/COUNT on your new DataFrame.
In [18]:
my_data = [
    {"name": "Russell Jurney", "interest": "Ancient Greece"},
    {"name": "Chris Jurney", "interest": "Virtual Reality"},
    {"name": "Bill Jurney", "interest": "Sports"},
    {"name": "Ruth Jurney", "interest": "Wildlife"},
    {"name": "Bob Smith", "interest": "Sports"}
]
my_rdd = sc.parallelize(my_data)
my_rows = my_rdd.map(lambda x: Row(name=x["name"], interest=x["interest"]))
my_df = my_rows.toDF()
my_df.show()
my_df.registerTempTable("people")
spark.sql("""SELECT interest, COUNT(*) as total FROM people GROUP BY interest""").show()
In [19]:
job_counts.rdd.map(lambda x: x.asDict()).collect()
Out[19]:
In [20]:
my_df.rdd.map(lambda x: x.asDict()).collect()
Out[20]:
In [22]:
# Load the parquet file containing flight delay records
on_time_dataframe = spark.read.parquet('../data/on_time_performance.parquet')
# Register the data for Spark SQL
on_time_dataframe.registerTempTable("on_time_performance")
# Check out the columns
on_time_dataframe.columns
Out[22]:
In [23]:
# Trim the fields and keep the result
trimmed_on_time = on_time_dataframe\
    .select(
        "FlightDate",
        "TailNum",
        "Origin",
        "Dest",
        "Carrier",
        "DepDelay",
        "ArrDelay"
    )
# Sample 0.01% of the data and show
trimmed_on_time.sample(False, 0.0001).show(10)
sampled_ten_percent = trimmed_on_time.sample(False, 0.1)
sampled_ten_percent.show(10)
Workflow: Calculating Speed in Dataflow and SQL

We can go back and forth between dataflow programming and SQL programming using pyspark.sql.DataFrames. This lets us get the best of both worlds from the two APIs. For example, if we want to group records and get a total count for each group, a SQL SELECT/GROUP BY/COUNT is the most direct way to do it. On the other hand, if we want to filter data, a dataflow API call like DataFrame.filter() is the cleanest way. This comes down to personal preference, and in time you will develop your own style of working.
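As a minimal sketch of that flexibility (reusing the on_time_performance table we registered above; the Dest value 'SFO' is just an illustrative choice), the same filter can be expressed either way:

# Dataflow style: a filter reads naturally as a method call
on_time_dataframe.filter(on_time_dataframe.Dest == 'SFO').count()

# SQL style: the same filter as a WHERE clause on the registered table
spark.sql("""
  SELECT COUNT(*) AS total
  FROM on_time_performance
  WHERE Dest = 'SFO'
""").show()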
If we look at the AirTime along with the Distance, we can get a good idea of how fast the airplanes were going. Pretty cool! Let's do this using dataflows first.

First let's select just the two columns of interest: AirTime and Distance. We can always go back and select more columns if we want to extend our analysis, but trimming unneeded fields optimizes performance right away.
In [24]:
fd = on_time_dataframe.select("AirTime", "Distance")
fd.show(6)
In [25]:
hourly_fd = fd.select((fd.AirTime / 60).alias('Hours'), "Distance")
hourly_fd.show(5)
In [26]:
miles_per_hour = hourly_fd.select(
(hourly_fd.Distance / hourly_fd.Hours).alias('Mph')
)
miles_per_hour.show(10)
In [27]:
fd.select(
"AirTime",
(fd.AirTime / 60).alias('Hours'),
"Distance"
).show()
In [28]:
fd = on_time_dataframe.select("AirTime", "Distance")
filled_fd = fd.filter(fd.AirTime.isNotNull())
hourly_fd = filled_fd.select(
"AirTime",
(filled_fd.AirTime / 60).alias('Hours'),
"Distance"
)
mph = hourly_fd.select((hourly_fd.Distance / hourly_fd.Hours).alias('Mph'))
mph.show(10)
In [29]:
from pyspark.sql.functions import avg
mph.select(
    avg(mph.Mph)
).show()
It looks like the average speed of the fleet is 408 mph. Note how along the way we checked the data for sanity, which gives us confidence in our answer. SQL, by contrast, can hide the internals of a query, and a problem hidden inside one might have skewed our average significantly!

Now let's work the same thing out in SQL, starting from the top:
In [30]:
on_time_dataframe.registerTempTable("on_time_performance")
mph = spark.sql("""
SELECT ( Distance / ( AirTime/60 ) ) AS Mph
FROM on_time_performance
WHERE AirTime IS NOT NULL
ORDER BY AirTime
""")
mph.show(10)
mph.registerTempTable("mph")
spark.sql("SELECT AVG(Mph) from mph").show()
In [31]:
# Compute a histogram of speeds in Mph
mph\
.select("Mph")\
.rdd\
.flatMap(lambda x: x)\
.histogram(10)
Out[31]:
The problem with the output above is that it is hard to interpret. For a better understanding we need a visualization. We can use matplotlib inline in a Jupyter notebook to visualize this distribution and see how airplane speeds are spread around the mean of 408 mph.
In [32]:
%matplotlib inline
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
# Function to plot a histogram using pyplot
def create_hist(rdd_histogram_data):
    """Given an RDD.histogram, plot a pyplot histogram"""
    heights = np.array(rdd_histogram_data[1])
    full_bins = rdd_histogram_data[0]
    mid_point_bins = full_bins[:-1]
    widths = [abs(i - j) for i, j in zip(full_bins[:-1], full_bins[1:])]
    bar = plt.bar(mid_point_bins, heights, width=widths, color='b')
    return bar
# Compute a histogram of speeds in Mph
departure_delay_histogram = mph\
.select("Mph")\
.rdd\
.flatMap(lambda x: x)\
.histogram(10)
create_hist(departure_delay_histogram)
Out[32]:
In [33]:
# Compute a histogram of speeds in Mph with 20 buckets
departure_delay_histogram = mph\
.select("Mph")\
.rdd\
.flatMap(lambda x: x)\
.histogram(20)
create_hist(departure_delay_histogram)
Out[33]:
In [34]:
# Dump the unneeded fields
tail_numbers = on_time_dataframe.rdd.map(lambda x: x.TailNum)
tail_numbers = tail_numbers.filter(lambda x: x != '' and x is not None)
# distinct() gets us unique tail numbers
unique_tail_numbers = tail_numbers.distinct()
# now we need a count() of unique tail numbers
airplane_count = unique_tail_numbers.count()
print("Total airplanes: {}".format(airplane_count))
Using the techniques we demonstrated above, calculate any 3 out of 4 of the following things using both the SQL and the dataflow methods for each one. That is, implement each calculation twice: once in SQL and once using dataflows. Try to use both the RDD and DataFrame APIs as you work. At least two of your results should be plotted as histograms using the create_hist method shown above.
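To make the double-implementation pattern concrete, here is a sketch of the shape to follow (not one of the exercise answers): the number of flights per carrier, computed first in SQL and then with the dataflow API.

# SQL implementation: GROUP BY/COUNT over the registered table
carrier_totals_sql = spark.sql("""
  SELECT Carrier, COUNT(*) AS total
  FROM on_time_performance
  GROUP BY Carrier
""")
carrier_totals_sql.show(5)

# Dataflow implementation of the same calculation
carrier_totals_df = on_time_dataframe.groupBy("Carrier").count()
carrier_totals_df.show(5)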
In [35]:
origin_hour_dist = on_time_dataframe.filter(
    on_time_dataframe.AirTime.isNotNull()
).select(
    "Origin",
    (on_time_dataframe.AirTime/60).alias("Hours"),
    "Distance"
)
mph_origins = origin_hour_dist.select(
    "Origin",
    (origin_hour_dist.Distance / origin_hour_dist.Hours).alias("Mph")
)
mph_origins.registerTempTable("mph_origins")
avg_speeds = mph_origins.groupBy("Origin").agg({"Mph": "avg"}).alias("Mph")
avg_speeds.show()
In [ ]:
on_time_dataframe.columns
DataFrame.groupBy

We can use Spark SQL to calculate things using DataFrames, but we can also group data and calculate directly, as we did with RDDs. For a full list of methods you can apply to grouped DataFrames, see the documentation for pyspark.sql.GroupedData. Below we demonstrate some of these methods.
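Beyond the dictionary-style agg() calls demonstrated in the next cell, several aggregates can also be combined into a single agg() call using column functions from pyspark.sql.functions. A minimal sketch, with the column choices being illustrative:

from pyspark.sql import functions as F

# Several aggregates over each origin airport in a single agg() call
on_time_dataframe.groupBy("Origin").agg(
    F.avg("AirTime").alias("AvgAirTime"),
    F.max("Distance").alias("MaxDistance"),
    F.count(F.lit(1)).alias("TotalFlights")
).show(3)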
In [36]:
# Calculate average of every numeric field
on_time_dataframe.groupBy("Origin").avg().show(1)
# Calculate average AirTime per origin city
on_time_dataframe.groupBy("Origin").agg({"AirTime": "mean"}).show(1)
# Get the count of flights from each origin
on_time_dataframe.groupBy("Origin").count().show(1)
# Get the maximum airtime for flights leaving each city
on_time_dataframe.groupby("Origin").agg({"AirTime": "max"}).show(1)
# Get the maximum of all numeric columns for flights leaving each city
on_time_dataframe.groupBy("Origin").max().show(1)
# Get the shortest flight for each origin airport
on_time_dataframe.groupBy("Origin").agg({"AirTime": "min"}).show(1)
# Total minutes flown from each airport
on_time_dataframe.groupBy("Origin").agg({"AirTime": "sum"}).show(1)
In [37]:
on_time_dataframe\
.filter("Origin == 'ATL'")\
.groupBy("Origin")\
.pivot("Dest")\
.avg("AirTime")\
.rdd\
.map(lambda x: x.asDict())\
.collect()[0]
Out[37]:
Another type of visualization that is of interest to data scientists is the scatterplot. A scatterplot lets us compare the trend of one value plotted against another. For example, we could look at the relationship between the Distance flown between Origin and Dest and the Mph speed figure we calculated earlier. Are longer flights generally faster, or not?

To prepare a scatterplot, we need to use matplotlib again, so we'll need to look at what its scatterplot API expects. The matplotlib.pyplot.scatter API takes two independent lists of values for the variables x and y, so we must compute them for Distance and Mph.
In [38]:
mph = spark.sql("""
SELECT
Distance,
( Distance / ( AirTime/60 ) ) AS Mph
FROM on_time_performance
WHERE AirTime IS NOT NULL
""")
mph.show(10)
In [39]:
distance = mph.select("Distance").rdd.flatMap(lambda x: x)
distance = distance.collect()
distance[0:10]
Out[39]:
In [40]:
speed = mph.select("Mph").rdd.flatMap(lambda x: x)
speed = speed.collect()
speed[0:10]
Out[40]:
In [41]:
print("Total distances: {:,}".format(len(distance)))
print("Total speeds: {:,}".format(len(speed)))
It is hard to plot 5.7 million dots on a scatterplot that will fit on a computer screen, so let's sample our data. We can use the PySpark DataFrame's sample method. Let's take a 0.1% random sample without replacement, which will leave us with 5,687 or so data points, something we can more easily manage.
In [42]:
sample = mph.sample(False, 0.001)
sample.count()
Out[42]:
Note that we need to sample once and then split the datasets out - otherwise the data for a single observation will be scrambled across variables. We don't want that! All our scatterplots would show no relationships at all.
In [43]:
speed = sample.select("Mph").rdd.flatMap(lambda x: x).collect()
distance = sample.select("Distance").rdd.flatMap(lambda x: x).collect()
print("{:,} x {:,} records!".format(
len(speed),
len(distance)
))
In [44]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (18,12)
plt.scatter(
distance,
speed,
alpha=0.5
)
plt.title("Distance x Speed")
plt.xlabel("Distance")
plt.ylabel("Speed")
plt.show()
We can see pretty clearly that as distance increases, average speed across that distance rises rapidly and then levels off.

Note that we trimmed the on_time_dataframe down to focus on two numeric fields. It is often the case that once we characterize a distribution, we want to create a function to predict one variable given the other. Let's take this example further by fitting a polynomial regression to describe our data. We use a sklearn.pipeline.Pipeline to chain a sklearn.preprocessing.PolynomialFeatures to a sklearn.linear_model.LinearRegression. Other than that, we simply define x and y and fit a model to those values. Finally we compute a cross-validation score to see the model's performance. We'll see this pattern again when we use the large data tools in Spark MLlib.
In [45]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score
import numpy as np
x = np.array(distance)
y = np.array(speed)
x_test = np.arange(0, 5000, 100)
model = Pipeline([
('poly', PolynomialFeatures(degree=3)),
('linear', LinearRegression(fit_intercept=False))
])
model = model.fit(x[:, np.newaxis], y)
model.named_steps['linear'].coef_
y_out = model.predict(x_test.reshape(-1,1))
cross_val_score(model, x.reshape(-1,1), y)
Out[45]:
In [46]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (18,12)
plt.scatter(
distance,
speed,
alpha=0.5
)
plt.plot(
x_test,
y_out,
color='orange',
linewidth=3
)
plt.title("Distance x Speed")
plt.xlabel("Distance")
plt.ylabel("Speed")
plt.show()
Next we're going to learn how to join datasets using PySpark. We'll pick up an example we will work through in Chapter 6 and explore it more deeply. To begin with, we will prepare a list of TailNum values (tail numbers) from the FAA flight records. These uniquely identify the airplane flown on each flight.
In [47]:
tail_numbers = on_time_dataframe.select("TailNum").distinct()
tail_numbers.show(6)
In [48]:
faa_tail_number_inquiry = spark.read.json('../data/faa_tail_number_inquiry.jsonl')
airplane_records = faa_tail_number_inquiry.select(
faa_tail_number_inquiry.TailNum.alias("FAATailNum"),
"Model",
"Engine_Model"
)
airplane_records.show(6)
In [49]:
# INNER JOIN
print(
"FAA tail numbers: {:,}".format(
tail_numbers.count()
)
)
print(
"Airplane records: {:,}".format(
airplane_records.count()
)
)
inner_joined = tail_numbers.join(
airplane_records,
tail_numbers.TailNum == airplane_records.FAATailNum,
'inner'
)
print(
"Joined records: {:,}".format(
inner_joined.count()
)
)
In [50]:
inner_joined.show(6)
Note how convenient it is that we renamed one of the join keys to FAATailNum. If we hadn't, we'd now have two columns with the same name and would have trouble referring to one or the other.
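If we did end up with identically named columns, DataFrame aliases offer an escape hatch. This is a hypothetical sketch, not something the chapter relies on: it renames the FAA key back to TailNum just to create the collision, then disambiguates through the aliases (faa, inquiry, and collided are illustrative names).

# Hypothetical: recreate the name collision, then resolve it with aliases
faa = tail_numbers.alias("faa")
inquiry = airplane_records.withColumnRenamed("FAATailNum", "TailNum").alias("inquiry")

collided = faa.join(
    inquiry,
    faa.TailNum == inquiry.TailNum,
    'inner'
)
# Qualify the ambiguous column through its DataFrame alias
collided.select("faa.TailNum", "inquiry.Model").show(6)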
Another type of join is the left outer join. It ensures that a record will remain in the output for every record on the left side of the join, no matter what. If a match on the join keys is found, the fields from the record on the right will be filled in. If a match is not found, they will be empty.

Let's look at how this works with our two datasets.
In [51]:
# LEFT OUTER JOIN
print(
"FAA tail numbers: {:,}".format(
tail_numbers.count()
)
)
print(
"Airplane records: {:,}".format(
airplane_records.count()
)
)
left_outer_joined = tail_numbers.join(
airplane_records,
tail_numbers.TailNum == airplane_records.FAATailNum,
'left_outer'
)
print(
"Joined records: {:,}".format(
left_outer_joined.count()
)
)
In [52]:
left_outer_joined.show(6)
Note that some records have fields filled out, and some don't.
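One handy follow-up (a sketch using the join we just built; the unmatched name is just for illustration) is to isolate the left-side records that found no match by filtering where the right-hand key is null:

# Tail numbers from the on-time data with no matching FAA airplane record
unmatched = left_outer_joined.filter(left_outer_joined.FAATailNum.isNull())
print("Unmatched tail numbers: {:,}".format(unmatched.count()))
unmatched.show(6)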
Another type of join is the right outer join. This works the opposite of a left outer join. In this case, the output will preserve a record for each and every record on the right side of the join. Use the right_outer
join type to perform this kind of join.
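A minimal sketch of what that looks like with our two datasets, mirroring the left outer join cell above:

# RIGHT OUTER JOIN: every FAA airplane record survives, matched or not
right_outer_joined = tail_numbers.join(
    airplane_records,
    tail_numbers.TailNum == airplane_records.FAATailNum,
    'right_outer'
)
print(
    "Joined records: {:,}".format(
        right_outer_joined.count()
    )
)
right_outer_joined.show(6)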
Is the distinct() call on the FAA on-time performance records still needed? Why or why not?
In [53]:
# Filter down to the fields we need to identify and link to a flight
flights = on_time_dataframe.rdd.map(
    lambda x: (x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum)
)
# Group flights by tail number, sorted by date, then flight number, then origin/dest
flights_per_airplane = flights\
    .map(lambda nameTuple: (nameTuple[5], [nameTuple[0:5]]))\
    .reduceByKey(lambda a, b: a + b)\
    .map(lambda tuple:
        {
            'TailNum': tuple[0],
            'Flights': sorted(tuple[1], key=lambda x: (x[1], x[2], x[3], x[4]))
        }
    )
flights_per_airplane.first()
Out[53]:
In [56]:
total_flights = on_time_dataframe.count()
# Flights that were late leaving...
late_departures = on_time_dataframe.filter(
on_time_dataframe.DepDelayMinutes > 0
)
total_late_departures = late_departures.count()
print(f'{total_late_departures:,}')
# Flights that were late arriving...
late_arrivals = on_time_dataframe.filter(
on_time_dataframe.ArrDelayMinutes > 0
)
total_late_arrivals = late_arrivals.count()
print(f'{total_late_arrivals:,}')
# Get the percentage of flights that are late, rounded to 1 decimal place
pct_late = round((total_late_arrivals / (total_flights * 1.0)) * 100, 1)
pct_late
Out[56]:
In [60]:
# Flights that left late but made up time to arrive on time...
on_time_heros = on_time_dataframe.filter(
(on_time_dataframe.DepDelayMinutes > 0)
&
(on_time_dataframe.ArrDelayMinutes <= 0)
)
total_on_time_heros = on_time_heros.count()
print(f'{total_on_time_heros:,}')
In [61]:
print("Total flights: {:,}".format(total_flights))
print("Late departures: {:,}".format(total_late_departures))
print("Late arrivals: {:,}".format(total_late_arrivals))
print("Recoveries: {:,}".format(total_on_time_heros))
print("Percentage Late: {}%".format(pct_late))
In [62]:
# Get the average minutes late departing and arriving
spark.sql("""
SELECT
ROUND(AVG(DepDelay),1) AS AvgDepDelay,
ROUND(AVG(ArrDelay),1) AS AvgArrDelay
FROM on_time_performance
"""
).show()
In [63]:
# Why are flights late? Let's look at some delayed flights and the delay causes
late_flights = spark.sql("""
SELECT
ArrDelayMinutes,
WeatherDelay,
CarrierDelay,
NASDelay,
SecurityDelay,
LateAircraftDelay
FROM
on_time_performance
WHERE
WeatherDelay IS NOT NULL
OR
CarrierDelay IS NOT NULL
OR
NASDelay IS NOT NULL
OR
SecurityDelay IS NOT NULL
OR
LateAircraftDelay IS NOT NULL
ORDER BY
FlightDate
""")
late_flights.sample(False, 0.01).show()
In [64]:
# Calculate the percentage contribution to delay for each source
total_delays = spark.sql("""
SELECT
ROUND(SUM(WeatherDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_weather_delay,
ROUND(SUM(CarrierDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_carrier_delay,
ROUND(SUM(NASDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_nas_delay,
ROUND(SUM(SecurityDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_security_delay,
ROUND(SUM(LateAircraftDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_late_aircraft_delay
FROM on_time_performance
""")
total_delays.show()
In [67]:
# Eyeball the first to define our buckets
weather_delay_histogram = on_time_dataframe\
.select("WeatherDelay")\
.rdd\
.flatMap(lambda x: x)\
.histogram([1, 5, 10, 15, 30, 60, 120, 240, 480, 720, 24*60.0])
print(weather_delay_histogram)
In [69]:
# See above for definition
create_hist(weather_delay_histogram)
Out[69]:
In [73]:
# Transform the data into something easily consumed by d3
def histogram_to_publishable(histogram):
    record = {'key': 1, 'data': []}
    for label, value in zip(histogram[0], histogram[1]):
        record['data'].append(
            {
                'label': label,
                'value': value
            }
        )
    return record
# Recompute the weather delay histogram, filtering to flights that actually had a weather delay
weather_delay_histogram = on_time_dataframe\
.filter(
(on_time_dataframe.WeatherDelay.isNotNull())
&
(on_time_dataframe.WeatherDelay > 0)
)\
.select("WeatherDelay")\
.rdd\
.flatMap(lambda x: x)\
.histogram([0, 15, 30, 60, 120, 240, 480, 720, 24*60.0])
print(weather_delay_histogram)
record = histogram_to_publishable(weather_delay_histogram)
record
Out[73]:
In [74]:
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DateType, TimestampType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.functions import udf
schema = StructType([
    StructField("ArrDelay", DoubleType(), True),       # "ArrDelay":5.0
    StructField("CRSArrTime", TimestampType(), True),  # "CRSArrTime":"2015-12-31T03:20:00.000-08:00"
    StructField("CRSDepTime", TimestampType(), True),  # "CRSDepTime":"2015-12-31T03:05:00.000-08:00"
    StructField("Carrier", StringType(), True),        # "Carrier":"WN"
    StructField("DayOfMonth", IntegerType(), True),    # "DayOfMonth":31
    StructField("DayOfWeek", IntegerType(), True),     # "DayOfWeek":4
    StructField("DayOfYear", IntegerType(), True),     # "DayOfYear":365
    StructField("DepDelay", DoubleType(), True),       # "DepDelay":14.0
    StructField("Dest", StringType(), True),           # "Dest":"SAN"
    StructField("Distance", DoubleType(), True),       # "Distance":368.0
    StructField("FlightDate", DateType(), True),       # "FlightDate":"2015-12-30T16:00:00.000-08:00"
    StructField("FlightNum", StringType(), True),      # "FlightNum":"6109"
    StructField("Origin", StringType(), True),         # "Origin":"TUS"
])
features = spark.read.json(
"../data/simple_flight_delay_features.jsonl.bz2",
schema=schema
)
features.first()
Out[74]:
In [75]:
#
# Check for nulls in features before using Spark ML
#
null_counts = [(column, features.where(features[column].isNull()).count()) for column in features.columns]
cols_with_nulls = filter(lambda x: x[1] > 0, null_counts)
print(list(cols_with_nulls))
In [76]:
#
# Add a Route variable to replace FlightNum
#
from pyspark.sql.functions import lit, concat
features_with_route = features.withColumn(
    'Route',
    concat(
        features.Origin,
        lit('-'),
        features.Dest
    )
)
features_with_route.select("Origin", "Dest", "Route").show(5)
In [77]:
#
# Use pyspark.ml.feature.Bucketizer to bucketize ArrDelay
#
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]
bucketizer = Bucketizer(
splits=splits,
inputCol="ArrDelay",
outputCol="ArrDelayBucket"
)
ml_bucketized_features = bucketizer.transform(features_with_route)
# Check the buckets out
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show()
In [78]:
#
# Extract features with tools in pyspark.ml.feature
#
from pyspark.ml.feature import StringIndexer, VectorAssembler
# Turn category fields into categorical feature indexes; the intermediate index fields are dropped later
for column in ["Carrier", "DayOfMonth", "DayOfWeek", "DayOfYear",
               "Origin", "Dest", "Route"]:
    string_indexer = StringIndexer(
        inputCol=column,
        outputCol=column + "_index"
    )
    ml_bucketized_features = string_indexer.fit(ml_bucketized_features)\
        .transform(ml_bucketized_features)
# Check out the indexes
ml_bucketized_features.show(6)
In [79]:
# Combine the continuous numeric fields and the category indexes into one feature vector
numeric_columns = ["DepDelay", "Distance"]
index_columns = ["Carrier_index", "DayOfMonth_index",
                 "DayOfWeek_index", "DayOfYear_index",
                 "Origin_index", "Dest_index", "Route_index"]
vector_assembler = VectorAssembler(
    inputCols=numeric_columns + index_columns,
    outputCol="Features_vec"
)
final_vectorized_features = vector_assembler.transform(ml_bucketized_features)

# Drop the index columns
for column in index_columns:
    final_vectorized_features = final_vectorized_features.drop(column)
# Check out the features
final_vectorized_features.show()
In [80]:
#
# Split the data, then train and evaluate a classifier
#
# Test/train split
training_data, test_data = final_vectorized_features.randomSplit([0.7, 0.3])
# Instantiate and fit random forest classifier
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(
featuresCol="Features_vec",
labelCol="ArrDelayBucket",
maxBins=4657,
maxMemoryInMB=1024
)
model = rfc.fit(training_data)
# Evaluate model using test data
predictions = model.transform(test_data)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="ArrDelayBucket", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = {}".format(accuracy))
# Check a sample
predictions.sample(False, 0.001, 18).orderBy("CRSDepTime").show(6)
In [ ]: