San Francisco Crime Modeling

Go here for the details on the Kaggle competition

Predictive Goal: "Given time and location, you must predict the category of crime that occurred."

Data profiling contained in a separate notebook ("SanFranCrime.ipynb")


In [1]:
sc


Out[1]:
<pyspark.context.SparkContext at 0x7fed1c020390>

In [7]:
sc.setLogLevel('INFO')

Load the dataset from the prepared Parquet file


In [6]:
parqFileName = '/Users/bill.walrond/Documents/dsprj/data/SanFranCrime/train.pqt'
sfc_train = sqlContext.read.parquet(parqFileName)
print sfc_train.count()
print sfc_train.printSchema()
# sfc_train = sfc_train.cache()


878049
root
 |-- Dates: timestamp (nullable = true)
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- X: float (nullable = true)
 |-- Y: float (nullable = true)

None

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import numpy as np

Step 1: Establish and evaluate a baseline

From the profiling results, the most frequent category of crime by far is "LARCENY/THEFT". We can set our baseline prediction to assume every crime is LARCENY/THEFT regardless of the actual category or any of the other attributes. Then, evaluate how accurate our baseline preditions are. Later, we will compare how much better/worse the machine learning methods are compared to this baseline.

For now, we're going to start with Precision-Recall for our evaluation framework. Later, we may consider additional evaluation metrics (e.g. AUC).


In [8]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="Category", outputCol="indexedLabel").fit(sfc_train)
sfc_train_t = labelIndexer.transform(sfc_train)
# sfc_train_t = sfc_train_t.cache()

# baseline_preds = sfc_train_t.selectExpr('indexedLabel as prediction', 'double(0) as label')
baseline_preds = sfc_train_t.selectExpr('indexedLabel as label', 'double(0) as prediction')
baseline_preds = baseline_preds.cache()

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction') 
evaluator.evaluate(baseline_preds) 
print 'Precision: {:08.6f}'.format(evaluator.evaluate(baseline_preds, {evaluator.metricName: 'precision'}))
print 'Recall:  {:08.6f}'.format(evaluator.evaluate(baseline_preds, {evaluator.metricName: 'recall'}))


Precision: 0.199192
Recall:  0.199192

Thus, our machine learning results must be better than guessing that every category is LARCENY/THEFT.

Todo: program the LogLoss evaluation metric

This is the multi-class version of the metric. Each observation is in one class and for each observation, you submit a predicted probability for each class. The metric is negative the log likelihood of the model that says each test observation is chosen independently from a distribution that places the submitted probability mass on the corresponding class, for each observation.

$$log loss = -\frac{1}{N}\sum_{i=1}^N\sum_{j=1}^My_{i,j}\log(p_{i,j})$$

where N is the number of observations, M is the number of class labels, $log$ is the natural logarithm, $y_{i,j}$ is 1 if observation $i$ is in class $j$ and 0 otherwise, and $p_{i,j}$ is the predicted probability that observation $i$ is in class $j$.


In [ ]:
def computeLogLoss(obs, classes, preds):
    
    sumM = 0.0
    
    for n in 1 to numberOfObs:   #  dataframe agg function
        for m in 1 to numberOfClassLabels:   # map function
            sumM += log(prob(n,m)) if actualLabel(n) == class(m) else 0.0
    
    logLoss = -(sumM/numberOfObs)

Step 2: Prepare the features

Encoding the categorical features ...


In [9]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

cols = ['Descript','DayOfWeek','PdDistrict','Resolution','Address']

for col in cols:
    stringIndexer = StringIndexer(inputCol=col, outputCol=col+'Index')
    model = stringIndexer.fit(sfc_train)
    sfc_train = model.transform(sfc_train)
    encoder = OneHotEncoder(dropLast=False, inputCol=col+'Index', outputCol=col+'Vec')
    sfc_train = encoder.transform(sfc_train)

print sfc_train.count()
print sfc_train.printSchema()


878049
root
 |-- Dates: timestamp (nullable = true)
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- X: float (nullable = true)
 |-- Y: float (nullable = true)
 |-- DescriptIndex: double (nullable = true)
 |-- DescriptVec: vector (nullable = true)
 |-- DayOfWeekIndex: double (nullable = true)
 |-- DayOfWeekVec: vector (nullable = true)
 |-- PdDistrictIndex: double (nullable = true)
 |-- PdDistrictVec: vector (nullable = true)
 |-- ResolutionIndex: double (nullable = true)
 |-- ResolutionVec: vector (nullable = true)
 |-- AddressIndex: double (nullable = true)
 |-- AddressVec: vector (nullable = true)

None

In [10]:
sfc_train.select('Address','AddressIndex','AddressVec').show(10,truncate=False)


+-----------------------------+------------+--------------------+
|Address                      |AddressIndex|AddressVec          |
+-----------------------------+------------+--------------------+
|VANNESS AV / GREENWICH ST    |7781.0      |(23228,[7781],[1.0])|
|JEFFERSON ST / LEAVENWORTH ST|6049.0      |(23228,[6049],[1.0])|
|MENDELL ST / HUDSON AV       |5846.0      |(23228,[5846],[1.0])|
|2000 Block of BUSH ST        |3197.0      |(23228,[3197],[1.0])|
|1600 Block of WEBSTER ST     |3081.0      |(23228,[3081],[1.0])|
|0 Block of STOCKTON ST       |72.0        |(23228,[72],[1.0])  |
|23RD ST / WISCONSIN ST       |4847.0      |(23228,[4847],[1.0])|
|GEARY BL / LAGUNA ST         |246.0       |(23228,[246],[1.0]) |
|400 Block of HYDE ST         |417.0       |(23228,[417],[1.0]) |
|STOCKTON ST / SUTTER ST      |441.0       |(23228,[441],[1.0]) |
+-----------------------------+------------+--------------------+
only showing top 10 rows

What do we do about the "Dates" datetime column ? ...

Since we know from the data profiling results that the time span of the data is over 12 years, let's start with converting the Dates column to an ordinal (an integer value representing the number of days since year 1 day 1) and including with the VectorAssembler. After that we'll try transforming the datetime value to year, month, day, day of month, hour of day, season, etc. DayOfWeek is already provided separately in the dataset.


In [11]:
import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import *

udfDateToordinal = udf(lambda dt: dt.toordinal(), LongType())
sfc_train = sfc_train.withColumn('Dates_int',udfDateToordinal(sfc_train.Dates))
print sfc_train.select('Dates','Dates_int').show(3, truncate=False)


+---------------------+---------+
|Dates                |Dates_int|
+---------------------+---------+
|2015-05-13 23:33:00.0|735731   |
|2015-05-13 22:58:00.0|735731   |
|2015-05-13 21:40:00.0|735731   |
+---------------------+---------+
only showing top 3 rows

None

Assembling the feature vector ...


In [12]:
# Use the VectorAssembler to combine the converted Dates column with the ...
# Vectorized categorical column and also with the lat, long columns
vector_cols = ['Dates_int'] + [name for name,type in sfc_train.dtypes if 'Vec' in name ] + ['X','Y']
assembler = VectorAssembler(inputCols=vector_cols, outputCol="features")
sfc_train = assembler.transform(sfc_train)
sfc_train.select('Category','features').show(5,truncate=False)


+--------------+-----------------------------------------------------------------------------------------------------------------+
|Category      |features                                                                                                         |
+--------------+-----------------------------------------------------------------------------------------------------------------+
|OTHER OFFENSES|(24144,[0,45,881,889,898,8695,24142,24143],[735731.0,1.0,1.0,1.0,1.0,1.0,-122.42436218261719,37.8004150390625])  |
|LARCENY/THEFT |(24144,[0,9,881,891,897,6963,24142,24143],[735731.0,1.0,1.0,1.0,1.0,1.0,-122.4190902709961,37.80780029296875])   |
|OTHER OFFENSES|(24144,[0,11,881,890,898,6760,24142,24143],[735731.0,1.0,1.0,1.0,1.0,1.0,-122.38639831542969,37.738983154296875])|
|LARCENY/THEFT |(24144,[0,1,881,889,897,4111,24142,24143],[735731.0,1.0,1.0,1.0,1.0,1.0,-122.43101501464844,37.78738784790039])  |
|VANDALISM     |(24144,[0,13,881,889,897,3995,24142,24143],[735731.0,1.0,1.0,1.0,1.0,1.0,-122.43131256103516,37.78586959838867]) |
+--------------+-----------------------------------------------------------------------------------------------------------------+
only showing top 5 rows


In [15]:
# trim down to just the columns we need then cache the dataframe, this will help to
# keep the size of thw working dataset more manageable
sfc_train_trimmed = sfc_train.select('Category','features')
sfc_train_trimmed = sfc_train_trimmed.cache()

# write the trimmed DF out to disk, then read it back in
preppedFileName = '/Users/bill.walrond/Documents/dsprj/data/SanFranCrime/prepped.pqt'
sfc_train_trimmed.write.parquet(preppedFileName, mode='overwrite')

In [20]:
# null out all our dataframes
# preppedFileName = '/Users/bill.walrond/Documents/dsprj/data/SanFranCrime/prepped.pqt'
preppedFileName = 's3n://caserta-bucket1/lab/SanFranCrime/prepped.pqt/'
sfc_train = None
predictions = None
model = None
encoder = None
baseline_preds = None
sqlContext.clearCache()

prepped = sqlContext.read.parquet(preppedFileName)
print prepped.count()
print prepped.printSchema()
prepped = prepped.cache()


878049
root
 |-- Category: string (nullable = true)
 |-- features: vector (nullable = true)

None

Step 3: Create train and tune sets and fit a model

ToDo: revise the splitting approach to be temporally aware


In [21]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
if "indexedLabel" not in prepped.columns:
    labelIndexer = StringIndexer(inputCol="Category", outputCol="indexedLabel").fit(prepped)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = prepped.randomSplit([0.7, 0.3])

# Train a GBT model.
rf = RandomForestClassifier(labelCol='indexedLabel', featuresCol='features', 
                            # numTrees=30, 
                            numTrees=30, 
                            maxDepth=25,
                            featureSubsetStrategy='auto')

# Chain indexers and RF in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, rf])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions - returns a DataFrame
predictions = model.transform(testData)
print predictions.printSchema()

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
predictions = predictions.cache()


root
 |-- Category: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- indexedLabel: double (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)

None
+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|        27.0|(24144,[0,179,880...|
|       0.0|        27.0|(24144,[0,179,880...|
|       0.0|        27.0|(24144,[0,179,880...|
|       0.0|        27.0|(24144,[0,179,880...|
|       0.0|        27.0|(24144,[0,179,880...|
+----------+------------+--------------------+
only showing top 5 rows


In [22]:
predictions = predictions.cache()
# predictions.select("prediction", "indexedLabel", "features").show(10)
predictions.select("prediction").groupBy('prediction').count().show()


+----------+------+
|prediction| count|
+----------+------+
|       7.0| 11440|
|      24.0|   437|
|       1.0| 40479|
|       9.0|  1614|
|      12.0|   972|
|      15.0|   263|
|       6.0|  6373|
|      20.0|    21|
|       3.0| 15129|
|      10.0|  4629|
|      13.0|    39|
|       5.0|  9493|
|      16.0|   270|
|      19.0|     1|
|       8.0|   523|
|      25.0|   490|
|      28.0|    24|
|      11.0|  1889|
|       0.0|137460|
|       4.0|  9964|
+----------+------+
only showing top 20 rows


In [23]:
eval_preds = predictions.select('prediction','indexedLabel')
eval_preds = eval_preds.cache()

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='indexedLabel') 
evaluator.evaluate(eval_preds) 
print 'Precision: {:08.6f}'.format(evaluator.evaluate(eval_preds, {evaluator.metricName: 'precision'}))
print 'Recall:  {:08.6f}'.format(evaluator.evaluate(eval_preds, {evaluator.metricName: 'recall'}))


Precision: 0.649101
Recall:  0.649101

In [10]:
# null out all our dataframes
sfc_train = None
predictions = None
model = None
encoder = None
baseline_preds = None
sqlContext.clearCache()

In [7]:
sc.stop()

In [ ]: