Shallow Join of GDELT Data

  • This notebook demonstrates aggregation of data by a covering set
  • Here, we use GDELT as the data to be aggregated and a set of countries as the covering set
  • Numeric (Integer, Long and Double) fields of the GDELT data are automatically collected, totaled and averaged
  • The end result is a new Simple Feature Type with aggregate features
  • This result is then visualized in an interactive Leaflet map
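As a plain-Scala sketch of the overall idea, with no Spark, GeoMesa or JTS involved, the covering set below is a list of hypothetical rectangular regions and each event is a point with one numeric field. The map step keys every event by the region that contains it (dropping uncovered events), and the reduce step computes a count, total and average per region. The names Region, Event, Summary and shallowJoin are illustrative only.

```scala
// Plain-Scala sketch of a shallow join: no Spark, GeoMesa or JTS.
// Assumption: regions are axis-aligned boxes and events are points with one numeric field.
final case class Region(name: String, minX: Double, minY: Double, maxX: Double, maxY: Double) {
  def contains(x: Double, y: Double): Boolean =
    x >= minX && x <= maxX && y >= minY && y <= maxY
}
final case class Event(x: Double, y: Double, score: Double)
final case class Summary(region: String, count: Int, total: Double, avg: Double)

def shallowJoin(regions: Seq[Region], events: Seq[Event]): Map[String, Summary] =
  events
    // Map: key each event by the first region that contains it; drop uncovered events
    .flatMap(e => regions.find(_.contains(e.x, e.y)).map(r => r.name -> e))
    // Reduce: group by region and compute count, total and average
    .groupBy(_._1)
    .map { case (name, keyed) =>
      val scores = keyed.map(_._2.score)
      name -> Summary(name, scores.size, scores.sum, scores.sum / scores.size)
    }

val regions = Seq(Region("A", 0, 0, 10, 10), Region("B", 10.5, 0, 20, 10))
val events = Seq(Event(1, 1, 2.0), Event(2, 2, -4.0), Event(15, 5, 3.0), Event(50, 50, 9.0))
val summaries = shallowJoin(regions, events)
summaries.toSeq.sortBy(_._1).foreach(println)
```

The event at (50, 50) lies outside every region and so does not appear in any summary, just as GDELT events outside every country are dropped in the notebook.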

Initialization

Import needed modules


In [ ]:
import org.locationtech.geomesa.accumulo.data.AccumuloDataStoreParams._
import org.locationtech.geomesa.accumulo.data.{AccumuloDataStore, AccumuloDataStoreFactory}
import org.geotools.data.{DataStore, DataStoreFinder}
import org.apache.spark.{SparkConf, SparkContext}
import org.locationtech.geomesa.compute.spark.GeoMesaSpark
import scala.collection.JavaConversions._
import org.apache.spark.rdd.RDD
import org.opengis.feature.simple.SimpleFeature
import org.apache.hadoop.conf.Configuration
import org.geotools.data._
import org.opengis.filter._
import org.apache.spark.serializer.KryoRegistrator
import org.locationtech.geomesa.features.kryo.serialization.SimpleFeatureSerializer
import org.geotools.feature.collection.AbstractFeatureVisitor
import com.vividsolutions.jts.geom.{Geometry, Point}
import org.locationtech.geomesa.utils.geotools.{SchemaBuilder, SimpleFeatureTypes}
import org.locationtech.geomesa.features.ScalaSimpleFeatureFactory
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.compute.spark.GeoMesaSparkKryoRegistrator
import org.geotools.feature.simple.SimpleFeatureBuilder
import org.geotools.filter.text.ecql.ECQL

Initialize the data stores


In [ ]:
val gdeltDsParams = Map(
         zookeepersParam.key -> "zoo1,zoo2,zoo3",
         instanceIdParam.key -> "mycloud",
         userParam.key -> sc.getConf.get("spark.credentials.ds.username"),
         passwordParam.key -> sc.getConf.get("spark.credentials.ds.password"),
         tableNameParam.key -> "gdelt")

val gdeltDs = DataStoreFinder.getDataStore(gdeltDsParams).asInstanceOf[AccumuloDataStore]

val countriesDsParams = Map(
        zookeepersParam.key -> "zoo1,zoo2,zoo3",
        instanceIdParam.key -> "mycloud",
        userParam.key -> sc.getConf.get("spark.credentials.ds.username"),
        passwordParam.key -> sc.getConf.get("spark.credentials.ds.password"),
        tableNameParam.key -> "countries")
       
val countriesDs = DataStoreFinder.getDataStore(countriesDsParams).asInstanceOf[AccumuloDataStore]

Register the SimpleFeatureTypes (SFTs) with the Kryo registrator


In [ ]:
GeoMesaSpark.register(countriesDs)
GeoMesaSpark.register(gdeltDs)

Broadcast the registered SFTs to the executors as (type name, spec string) pairs


In [ ]:
val sfts = sc.broadcast(GeoMesaSparkKryoRegistrator.typeCache.values.map{ sft => 
    (sft.getTypeName, SimpleFeatureTypes.encodeType(sft))
}.toArray)

Load the GDELT events and the countries into RDDs


In [ ]:
val gdeltRdd: RDD[SimpleFeature] = GeoMesaSpark.rdd(new Configuration(), sc, gdeltDsParams, new Query("gdelt"))
val countriesRdd: RDD[SimpleFeature] = GeoMesaSpark.rdd(new Configuration(), sc, countriesDsParams, new Query("countries"))

Re-create the broadcast SFTs and add them to the Kryo type cache on each executor


In [ ]:
gdeltRdd.foreachPartition{ iter =>              
    sfts.value.foreach{ case (name, spec) => 
        val sft = SimpleFeatureTypes.createType(name, spec)
        GeoMesaSparkKryoRegistrator.putType(sft)
    }                       
}
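The two cells above ship the types to the executors as (type name, spec string) pairs and rebuild them there with SimpleFeatureTypes.createType. The round-trip can be sketched in plain Scala. The comma-separated name:Type format and the Attr and Schema classes are hypothetical simplifications, not GeoMesa's actual spec syntax or classes.

```scala
// Toy illustration of shipping a schema as a string, in the spirit of
// SimpleFeatureTypes.encodeType / createType. The "name:Type,name:Type" format
// is a simplified, hypothetical stand-in for the real GeoMesa spec syntax.
final case class Attr(name: String, binding: String)
final case class Schema(typeName: String, attrs: Seq[Attr])

// Driver side: turn the schema into a (name, spec) pair of plain strings
def encode(s: Schema): (String, String) =
  s.typeName -> s.attrs.map(a => s"${a.name}:${a.binding}").mkString(",")

// Executor side: rebuild the schema from the broadcast strings
def decode(typeName: String, spec: String): Schema =
  Schema(typeName, spec.split(",").toSeq.map { field =>
    val i = field.indexOf(':')
    Attr(field.substring(0, i), field.substring(i + 1))
  })

val original = Schema("countryInformation", Seq(Attr("country", "String"), Attr("count", "Integer")))
val (name, spec) = encode(original)
val rebuilt = decode(name, spec)
```

Broadcasting plain strings keeps the broadcast payload trivially serializable; each executor pays the small cost of parsing the spec once.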

Deduplicate the country entries by FIPS code, keeping one feature per country.


In [ ]:
val keyed = countriesRdd.keyBy{sf => sf.getAttribute("FIPS")}
val reduced = keyed.reduceByKey( (featureA, featureB) => featureA)
val broadcastedCountries = sc.broadcast(reduced.values.collect)
println(broadcastedCountries.value.size)
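In plain Scala, the keyBy/reduceByKey pair above amounts to grouping by FIPS code and keeping one element per group. The Country case class is a hypothetical stand-in for a SimpleFeature. Note that the reducer keeps one feature and discards the others rather than merging their geometries, and because Spark does not guarantee the order in which values reach a reducer, which feature survives is arbitrary there (plain groupBy, by contrast, preserves input order).

```scala
// Plain-Scala analogue of keyBy("FIPS") + reduceByKey((a, b) => a):
// keep one feature per FIPS code. Country is a hypothetical stand-in for a SimpleFeature.
final case class Country(fips: String, name: String, part: Int)

def dedupByFips(countries: Seq[Country]): Seq[Country] =
  countries
    .groupBy(_.fips)
    .values
    .map(_.reduce((a, _) => a)) // same reducer as the notebook: keep the left-hand feature
    .toSeq

val raw = Seq(
  Country("US", "United States", 1),
  Country("US", "United States", 2),
  Country("FR", "France", 1)
)
val unique = dedupByFips(raw)
```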

Map

Map GDELT data to the containing country
Using the country geometries of the covering set, key each GDELT event by the country that contains it; events that fall outside every country are dropped.


In [ ]:
val keyedData = gdeltRdd.mapPartitions { iter =>
    import org.locationtech.geomesa.utils.geotools.Conversions._

    iter.flatMap { sf =>
        // Find the first country whose geometry intersects the event
        // (for point events, this is the country that contains it)
        broadcastedCountries.value
            .find(cover => cover.geometry.intersects(sf.geometry))
            // Key the event by the country's NAME, which is also the key of the country -> geometry map
            .map(cover => (cover.getAttribute("NAME").asInstanceOf[String], sf))
    }
}
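The map step relies on JTS's Geometry.intersects for the spatial test. To show what that test means for a point and a simple polygon, here is a ray-casting point-in-polygon check, a swapped-in textbook technique rather than what JTS does internally, together with a find-based keying function like the one above. Polygons are plain vertex lists (no holes, no multipolygons) and the shapes are made up.

```scala
// Ray-casting point-in-polygon test: an illustration of the containment check,
// not JTS's implementation. Polygons are vertex lists without holes.
type Pt = (Double, Double)

def containsPoint(polygon: Seq[Pt], p: Pt): Boolean = {
  val (px, py) = p
  // Pair each vertex with the previous one (wrapping around) to walk every edge
  val edges = polygon.zip(polygon.last +: polygon.init)
  // Count edges crossed by a horizontal ray from p toward +x; an odd count means inside
  edges.count { case ((xi, yi), (xj, yj)) =>
    (yi > py) != (yj > py) && px < (xj - xi) * (py - yi) / (yj - yi) + xi
  } % 2 == 1
}

// Key a point by the first polygon that contains it; None if it is outside all of them
def keyByContainer(countries: Seq[(String, Seq[Pt])], p: Pt): Option[(String, Pt)] =
  countries.find { case (_, poly) => containsPoint(poly, p) }.map { case (name, _) => (name, p) }

val square = Seq((0.0, 0.0), (10.0, 0.0), (10.0, 10.0), (0.0, 10.0))
val triangle = Seq((20.0, 0.0), (30.0, 0.0), (25.0, 10.0))
val polys = Seq("Square" -> square, "Triangle" -> triangle)
```

Returning an Option from the keying function is what lets flatMap drop uncovered events in a single pass.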

Get the indices and types of the attributes that can be aggregated and broadcast them to the executors


In [ ]:
val countableTypes = Seq("Integer", "Long", "Double")
val typeNames = gdeltRdd.first.getType.getTypes.toIndexedSeq.map{ft => ft.getBinding.getSimpleName.toString}
val countableIndices = typeNames.indices.flatMap { index => 
    val featureType = typeNames(index)
    // Only grab countable types, skipping the ID field
    if ((countableTypes contains featureType) && index != 0) {
        Some(index, featureType)
    } else {
        None
    }
}.toArray
countableIndices.foreach{println}
val countable = sc.broadcast(countableIndices)
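The index selection above can also be written with zipWithIndex and collect. This sketch takes the binding names as plain strings (what getBinding.getSimpleName returns) and uses a made-up six-attribute schema rather than the real GDELT one.

```scala
// Plain-Scala version of the countable-index selection.
// The sample bindings below are hypothetical, not the actual GDELT schema.
val countableBindings = Set("Integer", "Long", "Double")

def countableAttrs(bindings: IndexedSeq[String]): Array[(Int, String)] =
  bindings.zipWithIndex.collect {
    // Keep numeric bindings only, and skip index 0 (the event ID in the notebook)
    case (binding, index) if index != 0 && countableBindings.contains(binding) => (index, binding)
  }.toArray

val bindings = IndexedSeq("Integer", "String", "Date", "Double", "Long", "Point")
val selected = countableAttrs(bindings)
```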

Create a Simple Feature Type based on what can be aggregated

Todo: store into sealed trait


In [ ]:
val schemaBuilder = SchemaBuilder.builder()
schemaBuilder.addString("country")
schemaBuilder.addMultiPolygon("geom")
schemaBuilder.addInt("count")
val featureProperties = gdeltRdd.first.getProperties.toSeq
countableIndices.foreach { case (index, clazz) =>
    val featureName = featureProperties.apply(index).getName
    clazz match {
      case "Integer" => schemaBuilder.addInt(s"total_$featureName")
      case "Long" => schemaBuilder.addLong(s"total_$featureName")
      case "Double" => schemaBuilder.addDouble(s"total_$featureName")
    }
    schemaBuilder.addDouble(s"avg_$featureName")
}
val countryInfoSft = schemaBuilder.build("countryInformation")
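The schema built above follows a fixed naming convention: three fixed attributes, then, for each countable attribute, a total_ field with the source attribute's own type and an avg_ field of type Double. This convention is what lets later steps map a total_ field back to its source attribute by stripping the prefix. Below is a plain-Scala sketch of the resulting attribute list as (name, binding) pairs; goldsteinScale is the attribute styled in the map, while numMentions is a hypothetical second field used only for illustration.

```scala
// Sketch of the attribute list produced by the SchemaBuilder calls, as (name, binding) pairs.
// Each countable attribute yields a total_ field of its own type and an avg_ field of type Double.
def aggregateAttributes(countable: Seq[(String, String)]): Seq[(String, String)] =
  Seq("country" -> "String", "geom" -> "MultiPolygon", "count" -> "Integer") ++
    countable.flatMap { case (name, binding) =>
      Seq(s"total_$name" -> binding, s"avg_$name" -> "Double")
    }

// numMentions is a hypothetical field name
val attrs = aggregateAttributes(Seq("goldsteinScale" -> "Double", "numMentions" -> "Integer"))
```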

Register it with Kryo and broadcast it to the executors


In [ ]:
GeoMesaSpark.register(Seq(countryInfoSft))

In [ ]:
val newSfts = sc.broadcast(GeoMesaSparkKryoRegistrator.typeCache.values.map{ sft => 
    (sft.getTypeName, SimpleFeatureTypes.encodeType(sft))
}.toArray)

In [ ]:
keyedData.foreachPartition{ iter => 
    newSfts.value.foreach{ case (name, spec) => 
        val newSft = SimpleFeatureTypes.createType(name, spec)
        GeoMesaSparkKryoRegistrator.putType(newSft)
    }        
}

Reduce

Reduce features by their country, computing sums and total counts


In [ ]:
val aggregate = keyedData.reduceByKey( (featureA, featureB) => {

    // Add two countable values of the same boxed type; a null on either side is treated as missing
    def add(a: Any, b: Any): Any = (a, b) match {
        case (null, y) => y
        case (x, null) => x
        case (x: java.lang.Integer, y: java.lang.Integer) => x.intValue + y.intValue
        case (x: java.lang.Long, y: java.lang.Long) => x.longValue + y.longValue
        case (x: java.lang.Double, y: java.lang.Double) => x.doubleValue + y.doubleValue
        case x => throw new Exception(s"Couldn't match countable type. $x")
    }

    val aggregateSft = GeoMesaSparkKryoRegistrator.getType("countryInformation")
    val typeA = featureA.getType.getTypeName
    val typeB = featureB.getType.getTypeName
    val result =
    // Case: combining two aggregate features
    if (typeA == "countryInformation" && typeB == "countryInformation") {
        // Combine the "total_" properties and the event count
        featureA.getProperties.foreach { propA =>
            val name = propA.getName.toString
            if (name.startsWith("total_") || name == "count") {
                featureA.setAttribute(name, add(propA.getValue, featureB.getAttribute(name)))
            }
        }
        featureA

    // Case: combining two gdelt features
    } else if (typeA != "countryInformation" && typeB != "countryInformation") {

        // Grab each feature's properties
        val featurePropertiesA = featureA.getProperties.toSeq
        val featurePropertiesB = featureB.getProperties.toSeq
        // Create a new aggregate feature to hold the result. The "0" strings are converted to
        // each field's numeric type by the feature builder; the geometry stays null until the
        // country geometry is set after the reduce.
        val featureFields = Seq("empty", null) ++ Seq.fill(aggregateSft.getTypes.size - 2)("0")
        val aggregateFeature = ScalaSimpleFeatureFactory.buildFeature(aggregateSft, featureFields, featureA.getID)

        // Loop over the countable properties and sum them for both gdelt simple features
        countable.value.foreach { case (index, clazz) =>
            val name = featurePropertiesA(index).getName.toString
            val sum = add(featurePropertiesA(index).getValue, featurePropertiesB(index).getValue)
            // If both values are null, keep the zero the aggregate was initialized with
            if (sum != null) {
                aggregateFeature.setAttribute(s"total_$name", sum)
            }
        }
        aggregateFeature.setAttribute("count", Int.box(2))
        aggregateFeature

    // Case: combining a mix
    } else {

        // Figure out which feature is which
        val (aggFeature: SimpleFeature, geoFeature: SimpleFeature) =
            if (typeA == "countryInformation") (featureA, featureB) else (featureB, featureA)

        // Loop over the aggregate feature's totals, adding on the GDELT feature's values
        aggFeature.getProperties.foreach { prop =>
            val name = prop.getName.toString
            if (name.startsWith("total_")) {
                val geoProp = geoFeature.getProperty(name.stripPrefix("total_"))
                if (geoProp != null) {
                    aggFeature.setAttribute(name, add(prop.getValue, geoProp.getValue))
                }
            }
        }
        val count = aggFeature.getAttribute("count").asInstanceOf[Integer].intValue
        aggFeature.setAttribute("count", Int.box(count + 1))
        aggFeature
    }

    result
})
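The reduce function above has to handle three cases because Spark passes it a mix of raw GDELT features and partial aggregates. An alternative design, not what this notebook does, is to first map every event to a partial aggregate with count 1 (for example with mapValues on the pair RDD), so the combine function has a single case; a country with only one event then also ends up with an aggregate rather than a raw feature. The sketch models partials as a plain Partial case class with totals in a Map[String, Double], a simplification of the countryInformation features, and uses groupBy as a stand-in for reduceByKey.

```scala
// Alternative design sketch: lift every event to a partial aggregate with count = 1
// before reducing, so combine is a single associative, commutative case.
final case class Partial(count: Int, totals: Map[String, Double]) {
  def combine(other: Partial): Partial =
    Partial(
      count + other.count,
      (totals.keySet ++ other.totals.keySet).map { k =>
        k -> (totals.getOrElse(k, 0.0) + other.totals.getOrElse(k, 0.0))
      }.toMap
    )
  def averages: Map[String, Double] = totals.map { case (k, v) => k -> v / count }
}

// Lift one event (field -> value, missing values omitted) to a partial aggregate
def lift(event: Map[String, Double]): Partial = Partial(1, event)

val keyedEvents = Seq(
  "Ukraine" -> Map("goldsteinScale" -> -5.0),
  "Ukraine" -> Map("goldsteinScale" -> -1.0),
  "France" -> Map("goldsteinScale" -> 2.0)
)
// Plain-Scala stand-in for keyedData.mapValues(lift).reduceByKey(_ combine _)
val reducedPartials = keyedEvents
  .groupBy(_._1)
  .map { case (country, evs) => country -> evs.map(e => lift(e._2)).reduce(_ combine _) }
```

As in the notebook, a single count is shared by all fields, so a missing value still counts as an event but contributes nothing to the total.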

Broadcast a map of country name -> geometry to the executors


In [ ]:
val countryMap: scala.collection.Map[String, Geometry] = 
    reduced.map{ case (key, sf) => 
        (sf.getAttribute("NAME").asInstanceOf[String] -> sf.getAttribute("the_geom").asInstanceOf[Geometry])
    }.collectAsMap
    
val broadcastedCountryMap = sc.broadcast(countryMap)

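Compute averages and set country names and geometries. A country with exactly one event never passes through the reduce function, so its value is still a raw GDELT feature; such countries are filtered out here.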


In [ ]:
val averaged = aggregate.mapPartitions{iter =>
    iter.flatMap{ case (countryName, sf) =>
        // Only aggregate features are kept; single-event countries are still raw GDELT features here
        if (sf.getType.getTypeName == "countryInformation") {
            val count = sf.getAttribute("count").asInstanceOf[Integer].doubleValue
            sf.getProperties.foreach{ prop =>
                val name = prop.getName.toString
                if (name.startsWith("total_")) {
                    // Widen each total to Double before dividing to avoid integer division
                    val avg = prop.getValue match {
                        case a: java.lang.Integer => a.doubleValue / count
                        case a: java.lang.Long => a.doubleValue / count
                        case a: java.lang.Double => a.doubleValue / count
                        case _ => throw new Exception(s"couldn't match $name")
                    }
                    sf.setAttribute("avg_" + name.stripPrefix("total_"), avg)
                }
            }
            sf.setAttribute("country", countryName)
            sf.setDefaultGeometry(broadcastedCountryMap.value(countryName))

            Some(sf)
        } else {
            None
        }
    }
}
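The averaging step widens each boxed total to Double before dividing by the count. The reason is that dividing two integral values in Scala (as in Java) truncates toward zero. A small sketch of the same match, with the function name average chosen only for illustration:

```scala
// Mirrors the match in the averaging step: each boxed total is widened to Double
// before dividing, so Integer and Long totals are not truncated by integer division.
def average(total: Any, count: Int): Double = total match {
  case i: java.lang.Integer => i.doubleValue / count
  case l: java.lang.Long    => l.doubleValue / count
  case d: java.lang.Double  => d.doubleValue / count
  case other => throw new IllegalArgumentException(s"not a countable total: $other")
}

// For contrast: dividing two Ints truncates toward zero
val truncated = 7 / 2
```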

Export

Add the GeoTools GeoJSON dependency


In [ ]:
%AddDeps org.geotools gt-geojson 14.1 --transitive --repository http://download.osgeo.org/webdav/geotools

Write Simple Features to GeoJSON


In [ ]:
import org.geotools.geojson.feature.FeatureJSON
import java.io.StringWriter

// Convert simple features to their GeoJSON string representation
val geoJsonWriters = averaged.mapPartitions{ iter =>
    val featureJson = new FeatureJSON()

    val strRep = iter.map{ sf =>
        featureJson.toString(sf)
    }
    // Join all the features on this partition
    Iterator(strRep.mkString(","))
}.filter(_.nonEmpty) // Empty partitions yield "", which would leave ",," in the array

// Collect these strings and join them into a JSON array
val geoJsonString = geoJsonWriters.collect.mkString("[",",","]")
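Joining per-partition strings needs care with empty partitions: an empty partition contributes an empty string, so a naive outer mkString leaves a blank slot (",,") in the array, which is not valid JSON. Dropping empty chunks first keeps the array well-formed. The chunk contents below are hypothetical.

```scala
// Join per-partition chunks into one JSON array, skipping chunks from empty partitions
def joinChunks(chunks: Seq[String]): String =
  chunks.filter(_.nonEmpty).mkString("[", ",", "]")

// Hypothetical per-partition output: two partitions with features, one empty
val chunks = Seq("""{"id":"a"},{"id":"b"}""", "", """{"id":"c"}""")
```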

Write the string to a file


In [ ]:
import java.io.File
import java.io.FileWriter
val jsonFile = new File("aggregateGdeltEarthJuly.json")
val fw = new FileWriter(jsonFile)
fw.write(geoJsonString)
fw.close
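FileWriter uses the platform default charset, and the cell above leaves the file open if write throws. A minimal alternative, assuming Java 7+ NIO is available on the driver, writes the bytes in one call with an explicit UTF-8 charset; Files.write closes the file when it finishes or fails. The helper name writeJson is hypothetical, and the temp file is only for demonstration.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Write the whole string in one call; Files.write opens, writes and closes the file
def writeJson(path: Path, json: String): Path =
  Files.write(path, json.getBytes(StandardCharsets.UTF_8))

// Demonstration with a temporary file, read back and then removed
val out = Files.createTempFile("aggregateGdelt", ".json")
writeJson(out, """[{"type":"Feature"}]""")
val readBack = new String(Files.readAllBytes(out), StandardCharsets.UTF_8)
Files.deleteIfExists(out)
```

In the notebook this would be called as writeJson(java.nio.file.Paths.get("aggregateGdeltEarthJuly.json"), geoJsonString).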

Visualize

Add the Leaflet styles and JavaScript


In [ ]:
%%HTML
<link rel="stylesheet" href="http://cdn.leafletjs.com/leaflet/v0.7.7/leaflet.css" />
<script src="http://cdn.leafletjs.com/leaflet/v0.7.7/leaflet.js"></script>
<style>
.info { padding: 6px 8px; font: 14px/18px Arial, Helvetica, sans-serif; background: white; background: rgba(255,255,255,0.8); box-shadow: 0 0 15px rgba(0,0,0,0.2); border-radius: 5px; } 
.info b { margin: 0 0 5px; color: #777; }
.legend {
    line-height: 18px;
    color: #555;
}
.legend i {
    width: 18px;
    height: 18px;
    float: left;    
    opacity: 0.7;
}</style>

Set up the map and populate it with the GeoJSON data


In [ ]:
%%javascript

(new MutationObserver(function() {
    // START - leaflet
    
    // Add the base map, centered over the North Atlantic
    var map = L.map('map').setView([35.4746,-44.7022],3);
    L.tileLayer("http://{s}.tile.osm.org/{z}/{x}/{y}.png").addTo(map); 
    
    // Function to set popups for each feature
    function onEachFeature(feature, layer) {
        layer.bindPopup(feature.properties.popupContent);        
    }

    // Colors for average Goldstein scale levels, from most negative (red) to most positive (green)
    var colorRange = ["#d73027","#f46d43","#fdae61","#fee08b","#ffffbf","#d9ef8b","#a6d96a","#66bd63","#1a9850"];
    var grades = [-3, -2.25, -1.5, -0.75, 0, 0.75, 1.5, 2.25, 3];
    // Function to set popup content and fill color 
    function decorate(feature) {

        // Set the popup content to be the country's properties
        var popup = "";
        for (var prop in feature.properties) {
            popup += (prop + ": " + feature.properties[prop] + "<br/>")            
        }
        feature.properties.popupContent = popup;    

        // Set fill color based on goldstein scale
        var fillColor = colorRange[8];        
        for (var x = 0; x < 9; x++) {
            if (feature.properties.avg_goldsteinScale < grades[x]) {
                fillColor = colorRange[x]
                break
            }
        }            

        feature.properties.style = {
            color: "black",
            opacity: ".6",
            fillColor: fillColor,
            weight: ".5",
            fillOpacity: ".6"
        }        
    }

    // Create the map legend
    var legend = L.control({position: "bottomright"});

    legend.onAdd = function (map) {

        var div = L.DomUtil.create("div", "info legend");

        div.innerHTML+="<span>Avg. Goldstein Scale</span><br/>";
        // create a color tile for each interval
        for (var i = 0; i < grades.length; i++) {
            div.innerHTML +=
                "<i style='background:" + colorRange[i] + "'></i> ";
        }
        div.innerHTML += "<br/>";
        
        // label bounds of intervals
        div.innerHTML += "<i>"+grades[0]+"</i>";
        for (var i = 1; i < grades.length-1; i++) {
            div.innerHTML +="<i></i>"
        }
        div.innerHTML += "<i>"+grades[8]+"</i>";

        return div;
    };

    legend.addTo(map);


    var info = L.control();

    info.onAdd = function (map) {
        this._div = L.DomUtil.create("div", "info");
        this.update();
        return this._div;
    };

    info.update = function (props) {
        this._div.innerHTML = "<b>GDELT Data by Country</b>"
    };

    info.addTo(map);
    // Open the geojson file and add it as a layer
    var rawFile = new XMLHttpRequest();
        rawFile.onreadystatechange = function () {                
        if(rawFile.readyState === 4) {                                   
            if(rawFile.status === 200 || rawFile.status == 0) {                
                var allText = rawFile.response;
                var gdeltJson = JSON.parse(allText)    
                console.log(gdeltJson)
                gdeltJson.forEach(decorate)
                L.geoJson(gdeltJson, {
                    style: function(feature) { return feature.properties.style},
                    onEachFeature: onEachFeature
                }).addTo(map); 
                // Css override
                $('svg').css("max-width","none")
            }
        }
    }        
    rawFile.open("GET", "aggregateGdeltEarthJuly.json", false);
    rawFile.send()

    //END - leaflet
    this.disconnect()
})).observe(element[0], {childList: true})


element.append($('<div/>', { id: "map", width: "100%", height: "512px" }))

The Goldstein scale assigns each type of event a score from -10 to +10 that captures its theoretical potential impact on the stability of a country. Here we see that Ukraine and the Central African Republic have low averages of this metric, implying that the events taking place there tend to have a negative potential impact on the region's stability.
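The color bins that decorate() applies to avg_goldsteinScale can be checked outside the browser with a small Scala transcription of the same logic, using the notebook's grades and colorRange values. A value takes the color of the first grade it falls below, so everything below -3 shares the first color and everything from 2.25 upward shares the last one.

```scala
// Scala transcription of the binning in decorate(): first grade the value falls below wins;
// values at or above the last grade keep the last color.
val grades = Seq(-3.0, -2.25, -1.5, -0.75, 0.0, 0.75, 1.5, 2.25, 3.0)
val colorRange = Seq("#d73027", "#f46d43", "#fdae61", "#fee08b", "#ffffbf",
                     "#d9ef8b", "#a6d96a", "#66bd63", "#1a9850")

def fillColor(avgGoldstein: Double): String = {
  val bin = grades.indexWhere(avgGoldstein < _)
  colorRange(if (bin == -1) colorRange.size - 1 else bin)
}
```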