See the Kaggle [San Francisco Crime Classification](https://www.kaggle.com/c/sf-crime) competition page for details.
Data profiling is contained in a separate notebook ("SanFranCrime.ipynb").
In [1]:
# Display the SparkContext; `sc` is not created in this file (presumably
# injected by the PySpark notebook kernel — confirm).
sc
Out[1]:
In [7]:
# Set Spark's log level to INFO for this SparkContext.
sc.setLogLevel('INFO')
In [6]:
# Load the Kaggle training set from a local Parquet file and show its size
# and schema. `sqlContext` is not created in this file (presumably provided
# by the notebook kernel).
parqFileName = '/Users/bill.walrond/Documents/dsprj/data/SanFranCrime/train.pqt'
sfc_train = sqlContext.read.parquet(parqFileName)
print(sfc_train.count())
# printSchema() writes the schema itself and returns None; wrapping it in
# print would emit a stray "None".
sfc_train.printSchema()
In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import numpy as np
From the profiling results, the most frequent crime category by far is "LARCENY/THEFT". We set our baseline prediction to assume every crime is LARCENY/THEFT, regardless of its actual category or any of the other attributes, and then evaluate how accurate these baseline predictions are. Because StringIndexer assigns index 0 to the most frequent label, the baseline simply predicts index 0.0 for every row. Later, we will compare how much better or worse the machine learning methods perform relative to this baseline.
For now, we use precision and recall as our evaluation metrics. Later, we may consider additional evaluation metrics (e.g. AUC, or the multi-class log loss described below).
In [8]:
# Baseline: predict the single most frequent Category for every incident.
# StringIndexer orders labels by descending frequency, so index 0.0 is the
# most frequent Category (LARCENY/THEFT according to the profiling notebook).
# Fit on the whole dataset so every label receives an index.
labelIndexer = StringIndexer(inputCol="Category", outputCol="indexedLabel").fit(sfc_train)
sfc_train_t = labelIndexer.transform(sfc_train)
# The evaluator reads the default 'label' and 'prediction' columns.
baseline_preds = sfc_train_t.selectExpr('indexedLabel as label', 'double(0) as prediction')
baseline_preds = baseline_preds.cache()
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
# NOTE(review): in Spark 1.x the 'precision' and 'recall' metric names are
# computed over all predictions (micro-averaged), so both equal overall
# accuracy; 'weightedPrecision'/'weightedRecall' give class-weighted values.
# Confirm against the Spark version in use.
# Without an override the evaluator uses its default metric (F1).
print('F1: {:08.6f}'.format(evaluator.evaluate(baseline_preds)))
print('Precision: {:08.6f}'.format(evaluator.evaluate(baseline_preds, {evaluator.metricName: 'precision'})))
print('Recall: {:08.6f}'.format(evaluator.evaluate(baseline_preds, {evaluator.metricName: 'recall'})))
The competition's evaluation metric is the multi-class version of logarithmic loss. Each observation belongs to exactly one class, and for each observation you submit a predicted probability for each class. The metric is the negative log-likelihood of the model that says each test observation is chosen independently from a distribution that places the submitted probability mass on the corresponding class, averaged over all observations.
$$\text{log loss} = -\frac{1}{N}\sum_{i=1}^{N}\sum_{j=1}^{M} y_{i,j}\log(p_{i,j})$$ where $N$ is the number of observations, $M$ is the number of class labels, $\log$ is the natural logarithm, $y_{i,j}$ is 1 if observation $i$ is in class $j$ and 0 otherwise, and $p_{i,j}$ is the predicted probability that observation $i$ is in class $j$.
In [ ]:
def computeLogLoss(obs, classes, preds, eps=1e-15):
    """Return the multi-class logarithmic loss of ``preds`` against ``obs``.

    Implements  logloss = -1/N * sum_i sum_j y_ij * log(p_ij).  Because y_ij
    is 1 only for the observation's actual class, the inner sum reduces to
    the log of the probability predicted for that class.

    Args:
        obs: sequence of N actual class labels, one per observation.
        classes: sequence of the M class labels; fixes the column order of
            every row in ``preds``.
        preds: sequence of N rows, each holding M predicted probabilities
            aligned with ``classes``.
        eps: each probability used is clipped to [eps, 1 - eps] so that a
            predicted probability of 0 yields a large finite loss instead of
            a math domain error.

    Returns:
        The loss as a float (lower is better).

    Raises:
        ValueError: if ``obs`` is empty, ``obs`` and ``preds`` differ in
            length, a row of ``preds`` does not hold one probability per
            class, or an observed label is not in ``classes``.
    """
    import math

    obs = list(obs)
    preds = list(preds)
    classes = list(classes)
    if not obs:
        raise ValueError('obs must contain at least one observation')
    if len(obs) != len(preds):
        raise ValueError('obs and preds must have the same length '
                         '(%d != %d)' % (len(obs), len(preds)))
    # Map each class label to its column in the prediction rows.
    column = dict((label, j) for j, label in enumerate(classes))
    total = 0.0
    for i, (label, row) in enumerate(zip(obs, preds)):
        if len(row) != len(classes):
            raise ValueError('preds row %d has %d probabilities, expected %d'
                             % (i, len(row), len(classes)))
        if label not in column:
            raise ValueError('observation %d has unknown class label %r'
                             % (i, label))
        # Only the actual class contributes (y_ij == 1); clip to avoid log(0).
        p = min(max(row[column[label]], eps), 1.0 - eps)
        total += math.log(p)
    return -total / len(obs)
In [9]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# One-hot encode each categorical column: StringIndexer maps the strings to
# numeric indices (<col>Index), then OneHotEncoder expands each index into a
# vector column (<col>Vec). dropLast=False keeps a slot for every category.
# NOTE(review): Descript and Resolution appear to exist only in Kaggle's
# train.csv, and Descript largely determines Category; using them as features
# is likely target leakage. Confirm and consider dropping them.
cols = ['Descript', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address']
for col in cols:
    stringIndexer = StringIndexer(inputCol=col, outputCol=col + 'Index')
    model = stringIndexer.fit(sfc_train)
    sfc_train = model.transform(sfc_train)
    encoder = OneHotEncoder(dropLast=False, inputCol=col + 'Index', outputCol=col + 'Vec')
    sfc_train = encoder.transform(sfc_train)
print(sfc_train.count())
# printSchema() prints directly and returns None, so it is not wrapped in print.
sfc_train.printSchema()
In [10]:
# Spot-check the Address encoding: the raw string, its StringIndexer index
# and its one-hot vector, for the first 10 rows without truncation.
sfc_train.select('Address','AddressIndex','AddressVec').show(10,truncate=False)
Since we know from the data profiling results that the data spans over 12 years, let's start by converting the Dates column to an ordinal (an integer count of days, where January 1 of year 1 is day 1) and including it in the VectorAssembler. After that we'll try deriving features from the datetime value such as year, month, day of month, hour of day, and season. DayOfWeek is already provided separately in the dataset.
In [11]:
import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import *
# Convert each Dates value to its ordinal day number via toordinal()
# (January 1 of year 1 is day 1). Assumes Dates is a Date/Timestamp column
# delivered to Python as date/datetime objects — confirm. Null dates map to
# null instead of failing the job with an AttributeError on None.
udfDateToordinal = udf(lambda dt: dt.toordinal() if dt is not None else None, LongType())
sfc_train = sfc_train.withColumn('Dates_int', udfDateToordinal(sfc_train.Dates))
# show() prints the rows itself and returns None, so it is not wrapped in print.
sfc_train.select('Dates', 'Dates_int').show(3, truncate=False)
In [12]:
# Use the VectorAssembler to combine the converted Dates column with the ...
# Vectorized categorical column and also with the lat, long columns
# Use the VectorAssembler to combine the date ordinal, every one-hot encoded
# column produced earlier (named '<col>Vec') and the X/Y coordinate columns
# into a single 'features' vector column.
vector_cols = (['Dates_int']
               + [name for name in sfc_train.columns if name.endswith('Vec')]
               + ['X', 'Y'])
assembler = VectorAssembler(inputCols=vector_cols, outputCol="features")
sfc_train = assembler.transform(sfc_train)
sfc_train.select('Category', 'features').show(5, truncate=False)
In [15]:
# Trim down to just the columns we need (label and assembled features); this
# keeps the size of the working dataset more manageable.
sfc_train_trimmed = sfc_train.select('Category', 'features')
# Write the trimmed DataFrame to Parquet so a later cell can read it back in
# without re-running the feature engineering. It is not cached: it is only
# written once here and later re-read from disk, so a cached copy would just
# occupy memory.
preppedFileName = '/Users/bill.walrond/Documents/dsprj/data/SanFranCrime/prepped.pqt'
sfc_train_trimmed.write.parquet(preppedFileName, mode='overwrite')
In [20]:
# Null out our intermediate DataFrames/models and clear Spark's cache, then
# reload the prepped (Category, features) data.
# Local alternative: '/Users/bill.walrond/Documents/dsprj/data/SanFranCrime/prepped.pqt'
# NOTE(review): this S3 path differs from the local path the previous cell
# writes to; presumably the Parquet output was copied to S3 separately — confirm.
preppedFileName = 's3n://caserta-bucket1/lab/SanFranCrime/prepped.pqt/'
sfc_train = None
sfc_train_t = None
sfc_train_trimmed = None
predictions = None
model = None
encoder = None
baseline_preds = None
sqlContext.clearCache()
prepped = sqlContext.read.parquet(preppedFileName)
# Mark for caching before the first action so count() populates the cache,
# instead of scanning the source once for count() and again for training.
prepped = prepped.cache()
print(prepped.count())
# printSchema() prints directly and returns None, so it is not wrapped in print.
prepped.printSchema()
In [21]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Index labels, adding metadata to the label column. Fit on the whole
# dataset so every label appears in the index; the fitted StringIndexerModel
# is a transformer, so the pipeline does not re-fit it on the training split.
# If indexedLabel already exists, no indexer stage is added (a second
# indexer would fail because its output column already exists).
stages = []
if "indexedLabel" not in prepped.columns:
    labelIndexer = StringIndexer(inputCol="Category", outputCol="indexedLabel").fit(prepped)
    stages.append(labelIndexer)
# Split the data into training and test sets (30% held out for testing).
# A fixed seed makes the split, and thus the reported metrics, reproducible.
(trainingData, testData) = prepped.randomSplit([0.7, 0.3], seed=42)
# Train a random forest model.
rf = RandomForestClassifier(labelCol='indexedLabel', featuresCol='features',
                            numTrees=30,
                            maxDepth=25,
                            featureSubsetStrategy='auto')
stages.append(rf)
# Chain the (optional) label indexer and the RF in a Pipeline.
pipeline = Pipeline(stages=stages)
# Train model. This also runs the indexer, if present.
model = pipeline.fit(trainingData)
# Make predictions - returns a DataFrame. Cached because later cells run
# several actions (grouping, evaluation) over it.
predictions = model.transform(testData).cache()
# printSchema() prints directly and returns None, so it is not wrapped in print.
predictions.printSchema()
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
In [22]:
# Count how many test rows were assigned to each predicted class index, to
# see how the model's predictions are distributed across categories.
predictions = predictions.cache()
prediction_counts = predictions.groupBy('prediction').count()
prediction_counts.show()
In [23]:
# Evaluate the random forest with the same metrics as the baseline. Note the
# baseline was scored on the full training set, this on the held-out split.
eval_preds = predictions.select('prediction', 'indexedLabel')
eval_preds = eval_preds.cache()
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='indexedLabel')
# NOTE(review): in Spark 1.x the 'precision' and 'recall' metric names are
# computed over all predictions (micro-averaged), so both equal overall
# accuracy; 'weightedPrecision'/'weightedRecall' give class-weighted values.
# Confirm against the Spark version in use.
# Without an override the evaluator uses its default metric (F1).
print('F1: {:08.6f}'.format(evaluator.evaluate(eval_preds)))
print('Precision: {:08.6f}'.format(evaluator.evaluate(eval_preds, {evaluator.metricName: 'precision'})))
print('Recall: {:08.6f}'.format(evaluator.evaluate(eval_preds, {evaluator.metricName: 'recall'})))
In [10]:
# Null out every DataFrame and model built in this notebook, then drop all
# data Spark has cached, so the memory can be reclaimed.
sfc_train = None
sfc_train_t = None
sfc_train_trimmed = None
prepped = None
trainingData = None
testData = None
predictions = None
eval_preds = None
model = None
encoder = None
baseline_preds = None
sqlContext.clearCache()
In [7]:
# Shut down the SparkContext; it cannot be used for further Spark work after stop().
sc.stop()
In [ ]: