This demo runs a distance join between a GPX dataset for Germany and the GDELT dataset. We use our tiered join algorithm on these two large datasets to determine which GPX points lie within a certain distance of GDELT events.
Running this join as a naive Spark SQL query would take 20+ hours, if it returned a result at all. With this algorithm and GeoWave's tiered indexing strategy we can complete the same join in 2-5 hours, depending on cluster size and configuration. This algorithm is not the answer to every join situation, however: for smaller datasets that fit into memory, the current implementation performs extra work, and native Spark joins are still the better option.
The current implementation of this algorithm considers the worst-case scenario for each dataset; this will be improved over the next updates and releases. Currently, the algorithm dynamically re-indexes each set even when the underlying indexing method for each RDD is the same. This requires touching every record in each dataset, which could be avoided for the majority of joins where both sets share the same indexing method.
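For context, the "native Spark join" alternative mentioned above might look like the sketch below for a small dataset. This is purely illustrative: the table and column names are hypothetical, it uses a simple planar distance rather than GeoWave's geometry functions, and it only makes sense when one side is small enough to broadcast.
from pyspark.sql import functions as F

small_df = spark.table("gdelt_small")   # hypothetical: a small events table with lon/lat columns
large_df = spark.table("gpx_points")    # hypothetical: GPX points with lon/lat columns

# Broadcast the small side, then filter on a planar distance threshold
# (0.01 degrees, the same threshold used later in this demo).
joined = (large_df.crossJoin(F.broadcast(small_df))
          .where(F.sqrt((large_df.lon - small_df.lon) ** 2 +
                        (large_df.lat - small_df.lat) ** 2) <= 0.01))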
Simply focus a cell and use [SHIFT + ENTER] to run the code.
Start by importing pixiedust. If all bootstrap and install steps were run correctly, you should see output below indicating that the pixiedust database was opened successfully with no errors. Depending on the version of pixiedust that gets installed, it may ask you to update; if so, run this first cell.
In [ ]:
#!pip install --user --upgrade pixiedust
In [ ]:
#Stop old session
spark.stop()
It's important to pick a high enough parallelism to partition the data into chunks small enough to support the join. Relying on the default that Spark sets for the cluster size when working with an extremely large dataset is a recipe for OOM errors on the executors.
If you're having trouble finding the right parallelism, try looking at the Spark history server and checking your largest partition size. Aim for a max partition size of ~64MB, preferably smaller.
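As a rough back-of-the-envelope check (the input size below is an assumed example, not a measurement of this dataset), you can derive a starting parallelism from the total input size and the ~64MB partition target:
In [ ]:
# Illustrative only: derive a starting parallelism from an assumed input size
# so that each partition stays at or under the ~64MB target.
input_size_bytes = 400 * 1024**3          # assumed input size of ~400GB
target_partition_bytes = 64 * 1024**2     # ~64MB per partition
print(input_size_bytes // target_partition_bytes)   # -> 6400, same ballpark as the 6000 used below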
In [ ]:
#Create new session with adequate parallelism
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.config('spark.serializer','org.apache.spark.serializer.KryoSerializer')\
.config('spark.kryo.registrator', 'org.locationtech.geowave.analytic.spark.GeoWaveRegistrator')\
.config('spark.default.parallelism', '6000')\
.getOrCreate()
In [ ]:
print(spark.__dict__)
In [ ]:
sc = spark.sparkContext
import pixiedust
import geowave_pyspark
In [ ]:
pixiedust.enableJobMonitor()
In [ ]:
# Print Spark info
print('Spark Version: {0}'.format(sc.version))
print('Python Version: {0}'.format(sc.pythonVer))
print('Application Name: {0}'.format(sc.appName))
print('Application ID: {0}'.format(sc.applicationId))
print('Spark Master: {0}'.format( sc.master))
NOTE: Depending on cluster size, the copy can sometimes fail. This appears to be a race condition in the copy command when downloading the files from S3, and it can cause the following import into Accumulo to fail. You can check the Accumulo tables through the monitor on port 9995 of the EMR cluster (or from the notebook, as shown after the import below). There should be 5 tables after importing.
In [ ]:
%%bash
s3-dist-cp -D mapreduce.task.timeout=60000000 --src=s3://geowave-gpx-data/gpx --dest=hdfs://$HOSTNAME:8020/tmp/
In [ ]:
%%bash
/opt/accumulo/bin/accumulo shell -u root -p secret -e "importtable geowave.germany_gpx_SPATIAL_IDX /tmp/spatial"
/opt/accumulo/bin/accumulo shell -u root -p secret -e "importtable geowave.germany_gpx_GEOWAVE_METADATA /tmp/metadata"
In [ ]:
%%bash
# configure geowave connection params for store
geowave store add germany_gpx --gwNamespace geowave.germany_gpx -t accumulo -i accumulo -u root -p secret --zookeeper $HOSTNAME:2181
Download the GDELT data needed to perform the join. You can either download the quickstart events, which comes to roughly ~120k features, or download all events from 2010 onward, which is closer to ~500k+ features. If you want the larger dataset, run the cell below but replace "TIME_REGEX" with "LARGER_TIME_REGEX".
In [ ]:
%%bash
cd /mnt/tmp
wget s3.amazonaws.com/geowave/latest/scripts/emr/quickstart/geowave-env.sh
source /mnt/tmp/geowave-env.sh
#setup a larger regex for every event after 2010
export LARGER_TIME_REGEX=201
mkdir gdelt
cd gdelt
wget http://data.gdeltproject.org/events/md5sums
for file in `cat md5sums | cut -d' ' -f3 | grep "^${TIME_REGEX}"` ; \
do wget http://data.gdeltproject.org/events/$file ; done
md5sum -c md5sums 2>&1 | grep "^${TIME_REGEX}"
In [ ]:
%%bash
# We have to source here again because each bash cell runs in a separate subprocess.
source /mnt/tmp/geowave-env.sh
# clear old potential runs
geowave store clear gdelt
geowave store rm gdelt
# configure geowave connection params for the accumulo store "gdelt"
geowave store add gdelt --gwNamespace geowave.gdelt -t accumulo -i accumulo -u root -p secret --zookeeper $HOSTNAME:2181
# configure a spatial index
geowave index add gdelt gdeltspatial -t spatial --partitionStrategy round_robin --numPartitions $NUM_PARTITIONS
# run the ingest for a 10x10 deg bounding box over Europe
geowave ingest localtogw /mnt/tmp/gdelt gdelt gdeltspatial -f gdelt \
--gdelt.cql "BBOX(geometry, 0, 50, 10, 60)"
In [ ]:
#grab classes from jvm
hbase_options_class = sc._jvm.org.locationtech.geowave.datastore.hbase.cli.config.HBaseRequiredOptions
accumulo_options_class = sc._jvm.org.locationtech.geowave.datastore.accumulo.cli.config.AccumuloRequiredOptions
query_options_class = sc._jvm.org.locationtech.geowave.core.store.query.QueryOptions
geowave_rdd_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveRDD
indexed_rdd_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveIndexedRDD
rdd_loader_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveRDDLoader
rdd_options_class = sc._jvm.org.locationtech.geowave.analytic.spark.RDDOptions
sf_df_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.SimpleFeatureDataFrame
byte_array_class = sc._jvm.org.locationtech.geowave.core.index.ByteArrayId
#grab classes for spatial join
join_runner_class = sc._jvm.org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner
index_builder_class = sc._jvm.org.locationtech.geowave.core.geotime.ingest.SpatialDimensionalityTypeProvider.SpatialIndexBuilder
geom_intersects_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.udf.GeomIntersects
geom_distance_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.udf.GeomWithinDistance
udf_registry_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.udf.GeomFunctionRegistry
feature_data_adapter_class = sc._jvm.org.locationtech.geowave.adapter.vector.FeatureDataAdapter
feature_data_utils = sc._jvm.org.locationtech.geowave.adapter.vector.util.FeatureDataUtils
sft_builder_class = sc._jvm.org.geotools.feature.simple.SimpleFeatureTypeBuilder
datastore_utils_class = sc._jvm.org.locationtech.geowave.core.store.util.DataStoreUtils
udf_registry_class.registerGeometryFunctions(spark._jsparkSession)
spatial_encoders_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.GeoWaveSpatialEncoders
spatial_encoders_class.registerUDTs()
In [ ]:
import os
#Setup gpx input datastore
gpx_store = accumulo_options_class()
gpx_store.setInstance('accumulo')
gpx_store.setUser('root')
gpx_store.setPassword('secret')
gpx_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')
gpx_store.setGeowaveNamespace('geowave.germany_gpx')
#Setup gdelt datastore
gdelt_store = accumulo_options_class()
gdelt_store.setInstance('accumulo')
gdelt_store.setUser('root')
gdelt_store.setPassword('secret')
gdelt_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')
gdelt_store.setGeowaveNamespace('geowave.gdelt')
#Setup output store
output_store = accumulo_options_class()
output_store.setInstance('accumulo')
output_store.setUser('root')
output_store.setPassword('secret')
output_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')
output_store.setGeowaveNamespace('geowave.joined')
gpx_store_plugin = gpx_store.createPluginOptions()
gdelt_store_plugin = gdelt_store.createPluginOptions()
output_store_plugin = output_store.createPluginOptions()
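The three store configurations above differ only in their namespace. If you prefer less repetition, a small helper like the one below (purely illustrative, not part of the GeoWave API) builds an options object from the shared connection settings:
In [ ]:
# Illustrative helper: build AccumuloRequiredOptions for a namespace using the
# same connection settings as the three stores configured above.
def make_accumulo_store(namespace):
    store = accumulo_options_class()
    store.setInstance('accumulo')
    store.setUser('root')
    store.setPassword('secret')
    store.setZookeeper(os.environ['HOSTNAME'] + ':2181')
    store.setGeowaveNamespace(namespace)
    return store

# e.g. gpx_store = make_accumulo_store('geowave.germany_gpx')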
In [ ]:
#loading RDDs and setting up variables for join
In [ ]:
# Create SpatialJoinRunner object
# You have to pass the wrapped java SparkSession object to java functions
join_runner = join_runner_class(spark._jsparkSession)
# Set data for left side rdd in join
join_runner.setLeftStore(gpx_store_plugin)
gpx_point = byte_array_class('gpxpoint')
join_runner.setLeftAdapterId(gpx_point)
# Set data for right side rdd in join
join_runner.setRightStore(gdelt_store_plugin)
gdelt_event = byte_array_class('gdeltevent')
join_runner.setRightAdapterId(gdelt_event)
# Set data for output store
join_runner.setOutputStore(output_store_plugin)
join_runner.setOutputLeftAdapterId(byte_array_class('gpxJoin'))
join_runner.setOutputRightAdapterId(byte_array_class('gdeltJoin'))
# Set predicate method for join
distance_predicate = geom_distance_class(0.01)
join_runner.setPredicate(distance_predicate)
# Set default partition count for spark objects
join_runner.setPartCount(6000)
Execute the cell below to run the spatial join. This will compare 285 million GPX points against ~100k to ~500k GDELT events. The smallest run case takes anywhere from 2-5 hours depending on dataset and cluster size. The work is split into 3 jobs: the first two determine which tiers contain data, and the last performs the join between tiers.
This is the equivalent of running the following SQL command from the sql_context:
"select gpx.*, gdelt.* from gpx, gdelt where geomDistance(gpx.geom,gdelt.geom) <= 0.01"
In [ ]:
join_runner.run()
In [ ]:
%%bash
geowave store add gpx_joined --gwNamespace geowave.joined -t accumulo -i accumulo -u root -p secret --zookeeper $HOSTNAME:2181
# set up geoserver
geowave config geoserver "$HOSTNAME:8000"
# add the gpx join results layer
geowave gs layer add gpx_joined -id gdeltJoin
geowave gs style set gdeltJoin --styleName geowave:blue
# add the gdelt join results layer
geowave gs layer add gpx_joined -id gpxJoin
geowave gs style set gpxJoin --styleName point
In [ ]:
import owslib
from owslib.wms import WebMapService
url = "http://" + os.environ['HOSTNAME'] + ":8000/geoserver/geowave/wms"
web_map_services = WebMapService(url)
#print layers available from the WMS
print('\n'.join(web_map_services.contents.keys()))
In [ ]:
import folium
from folium import Map
#grab WMS info for the gdeltJoin layer
layer = 'gdeltJoin'
wms = web_map_services.contents[layer]
#center the map on the layer's bounding box
lon = (wms.boundingBox[0] + wms.boundingBox[2]) / 2.
lat = (wms.boundingBox[1] + wms.boundingBox[3]) / 2.
center = [lat, lon]
m = Map(location = center,zoom_start=10)
name = wms.title
gdelt = folium.raster_layers.WmsTileLayer(
url=url,
name=name,
fmt='image/png',
transparent=True,
layers=layer,
overlay=True,
COLORSCALERANGE='1.2,28',
)
gdelt.add_to(m)
layer = 'gpxJoin'
wms = web_map_services.contents[layer]
name = wms.title
gpx = folium.raster_layers.WmsTileLayer(
url=url,
name=name,
fmt='image/png',
transparent=True,
layers=layer,
overlay=True,
COLORSCALERANGE='1.2,28',
)
gpx.add_to(m)
folium.LayerControl().add_to(m)
m
In [ ]: