The City of San Francisco publishes historical crime events on its http://sfdata.gov website.
I have loaded this dataset into HIVE. Let's use Spark to do some fun stuff with it!
First we create a SparkContext:
In [1]:
# Set up Spark Context
from pyspark import SparkContext, SparkConf
SparkContext.setSystemProperty('spark.executor.memory', '4g')
conf = SparkConf()
conf.set('spark.sql.autoBroadcastJoinThreshold', 200*1024*1024) # 200MB for map-side joins
conf.set('spark.executor.instances', 12)
sc = SparkContext('yarn-client', 'Spark-demo', conf=conf)
And now we create a HiveContext to enable Spark to access data from HIVE:
In [2]:
# Setup HiveContext
from pyspark.sql import HiveContext, Row
hc = HiveContext(sc)
hc.sql("use demo")
hc.sql("DESCRIBE crimes").show()
Let's take a look at the first 5 rows of the dataset:
In [3]:
crimes = hc.table("crimes")
crimes.limit(5).toPandas()
Out[3]:
What are the different types of crime resolutions?
In [4]:
crimes.select("resolution").distinct().toPandas()
Out[4]:
Let's define a crime as 'resolved' if it has any string except "NONE" in the resolution column.
Question: How many crimes are there in the dataset in total, and how many of them were resolved?
In [5]:
total = crimes.count()
num_resolved = crimes.filter(crimes.resolution != 'NONE').count()
print str(total) + " crimes total, out of which " + str(num_resolved) + " were resolved"
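As a quick follow-up, the overall resolution rate (a useful baseline for the per-neighborhood numbers later on) follows directly from these two counts; a minimal sketch reusing the total and num_resolved variables above:
# Overall fraction of resolved crimes, computed from the counts above
print "overall resolution rate: %.1f%%" % (100.0 * num_resolved / total)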
Let's look at the longitude/latitude values in more detail. Spark provides the describe() function to compute some basic statistics for these columns:
In [6]:
c1 = crimes.select(crimes.longitude.cast("float").alias("long"),
                   crimes.latitude.cast("float").alias("lat"))
c1.describe().toPandas()
Out[6]:
Notice that the max values for longitude (-120.5) and latitude (90.0) seem strange. Those are not inside the SF area. Let's see how many bad values like this exist in the dataset:
In [7]:
c2 = c1.filter('lat < 37 or lat > 38')
print c2.count()
c2.head(3)
Out[7]:
It seems this is a data quality issue: some data points simply carry a fixed (bad) placeholder location of -120.5, 90.0. In the next cell I filter out these bad points, register the ESRI Spatial Framework UDFs in HIVE, and join each remaining crime with the sf_neighborhoods table, so that every crime event is tagged with the neighborhood it occurred in.
In [8]:
hc.sql("add jar /home/jupyter/notebooks/jars/guava-11.0.2.jar")
hc.sql("add jar /home/jupyter/notebooks/jars/spatial-sdk-json.jar")
hc.sql("add jar /home/jupyter/notebooks/jars/esri-geometry-api.jar")
hc.sql("add jar /home/jupyter/notebooks/jars/spatial-sdk-hive.jar")
hc.sql("create temporary function ST_Contains as 'com.esri.hadoop.hive.ST_Contains'")
hc.sql("create temporary function ST_Point as 'com.esri.hadoop.hive.ST_Point'")
cf = hc.sql("""
SELECT date_str, time, longitude, latitude, resolution, category, district, dayofweek, description
FROM crimes
WHERE longitude < -121.0 and latitude < 38.0
""").repartition(50)
cf.registerTempTable("cf")
crimes2 = hc.sql("""
SELECT date_str, time, dayofweek, category, district, description, longitude, latitude,
if (resolution == 'NONE',0.0,1.0) as resolved,
neighborho as neighborhood
FROM sf_neighborhoods JOIN cf
WHERE ST_Contains(sf_neighborhoods.shape, ST_Point(cf.longitude, cf.latitude))
""").cache()
crimes2.registerTempTable("crimes2")
In [9]:
crimes2.limit(5).toPandas()
Out[9]:
Question: What is the percentage of crimes resolved in each neighborhood?
In [10]:
ngrp = crimes2.groupBy('neighborhood')
# Join the average-resolved and count aggregates on the neighborhood key
ngrp_pd = ngrp.avg('resolved').toPandas().merge(ngrp.count().toPandas(), on='neighborhood')
ngrp_pd.columns = ['neighborhood', '% resolved', 'count']
data = ngrp_pd.sort(columns='% resolved', ascending=False)
print data.set_index('neighborhood').head(10)
And as a bar chart:
In [11]:
import matplotlib
matplotlib.style.use('ggplot')
data.iloc[:10].plot('neighborhood', '% resolved', kind='bar', legend=False, figsize=(12,5), fontsize=15)
Out[11]:
Using the Python Folium package, I draw an interactive map of San Francisco and color-code each neighborhood by the percentage of resolved crimes:
In [12]:
from IPython.display import HTML
import folium
map_width=1000
map_height=600
sf_lat = 37.77
sf_long = -122.4
def inline_map(m, width=map_width, height=map_height):
    m.create_map()
    srcdoc = m.HTML.replace('"', '&quot;')
    embed = HTML('<iframe srcdoc="{}" '
                 'style="width: {}px; height: {}px; '
                 'border: none"></iframe>'.format(srcdoc, width, height))
    return embed
map_sf = folium.Map(location=[sf_lat, sf_long], zoom_start=12, width=map_width, height=map_height)
map_sf.geo_json(geo_path='data/sfn.geojson', data=ngrp_pd,
                columns=['neighborhood', '% resolved'],
                key_on='feature.properties.neighborho',
                threshold_scale=[0, 0.3, 0.4, 0.5, 1.0],
                fill_color='OrRd', fill_opacity=0.6, line_opacity=0.6,
                legend_name='P(resolved)')
inline_map(map_sf)
Out[12]:
Now let's try to predict, for each crime, whether it will be resolved, based on its attributes. First, some basic feature engineering work:
In [13]:
import pandas as pd
crimes3 = hc.sql("""
SELECT cast(SUBSTR(date_str,7,4) as int) as year,
cast(SUBSTR(date_str,1,2) as int) as month,
cast(SUBSTR(time,1,2) as int) as hour,
category, district, dayofweek, description, neighborhood, longitude, latitude, resolved
FROM crimes2
""").cache()
crimes3.limit(5).toPandas()
Out[13]:
For this demo, I create a training set for my model from the data in years 2011-2013 and a testing/validation set from the data in year 2014.
In [14]:
trainData = crimes3.filter(crimes3.year>=2011).filter(crimes3.year<=2013).cache()
testData = crimes3.filter(crimes3.year==2014).cache()
print "training set has " + str(trainData.count()) + " instances"
print "test set has " + str(testData.count()) + " instances"
For convenience, I define a function to compute our classification metrics (precision, recall, and accuracy); we will use it later:
In [15]:
def eval_metrics(lap):
    tp = float(len(lap[(lap['label']==1) & (lap['prediction']==1)]))
    tn = float(len(lap[(lap['label']==0) & (lap['prediction']==0)]))
    fp = float(len(lap[(lap['label']==0) & (lap['prediction']==1)]))
    fn = float(len(lap[(lap['label']==1) & (lap['prediction']==0)]))
    precision = tp / (tp+fp)
    recall = tp / (tp+fn)
    accuracy = (tp+tn) / (tp+tn+fp+fn)
    return {'precision': precision, 'recall': recall, 'accuracy': accuracy}
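To illustrate the expected input format (a pandas DataFrame with 0/1 label and prediction columns), here is a toy example; the values are made up purely for demonstration:
# Toy input: 2 true positives, 1 true negative, 1 false positive, 1 false negative
import pandas as pd
toy = pd.DataFrame({'label':      [1, 1, 0, 0, 1],
                    'prediction': [1, 1, 0, 1, 0]})
print eval_metrics(toy)   # precision and recall of about 0.67, accuracy of 0.6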
Next, I use Spark ML to create a pipeline of transformations that generates the feature vector for each crime event:
In [16]:
from IPython.display import Image
Image(filename='pipeline.png')
Out[16]:
In [17]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, Tokenizer, HashingTF
from pyspark.ml import Pipeline
inx1 = StringIndexer(inputCol="category", outputCol="cat-inx")
inx2 = StringIndexer(inputCol="dayofweek", outputCol="dow-inx")
inx3 = StringIndexer(inputCol="district", outputCol="dis-inx")
inx4 = StringIndexer(inputCol="neighborhood", outputCol="ngh-inx")
inx5 = StringIndexer(inputCol="resolved", outputCol="label")
parser = Tokenizer(inputCol="description", outputCol="words")
hashingTF = HashingTF(numFeatures=50, inputCol="words", outputCol="hash-inx")
vecAssembler = VectorAssembler(inputCols=["month", "hour", "cat-inx", "dow-inx", "dis-inx", "ngh-inx", "hash-inx"],
                               outputCol="features")
Now I finish up the end-to-end pipeline and run the training set through it. The resulting model is stored in the model_lr variable and is used to predict on testData. I use the previously defined eval_metrics() function to compute precision, recall, and accuracy:
In [18]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=20, regParam=0.1, labelCol="label")
pipeline_lr = Pipeline(stages=[inx1, inx2, inx3, inx4, inx5, parser, hashingTF, vecAssembler, lr])
model_lr = pipeline_lr.fit(trainData)
results_lr = model_lr.transform(testData)
m = eval_metrics(results_lr.select("label", "prediction").toPandas())
print "precision = " + str(m['precision']) + ", recall = " + str(m['recall']) + ", accuracy = " + str(m['accuracy'])
Similarly, I create the same pipeline, this time with a Random Forest classifier:
In [19]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(numTrees=250, maxDepth=5, maxBins=50, seed=42)
pipeline_rf = Pipeline(stages=[inx1, inx2, inx3, inx4, inx5, parser, hashingTF, vecAssembler, rf])
model_rf = pipeline_rf.fit(trainData)
results_rf = model_rf.transform(testData)
m = eval_metrics(results_rf.select("label", "prediction").toPandas())
print "precision = " + str(m['precision']) + ", recall = " + str(m['recall']) + ", accuracy = " + str(m['accuracy'])
To place per-neighborhood results on a map, I first compute the centroid of each neighborhood using the ESRI ST_Centroid, ST_X and ST_Y functions:
In [20]:
hc.sql("add jar /home/jupyter/notebooks/jars/guava-11.0.2.jar")
hc.sql("add jar /home/jupyter/notebooks/jars/esri-geometry-api.jar")
hc.sql("add jar /home/jupyter/notebooks/jars/spatial-sdk-hive.jar")
hc.sql("add jar /home/jupyter/notebooks/jars/spatial-sdk-json.jar")
hc.sql("create temporary function ST_Centroid as 'com.esri.hadoop.hive.ST_Centroid'")
hc.sql("create temporary function ST_X as 'com.esri.hadoop.hive.ST_X'")
hc.sql("create temporary function ST_Y as 'com.esri.hadoop.hive.ST_Y'")
df_centroid = hc.sql("""
SELECT neighborho as neighborhood,
ST_X(ST_Centroid(sf_neighborhoods.shape)) as cent_longitude,
ST_Y(ST_Centroid(sf_neighborhoods.shape)) as cent_latitude
FROM sf_neighborhoods
""")
df_centroid.cache()
Out[20]:
Now I draw a map, this time showing a marker with the accuracy for each neighborhood, using the results from the Random Forest model.
In [21]:
df = results_rf.select("neighborhood", "label", "prediction").toPandas()
map_sf = folium.Map(location=[sf_lat, sf_long], zoom_start=12, width=map_width, height=map_height)
n_list = results_rf.select("neighborhood").distinct().toPandas()['neighborhood'].tolist() # list of neighborhoods
for n in df_centroid.collect():
    if n.neighborhood in n_list:
        m = eval_metrics(df[df['neighborhood']==n.neighborhood])
        map_sf.simple_marker([n.cent_latitude, n.cent_longitude],
                             popup=n.neighborhood + ": accuracy = %.2f" % m['accuracy'])
inline_map(map_sf)
Out[21]: