In [ ]:
# Load the pre-split flight data from S3.
# `test` must be the held-out split, not training.parquet. Otherwise the
# crosstab at the end of the notebook only measures fit on the training data.
# NOTE(review): confirm the held-out file is named test.parquet in the bucket.
training = sqlContext.read.parquet("s3://zoltanctoth-flights/training.parquet")
test = sqlContext.read.parquet("s3://zoltanctoth-flights/test.parquet")
test.printSchema()
In [ ]:
# Show a single row of the test set; head() with no argument returns the first Row.
test.head()
In [ ]:
# Mark both DataFrames for in-memory caching, since later cells reuse them.
# cache() returns the same DataFrame, so rebinding leaves the names unchanged.
training, test = training.cache(), test.cache()
Generate label column for the training data
In [ ]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf


def _is_late_label(delay):
    """Return 1.0 if the arrival delay is positive, otherwise 0.0.

    A null (None) delay is labelled 0.0 explicitly. The bare comparison
    ``None > 0`` raises TypeError on Python 3 and would kill the Spark job.
    """
    # NOTE(review): null-delay rows end up labelled "not late"; consider
    # filtering them out if they represent flights with no arrival.
    return 1.0 if delay is not None and delay > 0 else 0.0


# Spark UDF producing the binary label. It is kept under the name `is_late`
# because the evaluation cell applies it to the predictions as well.
is_late = udf(_is_late_label, DoubleType())
training = training.withColumn("is_late", is_late(training.arrdelay))
Create and fit Spark ML model
In [ ]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Every column except arrdelay and its derivative is_late becomes a feature.
# Including either would hand the label straight to the model.
excluded = {"arrdelay", "is_late"}
feature_cols = [name for name in training.columns if name not in excluded]
feature_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Logistic regression trained on the binary is_late label.
reg = LogisticRegression(maxIter=100, labelCol="is_late", predictionCol="prediction")

model = Pipeline(stages=[feature_assembler, reg]).fit(training)
Predict whether each test-set flight will arrive late
In [ ]:
# Run the fitted pipeline (assembler + classifier) over the test set.
predicted = model.transform(dataset=test)
In [ ]:
# Return a one-element list containing the first prediction row.
predicted.head(1)
Check model performance (crosstab of actual vs. predicted lateness)
In [ ]:
# Attach the ground-truth label to the test predictions, then tabulate
# actual (is_late) against predicted (prediction) counts.
predicted = predicted.withColumn("is_late", is_late(predicted["arrdelay"]))
confusion = predicted.crosstab("is_late", "prediction")
confusion.show()
In [ ]: