The City of San Francisco publishes historical crime events on its http://sfdata.gov website.
I have loaded this dataset into Hive. Let's use Spark to do some fun stuff with it!
First, we set up SparkR and create a SparkContext, as well as a HiveContext to access the data in Hive:
In [1]:
# Add the SparkR package shipped with Spark to the R library search path
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'), .libPaths()))
library(SparkR)
require(ggplot2)
In [2]:
# Create Spark Context
sc = sparkR.init(master="yarn-client", appName="SparkR Demo",
sparkEnvir = list(spark.executor.memory="4g", spark.executor.instances="15"))
# Create HiveContext
hc = sparkRHive.init(sc)
sql(hc, "use demo")
Out[2]:
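As a quick (optional) sanity check, we could list the tables available in the demo database before going further:
# Optional check: list the tables registered in the current Hive database
showDF(sql(hc, "show tables"))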
Let's take a look at the first 5 rows of the dataset:
In [3]:
crimes = table(hc, "crimes")
df = head(crimes, 5)
df
Out[3]:
What are the different types of crime resolutions?
In [4]:
showDF(distinct(select(crimes, "resolution")))
Let's define a crime as 'resolved' if it has any string except "NONE" in the resolution column.
Question: How many crimes are in the dataset in total, and how many of them were resolved?
In [5]:
total = count(crimes)
num_resolved = count(filter(crimes, crimes$resolution != 'NONE'))
print(paste0(total, " crimes total, out of which ", num_resolved, " were resolved"))
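As a side note, the same definition could be captured as a 0/1 indicator column on the DataFrame rather than a filter; this is essentially what the SQL later in the notebook does with if(...). A minimal sketch, assuming SparkR 1.5+ where ifelse() operates on Spark Columns (the crimes_flagged and resolved_flag names are just for illustration):
# Hypothetical sketch: add a 0/1 'resolved_flag' indicator column to the crimes DataFrame
# (assumes SparkR 1.5+, where ifelse() works on Spark Columns)
crimes_flagged <- withColumn(crimes, "resolved_flag",
                             ifelse(crimes$resolution == "NONE", 0.0, 1.0))
head(select(crimes_flagged, crimes_flagged$resolution, crimes_flagged$resolved_flag), 5)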
Let's look at the longitude/latitude values in more detail. Spark provides the describe() function to compute some basic statistics of these columns:
In [6]:
c1 = select(crimes, alias(cast(crimes$longitude, "float"), "long"),
alias(cast(crimes$latitude, "float"), "lat"))
showDF(describe(c1))
Notice that the max values for longitude (-120.5) and latitude (90.0) seem strange. Those are not inside the SF area. Let's see how many bad values like this exist in the dataset:
In [7]:
c2 = filter(c1, "lat < 37 or lat > 38")
print(count(c2))
showDF(limit(c2,3))
It seems this is a data quality issue, where some data points simply have a fixed (bad) value of -120.5, 90.
Next, I filter out these bad points and use the Esri spatial framework for Hadoop (the jars and the ST_Contains/ST_Point UDFs registered below) to join each crime's longitude/latitude against the sf_neighborhoods table, assigning each crime to the neighborhood that contains it:
In [8]:
sql(hc, "add jar /home/jupyter/notebooks/jars/guava-11.0.2.jar")
sql(hc, "add jar /home/jupyter/notebooks/jars/spatial-sdk-json.jar")
sql(hc, "add jar /home/jupyter/notebooks/jars/esri-geometry-api.jar")
sql(hc, "add jar /home/jupyter/notebooks/jars/spatial-sdk-hive.jar")
sql(hc, "create temporary function ST_Contains as 'com.esri.hadoop.hive.ST_Contains'")
sql(hc, "create temporary function ST_Point as 'com.esri.hadoop.hive.ST_Point'")
# Keep only rows with valid San Francisco coordinates
cf1 = sql(hc,
"SELECT date_str, time, longitude, latitude, resolution, category, district, dayofweek, description
FROM crimes
WHERE longitude < -121.0 and latitude < 38.0")
cf2 = repartition(cf1, 50)
registerTempTable(cf2, "cf")
# Spatial join: assign each crime to the neighborhood polygon that contains it
crimes2 = sql(hc,
"SELECT date_str, time, dayofweek, category, district, description, longitude, latitude,
if (resolution == 'NONE',0.0,1.0) as resolved,
neighborho as neighborhood
FROM sf_neighborhoods JOIN cf
WHERE ST_Contains(sf_neighborhoods.shape, ST_Point(cf.longitude, cf.latitude))")
# cache(crimes2)
registerTempTable(crimes2, "crimes2")
Out[8]:
In [9]:
showDF(limit(crimes2, 5))
Question: What is the percentage of crimes resolved in each neighborhood?
In [10]:
ngrp = groupBy(crimes2, "neighborhood")
df = summarize(ngrp, pres = avg(crimes2$resolved), count = n(crimes2$resolved))
df_sorted = arrange(df, desc(df$pres))
head(df_sorted, 10)
Out[10]:
In [11]:
ggplot(data=take(df_sorted, 10), aes(x=neighborhood, y=pres)) +
geom_bar(colour="black", stat="identity", fill="#DD8888") + guides(fill=FALSE) +
theme(axis.text.x = element_text(angle = 90, hjust = 1))
Out[11]:
In [12]:
# Collect the per-neighborhood summary to a local R data frame for mapping
dfl = collect(df)
Using the R leaflet package, I draw an interactive map of San Francisco and color-code each neighborhood by its fraction of resolved crimes:
In [13]:
require(leaflet)
require(htmlwidgets)
library(IRdisplay)
library(jsonlite)
sf_lat = 37.77
sf_long = -122.4
geojson <- readLines("data/sfn.geojson") %>% paste(collapse = "\n") %>% fromJSON(simplifyVector = FALSE)
pal <- colorQuantile("OrRd", dfl$pres, n=7)
geojson$features <- lapply(geojson$features, function(feat) {
feat$properties$style <- list(fillColor = pal(dfl[dfl$neighborhood==feat$properties$neighborho, "pres"]))
feat
})
m <- leaflet() %>%
setView(lng = sf_long, lat = sf_lat, zoom = 12) %>%
addTiles() %>%
addGeoJSON(geojson, weight = 1.5, color = "#444444", opacity = 1, fillOpacity = 0.3) %>%
addLegend("topright", pal = pal, values = dfl$pres, title = "P(resolved)", opacity = 1)
tf = 'map1.html'
saveWidget(m, file = tf, selfcontained = T)
display_html(paste0("<iframe src='", tf, "' width=950 height=600></iframe>"))
Now let's build a predictive model: given a crime's month, hour, category, district, day of week and neighborhood, can we predict whether it will be resolved? First, I extract year, month and hour from the date/time strings and split the data into a training set (2011-2013) and a test set (2014):
In [14]:
# Initial data frame: extract year/month/hour features, keeping only rows with valid coordinates
crimes = sql(hc, "SELECT cast(SUBSTR(date_str,7,4) as int) as year,
cast(SUBSTR(date_str,1,2) as int) as month,
cast(SUBSTR(time,1,2) as int) as hour,
resolved, category, district, dayofweek, description, neighborhood
FROM crimes2
WHERE latitude > 37 and latitude < 38")
trainData = filter(crimes, "year>=2011 and year<=2013")
cache(trainData)
testData = filter(crimes, "year=2014")
cache(testData)
print(paste0("training set has ", count(trainData), " instances"))
print(paste0("test set has ", count(testData), " instances"))
Out[14]:
I use SparkR's glm() to train a logistic regression model (binomial family) on the training set:
In [15]:
model <- glm(resolved ~ month + hour + category + district + dayofweek + neighborhood,
family = "binomial", data = trainData)
summary(model)
Out[15]:
In [16]:
# Predict results for test data
pmat <- predict(model, testData)
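Before computing metrics, we could peek at a few predictions; this sketch assumes pmat exposes the label and prediction columns used in the next cell:
# Peek at a few predictions (label = actual outcome, prediction = model output)
head(select(pmat, pmat$label, pmat$prediction), 5)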
In [62]:
# Compute the confusion matrix on the test set and derive precision, recall and accuracy
registerTempTable(pmat, "pmat")
cm = sql(hc, "SELECT prediction, label, COUNT(*) as cnt from pmat GROUP BY prediction, label")
cml = collect(cm)
tp = cml[cml$prediction==1.0 & cml$label==1.0, "cnt"]
tn = cml[cml$prediction==0.0 & cml$label==0.0, "cnt"]
fp = cml[cml$prediction==1.0 & cml$label==0.0, "cnt"]
fn = cml[cml$prediction==0.0 & cml$label==1.0, "cnt"]
precision = tp / (tp+fp)
recall = tp / (tp+fn)
accuracy = (tp+tn) / (tp+tn+fp+fn)
print(sprintf("precision = %0.2f, recall = %0.2f, accuracy = %0.2f", precision, recall, accuracy))
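As an aside, the same confusion matrix could be obtained in a single call with SparkR's crosstab() (available in SparkR 1.5+), which returns a local R data.frame:
# Alternative: cross-tabulate prediction vs. label directly on the Spark DataFrame
print(crosstab(pmat, "prediction", "label"))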