In [ ]:
import org.locationtech.geomesa.accumulo.data.AccumuloDataStoreParams._
import org.locationtech.geomesa.accumulo.data.{AccumuloDataStore, AccumuloDataStoreFactory}
import org.geotools.data.{DataStore, DataStoreFinder}
import org.apache.spark.{SparkConf, SparkContext}
import org.locationtech.geomesa.compute.spark.GeoMesaSpark
import scala.collection.JavaConversions._
import org.apache.spark.rdd.RDD
import org.opengis.feature.simple.SimpleFeature
import org.apache.hadoop.conf.Configuration
import org.geotools.data._
import org.opengis.filter._
import org.apache.spark.serializer.KryoRegistrator
import org.locationtech.geomesa.features.kryo.serialization.SimpleFeatureSerializer
import org.geotools.feature.collection.AbstractFeatureVisitor
import com.vividsolutions.jts.geom.{Geometry, Point}
import org.locationtech.geomesa.utils.geotools.{SchemaBuilder, SimpleFeatureTypes}
import org.locationtech.geomesa.features.ScalaSimpleFeatureFactory
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.compute.spark.GeoMesaSparkKryoRegistrator
import org.geotools.feature.simple.SimpleFeatureBuilder
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.geotools.filter.text.ecql.ECQL
In [ ]:
// Connection parameters shared by both GeoMesa/Accumulo data stores; only the catalog table differs.
// Credentials are read from the Spark conf (sc.getConf.get throws NoSuchElementException if a key is unset).
val baseDsParams = Map(
  zookeepersParam.key -> "zoo1,zoo2,zoo3",
  instanceIdParam.key -> "mycloud",
  userParam.key -> sc.getConf.get("spark.credentials.ds.username"),
  passwordParam.key -> sc.getConf.get("spark.credentials.ds.password"))

val gdeltDsParams = baseDsParams + (tableNameParam.key -> "gdelt")
val gdeltDs = DataStoreFinder.getDataStore(gdeltDsParams).asInstanceOf[AccumuloDataStore]
// DataStoreFinder returns null when no factory accepts the parameters; fail here with a clear message
// instead of an NPE in a later cell
require(gdeltDs != null, "Could not open a data store for table 'gdelt'; check the connection parameters")

val countriesDsParams = baseDsParams + (tableNameParam.key -> "countries")
val countriesDs = DataStoreFinder.getDataStore(countriesDsParams).asInstanceOf[AccumuloDataStore]
require(countriesDs != null, "Could not open a data store for table 'countries'; check the connection parameters")
In [ ]:
// Register both data stores with GeoMesa Spark, countries first and then gdelt.
// This presumably populates GeoMesaSparkKryoRegistrator.typeCache, which the next cell reads.
Seq(countriesDs, gdeltDs).foreach(ds => GeoMesaSpark.register(ds))
In [ ]:
// Snapshot every feature type known to the Kryo registrator as (typeName, encodedSpec) pairs and
// broadcast them, so executors can rebuild the same types from their spec strings.
val sfts = {
  val encodedTypes = GeoMesaSparkKryoRegistrator.typeCache.values.map { featureType =>
    featureType.getTypeName -> SimpleFeatureTypes.encodeType(featureType)
  }
  sc.broadcast(encodedTypes.toArray)
}
In [ ]:
// Load each table as an RDD of SimpleFeatures. Each Query names only a feature type (no filter),
// and each call gets its own fresh Hadoop Configuration.
val gdeltRdd: RDD[SimpleFeature] =
  GeoMesaSpark.rdd(new Configuration(), sc, gdeltDsParams, new Query("gdelt"))
val countriesRdd: RDD[SimpleFeature] =
  GeoMesaSpark.rdd(new Configuration(), sc, countriesDsParams, new Query("countries"))
In [ ]:
// Executor-side registration: every task rebuilds the broadcast feature types from their specs and
// registers them with the Kryo registrator. The partition contents themselves are not used.
// NOTE(review): this only reaches executors that actually run a gdeltRdd task.
gdeltRdd.foreachPartition { _ =>
  for ((typeName, spec) <- sfts.value) {
    GeoMesaSparkKryoRegistrator.putType(SimpleFeatureTypes.createType(typeName, spec))
  }
}
In [ ]:
// De-duplicate countries by FIPS code: when several features share a code, one of them is kept.
val keyed = countriesRdd.keyBy(country => country.getAttribute("FIPS"))
val reduced = keyed.reduceByKey((kept, _) => kept)
// Ship the de-duplicated country features to every executor for the containment lookup
val broadcastedCountries = sc.broadcast(reduced.values.collect())
println(broadcastedCountries.value.length)
Map GDELT data to the containing country
For each event, find the country whose geometry intersects the event's geometry, and key the event by that country.
In [ ]:
// Key every gdelt event by the name of the first broadcast country whose geometry intersects it.
// For point events this is containment; other geometries match on any intersection.
// Events without a geometry, or that fall in no country, are dropped.
// The key is the country's "NAME" attribute, the same attribute countryMap is keyed by, so the
// averaging step can look up the country polygon for each key.
val keyedData = gdeltRdd.mapPartitions { iter =>
  import org.locationtech.geomesa.utils.geotools.Conversions._
  val countries = broadcastedCountries.value
  iter.flatMap { sf =>
    Option(sf.geometry).flatMap { eventGeom =>
      countries
        // stop at the first match instead of scanning every country
        .find(cover => cover.geometry != null && cover.geometry.intersects(eventGeom))
        .map(cover => (cover.getAttribute("NAME").asInstanceOf[String], sf))
    }
  }
}
In [ ]:
// Find which gdelt attributes can be summed: (attributeIndex, bindingSimpleName) for every
// Integer/Long/Double attribute, skipping index 0 (the ID field).
val countableTypes = Seq("Integer", "Long", "Double")
val typeNames = gdeltRdd.first.getType.getTypes.toIndexedSeq.map(attrType => attrType.getBinding.getSimpleName)
val countableIndices = typeNames.zipWithIndex.collect {
  case (featureType, index) if index != 0 && countableTypes.contains(featureType) => (index, featureType)
}.toArray
countableIndices.foreach(println)
val countable = sc.broadcast(countableIndices)
In [ ]:
// Build the "countryInformation" schema: country name, country geometry, event count, then a
// (total_<attr>, avg_<attr>) pair for every countable gdelt attribute.
val schemaBuilder = SchemaBuilder.builder()
schemaBuilder.addString("country")
schemaBuilder.addMultiPolygon("geom")
schemaBuilder.addInt("count")

val featureProperties = gdeltRdd.first.getProperties.toSeq
for ((index, clazz) <- countableIndices) {
  val featureName = featureProperties(index).getName
  val totalName = s"total_$featureName"
  // The total keeps the source attribute's numeric binding
  clazz match {
    case "Integer" => schemaBuilder.addInt(totalName)
    case "Long" => schemaBuilder.addLong(totalName)
    case "Double" => schemaBuilder.addDouble(totalName)
  }
  // Averages are always stored as doubles
  schemaBuilder.addDouble(s"avg_$featureName")
}

val countryInfoSft = schemaBuilder.build("countryInformation")
In [ ]:
// Register the aggregate "countryInformation" feature type with GeoMesa Spark
GeoMesaSpark.register(List(countryInfoSft))
In [ ]:
// Re-snapshot the registrator's feature types (now including countryInformation) as
// (typeName, encodedSpec) pairs and broadcast them to the executors.
val newSfts = sc.broadcast(
  GeoMesaSparkKryoRegistrator.typeCache.values
    .map(featureType => (featureType.getTypeName, SimpleFeatureTypes.encodeType(featureType)))
    .toArray)
In [ ]:
// Second executor-side registration pass, so executors also know the countryInformation type.
// The partition contents are not used.
// NOTE(review): only executors that run a keyedData task are reached.
keyedData.foreachPartition { _ =>
  for ((typeName, spec) <- newSfts.value) {
    GeoMesaSparkKryoRegistrator.putType(SimpleFeatureTypes.createType(typeName, spec))
  }
}
Reduce features by country, computing the per-attribute sums and the total event count
In [ ]:
// Reduce each country's events into one "countryInformation" feature holding per-attribute totals
// and the number of events combined. reduceByKey may combine values in any order, so all three
// pairings are handled:
//   - aggregate + aggregate
//   - gdelt + gdelt (creates a new aggregate)
//   - aggregate + gdelt
// NOTE(review): a country with exactly one event never reaches this function, so its value stays
// a raw gdelt feature.
val aggregate = keyedData.reduceByKey { (featureA, featureB) =>
  import org.locationtech.geomesa.utils.geotools.Conversions.RichSimpleFeature

  val aggregateTypeName = "countryInformation"

  // Adds two countable values. A null on one side is treated as missing: the other value is kept
  // rather than being zeroed. Two nulls yield 0. Mismatched non-null types are a schema error.
  def addCountable(a: AnyRef, b: AnyRef): AnyRef = (a, b) match {
    case (null, null) => Int.box(0)
    case (x, null) => x
    case (null, y) => y
    case (x: Integer, y: Integer) => Int.box(x + y)
    case (x: java.lang.Long, y: java.lang.Long) => Long.box(x + y)
    case (x: java.lang.Double, y: java.lang.Double) => Double.box(x + y)
    case other => throw new IllegalStateException(s"Couldn't match countable type. $other")
  }

  val typeA = featureA.getType.getTypeName
  val typeB = featureB.getType.getTypeName

  if (typeA == aggregateTypeName && typeB == aggregateTypeName) {
    // Case: combining two aggregate features - sum the "total_*" attributes and the counts into featureA
    featureA.getProperties.toSeq.zip(featureB.getProperties.toSeq).foreach { case (propA, propB) =>
      val name = propA.getName.toString
      if (name.startsWith("total_") || name == "count") {
        featureA.setAttribute(propA.getName, addCountable(propA.getValue, propB.getValue))
      }
    }
    featureA
  } else if (typeA != aggregateTypeName && typeB != aggregateTypeName) {
    // Case: combining two gdelt features into a new aggregate feature
    val aggregateSft = GeoMesaSparkKryoRegistrator.getType(aggregateTypeName)
    val featurePropertiesA = featureA.getProperties.toSeq
    val featurePropertiesB = featureB.getProperties.toSeq
    // Placeholder country, featureA's geometry, then "0" for every remaining attribute
    val featureFields = Seq("empty", featureA.geometry) ++ Seq.fill(aggregateSft.getTypes.size - 2)("0")
    val aggregateFeature = ScalaSimpleFeatureFactory.buildFeature(aggregateSft, featureFields, featureA.getID)
    // Sum every countable attribute of the two gdelt features into its total_ column
    countable.value.foreach { case (index, _) =>
      val propA = featurePropertiesA(index)
      val propB = featurePropertiesB(index)
      aggregateFeature.setAttribute(s"total_${propA.getName.toString}", addCountable(propA.getValue, propB.getValue))
    }
    aggregateFeature.setAttribute("count", Int.box(2))
    aggregateFeature
  } else {
    // Case: combining an aggregate with a gdelt feature - fold the gdelt values into the aggregate
    val (aggFeature, geoFeature) =
      if (typeA == aggregateTypeName) (featureA, featureB) else (featureB, featureA)
    aggFeature.getProperties.foreach { prop =>
      val name = prop.getName.toString
      if (name.startsWith("total_")) {
        val geoProp = geoFeature.getProperty(name.substring("total_".length))
        if (geoProp != null) {
          aggFeature.setAttribute(name, addCountable(prop.getValue, geoProp.getValue))
        }
      }
    }
    aggFeature.setAttribute("count", Int.box(aggFeature.get[Integer]("count") + 1))
    aggFeature
  }
}
In [ ]:
// Country NAME -> country geometry ("the_geom"), collected on the driver and broadcast so the
// averaging step can attach each country's polygon to its aggregate feature.
val countryMap: scala.collection.Map[String, Geometry] =
  reduced.values
    .map { country =>
      val name = country.getAttribute("NAME").asInstanceOf[String]
      val geom = country.getAttribute("the_geom").asInstanceOf[Geometry]
      name -> geom
    }
    .collectAsMap
val broadcastedCountryMap = sc.broadcast(countryMap)
In [ ]:
// Turn each country's aggregate into its final form:
//   - avg_<attr> = total_<attr> / count for every total
//   - the "country" attribute is set to the key
//   - the default geometry is replaced by the country polygon when one is known
// Values that never became an aggregate (raw gdelt features) are dropped.
val averaged = aggregate.mapPartitions { iter =>
  import org.locationtech.geomesa.utils.geotools.Conversions.RichSimpleFeature
  val countryGeometries = broadcastedCountryMap.value
  iter.collect { case (countryName, sf) if sf.getType.getTypeName == "countryInformation" =>
    // Same divisor for every attribute of this feature
    val count = sf.get[Integer]("count").doubleValue
    sf.getProperties.foreach { prop =>
      val name = prop.getName.toString
      if (name.startsWith("total_")) {
        val avg = prop.getValue match {
          case a: Integer => a.doubleValue / count
          case a: java.lang.Long => a.doubleValue / count
          case a: java.lang.Double => a.doubleValue / count
          case _ => throw new Exception(s"couldn't match $name")
        }
        sf.setAttribute("avg_" + name.substring("total_".length), Double.box(avg))
      }
    }
    sf.setAttribute("country", countryName)
    // A missing polygon used to throw NoSuchElementException; keep the existing geometry instead
    countryGeometries.get(countryName).foreach(geom => sf.setDefaultGeometry(geom))
    sf
  }
}
In [ ]:
%AddDeps org.geotools gt-geojson 14.1 --transitive --repository http://download.osgeo.org/webdav/geotools
In [ ]:
import org.geotools.geojson.feature.FeatureJSON
import java.io.StringWriter
// Convert simple features to their GeoJSON string representation: one comma-joined string per
// non-empty partition
val geoJsonWriters = averaged.mapPartitions { iter =>
  // Constructed inside the partition function, so nothing is serialized from the driver
  val featureJson = new FeatureJSON()
  val strRep = iter.map(sf => featureJson.toString(sf))
  // An empty partition must emit nothing: an empty string here would become ",," in the final
  // array, which is invalid JSON that the map cell cannot parse
  if (strRep.hasNext) Iterator(strRep.mkString(",")) else Iterator.empty
}
// Collect these strings and join them into a JSON array
val geoJsonString = geoJsonWriters.collect.mkString("[", ",", "]")
In [ ]:
import java.io.File
import java.io.FileWriter
import java.io.{FileOutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets
// Write the GeoJSON array to a file relative to the kernel's working directory; the map cell
// requests the same file name.
val jsonFile = new File("aggregateGdeltEarthJuly.json")
// Explicit UTF-8 (GeoJSON text is UTF-8) rather than the platform default charset
val fw = new OutputStreamWriter(new FileOutputStream(jsonFile), StandardCharsets.UTF_8)
// Close the handle even if the write fails
try fw.write(geoJsonString) finally fw.close()
In [ ]:
%%HTML
<!-- Leaflet 0.7.7 assets served over HTTPS from unpkg. Plain-http assets are blocked as mixed content
     when the notebook itself is served over HTTPS. -->
<link rel="stylesheet" href="https://unpkg.com/leaflet@0.7.7/dist/leaflet.css" />
<script src="https://unpkg.com/leaflet@0.7.7/dist/leaflet.js"></script>
<style>
/* Floating "info" and "legend" panels created by the map cell */
.info {
    padding: 6px 8px;
    font: 14px/18px Arial, Helvetica, sans-serif;
    background: white;
    background: rgba(255,255,255,0.8);
    box-shadow: 0 0 15px rgba(0,0,0,0.2);
    border-radius: 5px;
}
.info b { margin: 0 0 5px; color: #777; }
.legend {
    line-height: 18px;
    color: #555;
}
/* One 18px colour swatch per legend grade */
.legend i {
    width: 18px;
    height: 18px;
    float: left;
    opacity: 0.7;
}
</style>
In [ ]:
%%javascript
(new MutationObserver(function() {
// START - leaflet
// Add the base map and center around US
var map = L.map('map').setView([35.4746,-44.7022],3);
L.tileLayer("http://{s}.tile.osm.org/{z}/{x}/{y}.png").addTo(map);
// Function to set popups for each feature
function onEachFeature(feature, layer) {
layer.bindPopup(feature.properties.popupContent);
}
// Colors for population levels
var colorRange = ["#d73027","#f46d43","#fdae61","#fee08b","#ffffbf","#d9ef8b","#a6d96a","#66bd63","#1a9850"];
var grades = [-3, -2.25, -1.5, -0.75, 0, 0.75, 1.5, 2.25, 3];
// Function to set popup content and fill color
function decorate(feature) {
// Set the popup content to be the country's properties
var popup = "";
for (var prop in feature.properties) {
popup += (prop + ": " + feature.properties[prop] + "<br/>")
}
feature.properties.popupContent = popup;
// Set fill color based on goldstein scale
var fillColor = colorRange[8];
for (var x = 0; x < 9; x++) {
if (feature.properties.avg_goldsteinScale < grades[x]) {
fillColor = colorRange[x]
break
}
}
feature.properties.style = {
color: "black",
opacity: ".6",
fillColor: fillColor,
weight: ".5",
fillOpacity: ".6"
}
}
// Create the map legend
var legend = L.control({position: "bottomright"});
legend.onAdd = function (map) {
var div = L.DomUtil.create("div", "info legend");
div.innerHTML+="<span>Avg. Goldstein Scale</span><br/>";
// create a color tile for each interval
for (var i = 0; i < grades.length; i++) {
div.innerHTML +=
"<i style='background:" + colorRange[i] + "'></i> ";
}
div.innerHTML += "<br/>";
// label bounds of intervals
div.innerHTML += "<i>"+grades[0]+"</i>";
for (var i = 1; i < grades.length-1; i++) {
div.innerHTML +="<i></i>"
}
div.innerHTML += "<i>"+grades[8]+"</i>";
return div;
};
legend.addTo(map);
var info = L.control();
info.onAdd = function (map) {
this._div = L.DomUtil.create("div", "info");
this.update();
return this._div;
};
info.update = function (props) {
this._div.innerHTML = "<b>GDELT Data by Country</b>"
};
info.addTo(map);
// Open the geojson file and add it as a layer
var rawFile = new XMLHttpRequest();
rawFile.onreadystatechange = function () {
if(rawFile.readyState === 4) {
if(rawFile.status === 200 || rawFile.status == 0) {
var allText = rawFile.response;
var gdeltJson = JSON.parse(allText)
console.log(gdeltJson)
gdeltJson.forEach(decorate)
L.geoJson(gdeltJson, {
style: function(feature) { return feature.properties.style},
onEachFeature: onEachFeature
}).addTo(map);
// Css override
$('svg').css("max-width","none")
}
}
}
rawFile.open("GET", "aggregateGdeltEarthJuly.json", false);
rawFile.send()
//END - leaflet
this.disconnect()
})).observe(element[0], {childList: true})
element.append($('<div/>', { id: "map", width: "100%", height: "512px" }))