GeoWave Spatial Join Demo

This demo runs a distance join between a GPX dataset for Germany and the GDELT dataset, using our tiered join algorithm to find which GPX points fall within a certain distance of GDELT events.

Running this join on Spark with a naive Spark SQL query would take 20+ hours, if it finished at all. With this algorithm and GeoWave's tiered indexing strategy we can complete the same join in 2-5 hours, depending on cluster size and configuration. This algorithm is not the answer to every join situation, however: for smaller datasets that fit into memory, the current implementation performs extra work, and native Spark joins are still the better option.

The current implementation of this algorithm assumes the worst case for each dataset, and will be improved over the next updates and releases. Currently, the algorithm dynamically indexes each set even when the underlying indexing method of each RDD is the same. This requires touching every record in the dataset, which could be avoided for the majority of joins where both sets share the same indexing method.
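
To make the indexing idea concrete, here is a small, self-contained sketch in plain Python. It is not GeoWave's tiered join; it simply buckets both datasets by a coarse grid key and only compares records that land in the same or neighboring cells. The bucketing pass is the "touch every record" step described above, and the payoff is that the join no longer compares every GPX point against every GDELT event.


In [ ]:
# Toy illustration only -- not GeoWave's tiered join algorithm.
# Points are bucketed by a coarse grid cell; the join then only compares points
# in the same or adjacent cells instead of every possible pair.
from collections import defaultdict

CELL_SIZE = 0.5  # grid cell size in degrees (arbitrary for this sketch)

def cell(lon, lat):
    return (int(lon // CELL_SIZE), int(lat // CELL_SIZE))

def bucket(points):
    buckets = defaultdict(list)
    for lon, lat in points:                # the pass that touches every record
        buckets[cell(lon, lat)].append((lon, lat))
    return buckets

def grid_distance_join(left, right, max_dist):
    # assumes max_dist <= CELL_SIZE so a 3x3 cell neighborhood is sufficient
    right_buckets = bucket(right)
    for lon, lat in left:
        cx, cy = cell(lon, lat)
        for dx in (-1, 0, 1):              # only look at nearby cells
            for dy in (-1, 0, 1):
                for rlon, rlat in right_buckets.get((cx + dx, cy + dy), []):
                    if ((lon - rlon) ** 2 + (lat - rlat) ** 2) ** 0.5 <= max_dist:
                        yield (lon, lat), (rlon, rlat)

# tiny made-up example
gpx_points = [(9.180, 48.780), (13.400, 52.520)]
gdelt_events = [(9.185, 48.775), (2.350, 48.860)]
print(list(grid_distance_join(gpx_points, gdelt_events, 0.01)))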

Simply focus a cell and use [SHIFT + ENTER] to run the code.

Import pixiedust

Start by importing pixiedust. If all bootstrap and install steps ran correctly, you should see output below indicating that the pixiedust database was opened successfully with no errors. Depending on the version of pixiedust that gets installed, it may ask you to update; if so, uncomment and run the first cell below.


In [ ]:
#!pip install --user --upgrade pixiedust

In [ ]:
#Stop old session
spark.stop()

Picking the right parallelism

It's important to pick a high enough parallelism to partition the data into chunks small enough to support the join. Relying on the default Spark sets for the cluster size when working with an extremely large dataset is a recipe for OOM errors on the executors.

If you're having trouble finding the right parallelism, try looking at the Spark history server and checking your largest partition size. Aim for a maximum partition size of ~64MB, preferably smaller.
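
As a rough sizing sketch (the input size below is an assumption, not a measurement from this cluster), you can back into a parallelism value from the size of your largest input and the ~64MB target above. With the assumed ~375GB of input this works out to the 6000 used in the next cell.


In [ ]:
# Rough heuristic only: parallelism ~= largest input size / target partition size.
# estimated_input_bytes is an assumption -- substitute the size of your largest dataset.
import math

estimated_input_bytes = 375 * 1024 ** 3     # e.g. ~375GB of GPX data (assumed)
target_partition_bytes = 64 * 1024 ** 2     # ~64MB per partition, per the guidance above

suggested = int(math.ceil(float(estimated_input_bytes) / target_partition_bytes))
print('Suggested spark.default.parallelism: {0}'.format(suggested))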


In [ ]:
#Create new session with adequate parallelism
from pyspark.sql import SparkSession

spark = SparkSession.builder\
.config('spark.serializer','org.apache.spark.serializer.KryoSerializer')\
.config('spark.kryo.registrator', 'org.locationtech.geowave.analytic.spark.GeoWaveRegistrator')\
.config('spark.default.parallelism', '6000')\
.getOrCreate()

In [ ]:
print(spark.__dict__)

In [ ]:
sc = spark.sparkContext
import pixiedust
import geowave_pyspark

In [ ]:
pixiedust.enableJobMonitor()

In [ ]:
# Print Spark info
print('Spark Version: {0}'.format(sc.version))
print('Python Version: {0}'.format(sc.pythonVer))
print('Application Name: {0}'.format(sc.appName))
print('Application ID: {0}'.format(sc.applicationId))
print('Spark Master: {0}'.format( sc.master))

Download and ingest the GPX data

NOTE Depending on cluster size, the copy can sometimes fail. This appears to be a race condition in the copy command when downloading the files from S3, and it may cause the following import into Accumulo to fail. You can check the Accumulo tables by looking at port 9995 of the EMR cluster; there should be 5 tables after importing.


In [ ]:
%%bash
s3-dist-cp -D mapreduce.task.timeout=60000000 --src=s3://geowave-gpx-data/gpx --dest=hdfs://$HOSTNAME:8020/tmp/

In [ ]:
%%bash
/opt/accumulo/bin/accumulo shell -u root -p secret -e "importtable geowave.germany_gpx_SPATIAL_IDX /tmp/spatial"
/opt/accumulo/bin/accumulo shell -u root -p secret -e "importtable geowave.germany_gpx_GEOWAVE_METADATA /tmp/metadata"
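
As an optional sanity check (a sketch that reuses the shell path and credentials from the cell above), you can list the Accumulo tables from the notebook and confirm the imported geowave.germany_gpx tables show up:


In [ ]:
# Optional check: list the Accumulo tables after the import (per the NOTE above,
# there should be 5 tables once the GPX data has been imported).
import subprocess

tables = subprocess.check_output(
    ['/opt/accumulo/bin/accumulo', 'shell', '-u', 'root', '-p', 'secret', '-e', 'tables'])
print(tables.decode('utf-8'))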

In [ ]:
%%bash
# configure geowave connection params for store
geowave config addstore germany_gpx --gwNamespace geowave.germany_gpx -t accumulo -i accumulo -u root -p secret --zookeeper $HOSTNAME:2181

Download GDELT Data

Download the GDELT data necessary to perform the join. You can either download the quickstart events, which comes to roughly ~120k features, or download all events from 2010 onward, which is closer to ~500k+ features. If you want the larger dataset, run the cell below but replace "TIME_REGEX" with "LARGER_TIME_REGEX".


In [ ]:
%%bash
cd /mnt/tmp
wget s3.amazonaws.com/geowave/latest/scripts/emr/quickstart/geowave-env.sh
source /mnt/tmp/geowave-env.sh

#setup a larger regex for every event after 2010
export LARGER_TIME_REGEX=201

mkdir gdelt
cd gdelt
wget http://data.gdeltproject.org/events/md5sums
for file in `cat md5sums | cut -d' ' -f3 | grep "^${TIME_REGEX}"` ; \
do wget http://data.gdeltproject.org/events/$file ; done
md5sum -c md5sums 2>&1 | grep "^${TIME_REGEX}"

Ingest GDELT Data

Depending on how many events were downloaded above, this step could take anywhere from 10 minutes to several hours. The CQL filter restricts the ingest to the small portion of events over Europe.


In [ ]:
%%bash

# We have to source here again because each bash cell runs in a separate subprocess.
source /mnt/tmp/geowave-env.sh

# clear old potential runs
geowave config rmstore gdelt
geowave config rmindex gdelt-spatial

# configure geowave connection params for accumulo stores "gdelt"
geowave config addstore gdelt --gwNamespace geowave.gdelt -t accumulo -i accumulo -u root -p secret --zookeeper $HOSTNAME:2181

# configure a spatial index
geowave config addindex -t spatial gdelt-spatial --partitionStrategy round_robin --numPartitions $NUM_PARTITIONS

# run the ingest for a 10x10 deg bounding box over Europe
geowave ingest localtogw /mnt/tmp/gdelt gdelt gdelt-spatial -f gdelt \
--gdelt.cql "BBOX(geometry, 0, 50, 10, 60)"

In [ ]:
#grab classes from jvm
hbase_options_class = sc._jvm.org.locationtech.geowave.datastore.hbase.cli.config.HBaseRequiredOptions
accumulo_options_class = sc._jvm.org.locationtech.geowave.datastore.accumulo.cli.config.AccumuloRequiredOptions

query_options_class = sc._jvm.org.locationtech.geowave.core.store.query.QueryOptions
geowave_rdd_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveRDD
indexed_rdd_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveIndexedRDD
rdd_loader_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveRDDLoader
rdd_options_class = sc._jvm.org.locationtech.geowave.analytic.spark.RDDOptions
sf_df_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.SimpleFeatureDataFrame
byte_array_class = sc._jvm.org.locationtech.geowave.core.index.ByteArrayId

#grab classes for spatial join
join_runner_class = sc._jvm.org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner
index_builder_class = sc._jvm.org.locationtech.geowave.core.geotime.ingest.SpatialDimensionalityTypeProvider.SpatialIndexBuilder
geom_intersects_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.udf.GeomIntersects
geom_distance_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.udf.GeomWithinDistance

udf_registry_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.udf.GeomFunctionRegistry

feature_data_adapter_class = sc._jvm.org.locationtech.geowave.adapter.vector.FeatureDataAdapter
feature_data_utils = sc._jvm.org.locationtech.geowave.adapter.vector.util.FeatureDataUtils
sft_builder_class = sc._jvm.org.geotools.feature.simple.SimpleFeatureTypeBuilder

datastore_utils_class = sc._jvm.org.locationtech.geowave.core.store.util.DataStoreUtils

udf_registry_class.registerGeometryFunctions(spark._jsparkSession)

spatial_encoders_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.GeoWaveSpatialEncoders

spatial_encoders_class.registerUDTs()

In [ ]:
import os
#setup input datastore
gpx_store = accumulo_options_class()
gpx_store.setInstance('accumulo')
gpx_store.setUser('root')
gpx_store.setPassword('secret')
gpx_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')
gpx_store.setGeowaveNamespace('geowave.germany_gpx')

#Setup gdelt datastore
gdelt_store = accumulo_options_class()
gdelt_store.setInstance('accumulo')
gdelt_store.setUser('root')
gdelt_store.setPassword('secret')
gdelt_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')
gdelt_store.setGeowaveNamespace('geowave.gdelt')

#Setup output store
output_store = accumulo_options_class()
output_store.setInstance('accumulo')
output_store.setUser('root')
output_store.setPassword('secret')
output_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')
output_store.setGeowaveNamespace('geowave.joined')

gpx_store_plugin = gpx_store.createPluginOptions()
gdelt_store_plugin = gdelt_store.createPluginOptions()
output_store_plugin = output_store.createPluginOptions()

In [ ]:
#loading RDDs and setting up variables for join

In [ ]:
# Create SpatialJoinRunner object

# You have to pass the wrapped java SparkSession object to java functions
join_runner = join_runner_class(spark._jsparkSession)

# Set data for left side rdd in join
join_runner.setLeftStore(gpx_store_plugin)
gpx_point = byte_array_class('gpxpoint')
join_runner.setLeftAdapterId(gpx_point)

# Set data for right side rdd in join
join_runner.setRightStore(gdelt_store_plugin)
gdelt_event = byte_array_class('gdeltevent')
join_runner.setRightAdapterId(gdelt_event)

# Set data for output store
join_runner.setOutputStore(output_store_plugin)
join_runner.setOutputLeftAdapterId(byte_array_class('gpxJoin'))
join_runner.setOutputRightAdapterId(byte_array_class('gdeltJoin'))

# Set predicate method for join
distance_predicate = geom_distance_class(0.01)
join_runner.setPredicate(distance_predicate)

# Set default partition count for spark objects
join_runner.setPartCount(6000)

Run the spatial join

Execute the join_runner.run() cell below to run the spatial join. This will compare 285 million GPX points against ~100k-500k GDELT events. Even the smallest case takes anywhere from 2-5 hours depending on dataset and cluster size. The work is split into 3 jobs: the first two determine which tiers contain data, and the last performs the join between tiers.

This is the equivalent of running the following SQL query:

"select gpx.*, gdelt.* from gpx, gdelt where geomDistance(gpx.geom,gdelt.geom) <= 0.01"


In [ ]:
join_runner.run()

Create Map of join results

Once we have GeoServer layers for our join results, we can use folium to add the WMS layers and display the results on a map.


In [ ]:
%%bash

geowave config addstore gpx_joined --gwNamespace geowave.joined -t accumulo -i accumulo -u root -p secret --zookeeper $HOSTNAME:2181

# set up geoserver
geowave config geoserver "$HOSTNAME:8000"

# add the gpx join results layer
geowave gs addlayer gpx_joined -id gdeltJoin
geowave gs setls gdeltJoin --styleName geowave:blue

# add the gdelt join results layer
geowave gs addlayer gpx_joined -id gpxJoin
geowave gs setls gpxJoin --styleName point

In [ ]:
import owslib
from owslib.wms import WebMapService

url = "http://" + os.environ['HOSTNAME'] + ":8000/geoserver/geowave/wms"
web_map_services = WebMapService(url)

#print the layers available from the WMS
print('\n'.join(web_map_services.contents.keys()))

In [ ]:
import folium
from folium import Map

#grab wms info for the gdeltJoin layer
layer = 'gdeltJoin'
wms = web_map_services.contents[layer]

#build the map center from the layer bounding box
lon = (wms.boundingBox[0] + wms.boundingBox[2]) / 2.
lat = (wms.boundingBox[1] + wms.boundingBox[3]) / 2.
center = [lat, lon]

m = Map(location = center,zoom_start=10)


name = wms.title
gdelt = folium.raster_layers.WmsTileLayer(
    url=url,
    name=name,
    fmt='image/png',
    transparent=True,
    layers=layer,
    overlay=True,
    COLORSCALERANGE='1.2,28',
)
gdelt.add_to(m)

layer = 'gpxJoin'
wms = web_map_services.contents[layer]

name = wms.title
gpx = folium.raster_layers.WmsTileLayer(
    url=url,
    name=name,
    fmt='image/png',
    transparent=True,
    layers=layer,
    overlay=True,
    COLORSCALERANGE='1.2,28',
)
gpx.add_to(m)

folium.LayerControl().add_to(m)
m
