Import pixiedust

Start by importing pixiedust. If all bootstrap and install steps ran correctly, you should see output below confirming that the pixiedust database was opened successfully with no errors. Depending on the version of pixiedust that gets installed, it may ask you to update; if so, uncomment and run the first cell.


In [ ]:
#!pip install --user --upgrade pixiedust

In [ ]:
import pixiedust
import geowave_pyspark

In [ ]:
pixiedust.enableJobMonitor()

Creating the SQLContext and inspecting the pyspark context

Pixiedust imports pyspark, and the SparkContext and SparkSession should already be available through the "sc" and "spark" variables, respectively.


In [ ]:
# Print Spark info and create sql_context
from pyspark.sql import SQLContext

print('Spark Version: {0}'.format(sc.version))
print('Python Version: {0}'.format(sc.pythonVer))
print('Application Name: {0}'.format(sc.appName))
print('Application ID: {0}'.format(sc.applicationId))
print('Spark Master: {0}'.format(sc.master))

# Create the sql_context from the existing SparkContext
sql_context = SQLContext(sc)

Download GDELT Data

Download the GDELT event data necessary to run KMeans.


In [ ]:
%%bash
cd /mnt/tmp
wget http://s3.amazonaws.com/geowave/latest/scripts/emr/quickstart/geowave-env.sh
source /mnt/tmp/geowave-env.sh
mkdir -p gdelt
cd gdelt
wget http://data.gdeltproject.org/events/md5sums
for file in `cat md5sums | cut -d' ' -f3 | grep "^${TIME_REGEX}"` ; \
do wget http://data.gdeltproject.org/events/$file ; done
md5sum -c md5sums 2>&1 | grep "^${TIME_REGEX}"
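
As an optional sanity check, you can confirm the archives actually landed on disk. This is a minimal sketch, assuming the GDELT event files follow their usual *.CSV.zip naming:


In [ ]:
%%bash
# Optional: count the downloaded GDELT event archives
ls /mnt/tmp/gdelt/*.zip | wc -l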

Create datastores and ingest GDELT data

The ingest process may take a few minutes. If an asterisk ('*') is shown to the left of the cell, the command is still running; output will not appear below until the process is finished.


In [ ]:
%%bash

# We have to source here again because each %%bash cell runs in a separate subprocess.
source /mnt/tmp/geowave-env.sh

# clear old potential runs
geowave store clear gdelt
geowave store rm gdelt
geowave store clear kmeans_gdelt
geowave store rm kmeans_gdelt

# configure geowave connection params for hbase stores "gdelt" and "kmeans_gdelt"
geowave store add gdelt --gwNamespace geowave.gdelt -t hbase --zookeeper $HOSTNAME:2181
geowave store add kmeans_gdelt --gwNamespace geowave.kmeans -t hbase --zookeeper $HOSTNAME:2181

# configure a spatial index
geowave index add gdelt gdeltspatial -t spatial --partitionStrategy round_robin --numPartitions $NUM_PARTITIONS

# run the ingest for a 10x10 deg bounding box over Europe
geowave ingest localtogw /mnt/tmp/gdelt gdelt gdeltspatial -f gdelt \
--gdelt.cql "BBOX(geometry, 0, 50, 10, 60)"

Run KMeans

Running the KMeans process may take a few minutes. You should be able to track the progress of the task via the console or the Spark History Server once the job begins.
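
If you want a direct link to that UI, the SparkContext exposes it; a minimal sketch, assuming a pyspark version (2.1+) where uiWebUrl is available:


In [ ]:
# Print the URL of the Spark web UI to monitor the running job
print(sc.uiWebUrl)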


In [ ]:
%%bash
# clear out potential old runs
geowave store clear kmeans_gdelt
# configure a spatial index
geowave index add kmeans_gdelt gdeltspatial -t spatial --partitionStrategy round_robin --numPartitions $NUM_PARTITIONS

In [ ]:
#Grab classes from the JVM

# Pull core GeoWave datastore and query classes
hbase_options_class = sc._jvm.org.locationtech.geowave.datastore.hbase.cli.config.HBaseRequiredOptions
query_options_class = sc._jvm.org.locationtech.geowave.core.store.query.QueryOptions
byte_array_class = sc._jvm.org.locationtech.geowave.core.index.ByteArrayId
# Pull core GeoWave Spark classes from jvm
geowave_rdd_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveRDD
rdd_loader_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveRDDLoader
rdd_options_class = sc._jvm.org.locationtech.geowave.analytic.spark.RDDOptions
sf_df_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.SimpleFeatureDataFrame
kmeans_runner_class = sc._jvm.org.locationtech.geowave.analytic.spark.kmeans.KMeansRunner

datastore_utils_class = sc._jvm.org.locationtech.geowave.core.store.util.DataStoreUtils

spatial_encoders_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.GeoWaveSpatialEncoders

spatial_encoders_class.registerUDTs()

In [ ]:
import os

#Setup input datastore options
input_store = hbase_options_class()
input_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')
input_store.setGeowaveNamespace('geowave.gdelt')

#Setup output datastore options
output_store = hbase_options_class()
output_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')
output_store.setGeowaveNamespace('geowave.kmeans')

#Create an instance of the runner and the datastore plugin options
kmeans_runner = kmeans_runner_class()
input_store_plugin = input_store.createPluginOptions()
output_store_plugin = output_store.createPluginOptions()

In [ ]:
#Set the appropriate properties
kmeans_runner.setSparkSession(spark._jsparkSession)

kmeans_runner.setAdapterId('gdeltevent')
kmeans_runner.setInputDataStore(input_store_plugin)
kmeans_runner.setOutputDataStore(output_store_plugin)
kmeans_runner.setCqlFilter("BBOX(geometry, 0, 50, 10, 60)")
kmeans_runner.setCentroidTypeName('mycentroids_gdelt')
kmeans_runner.setHullTypeName('myhulls_gdelt')
kmeans_runner.setGenerateHulls(True)
kmeans_runner.setComputeHullData(True)
#Execute the kmeans runner
kmeans_runner.run()

Load resulting Centroids into DataFrame


In [ ]:
# Create the dataframe and get an RDD for the output of kmeans
adapter_id = byte_array_class('mycentroids_gdelt')
query_adapter = datastore_utils_class.getDataAdapter(output_store_plugin, adapter_id)
query_options = query_options_class(query_adapter)

# Create RDDOptions for loader
rdd_options = rdd_options_class()
rdd_options.setQueryOptions(query_options)
output_rdd = rdd_loader_class.loadRDD(sc._jsc.sc(), output_store_plugin, rdd_options)

# Create a SimpleFeatureDataFrame from the GeoWaveRDD
sf_df = sf_df_class(spark._jsparkSession)
sf_df.init(output_store_plugin, adapter_id)
df = sf_df.getDataFrame(output_rdd)

# Convert Java DataFrame to Python DataFrame
import pyspark.mllib.common as convert
py_df = convert._java2py(sc, df)

py_df.createOrReplaceTempView('mycentroids')

df = spark.sql("select * from mycentroids")

display(df)
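
Since the centroids are registered as the temp view mycentroids, you can also run ordinary Spark SQL against them. A small sketch, assuming the output schema includes the ClusterIndex column used in the parsing step below:


In [ ]:
# Optional: count centroid rows per cluster via the temp view
spark.sql("select ClusterIndex, count(*) as n from mycentroids group by ClusterIndex").show()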

Parse DataFrame data into lat/lon columns and display centroids on map

Using pixiedust's built-in map visualization we can display the data on a map, provided it has the following properties.

  • Keys: put your latitude and longitude fields here. They must be floating-point values. These fields must be named latitude, lat, or y and longitude, lon, or x.
  • Values: the field you want to use to thematically color the map. Only one field can be used.

You will also need an access token from whichever map renderer you choose to use with pixiedust (Mapbox, Google). Follow the instructions in the token help on how to create and use the access token.


In [ ]:
# Convert the point geometry into lat/lon columns and create a new dataframe from those.
import pyspark
def parseRow(row):
    lat=row.geom.y
    lon=row.geom.x
    return pyspark.sql.Row(lat=lat,lon=lon,ClusterIndex=row.ClusterIndex)
    
row_rdd = df.rdd

new_rdd = row_rdd.map(lambda row: parseRow(row))

new_df = new_rdd.toDF()
display(new_df)

Export KMeans Hulls to DataFrame

If you have more complex data to visualize, pixiedust may not be the best option.

The KMeans hull generation outputs polygons that would be difficult for pixiedust to display without creating a special plugin.

Instead, we can use another map renderer to visualize our data. For the KMeans hulls we will use folium, which allows us to easily add WMS layers to our notebook; we can combine that with GeoWave's GeoServer functionality to render the hulls and centroids.


In [ ]:
# Create the dataframe and get an RDD for the output of kmeans
# Grab the adapter and set up query options for the RDD load
adapter_id = byte_array_class('myhulls_gdelt')
query_adapter = datastore_utils_class.getDataAdapter(output_store_plugin, adapter_id)
query_options = query_options_class(query_adapter)

# Use GeoWaveRDDLoader to load an RDD
rdd_options = rdd_options_class()
rdd_options.setQueryOptions(query_options)
output_rdd_hulls = rdd_loader_class.loadRDD(sc._jsc.sc(), output_store_plugin, rdd_options)

# Create a SimpleFeatureDataFrame from the GeoWaveRDD
sf_df_hulls = sf_df_class(spark._jsparkSession)
sf_df_hulls.init(output_store_plugin, adapter_id)
df_hulls = sf_df_hulls.getDataFrame(output_rdd_hulls)

# Convert Java DataFrame to Python DataFrame
import pyspark.mllib.common as convert
py_df_hulls = convert._java2py(sc, df_hulls)

# Create a sql table view of the hulls data
py_df_hulls.createOrReplaceTempView('myhulls')

# Run SQL Query on Hulls data
df_hulls = spark.sql("select * from myhulls order by Density")

display(df_hulls)

Visualize results using GeoServer and WMS

Folium provides an easy way to visualize Leaflet maps in Jupyter notebooks. When the data is too complicated or too big to work within the simple framework pixiedust provides for map display, we can instead turn to GeoServer and WMS to render our layers. First we configure GeoServer, then set up WMS layers for folium to display the KMeans results on the map.


In [ ]:
%%bash
# set up geoserver
geowave config geoserver "$HOSTNAME:8000"

# add the centroids layer
geowave gs layer add kmeans_gdelt -id mycentroids_gdelt
geowave gs style set mycentroids_gdelt --styleName point

# add the hulls layer
geowave gs layer add kmeans_gdelt -id myhulls_gdelt
geowave gs style set myhulls_gdelt --styleName line

In [ ]:
import owslib
from owslib.wms import WebMapService

url = "http://" + os.environ['HOSTNAME'] + ":8000/geoserver/geowave/wms"
web_map_services = WebMapService(url)

#Print the layers available from the WMS service
print('\n'.join(web_map_services.contents.keys()))
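
owslib also exposes per-layer metadata; for instance, you can take a quick look at each layer's advertised extent (the same boundingBox attribute the next cell uses to center the map):


In [ ]:
#Optional: print the advertised bounding box for each WMS layer
for name, wms_layer in web_map_services.contents.items():
    print(name, wms_layer.boundingBox)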

In [ ]:
import folium

#Grab WMS info for the centroids layer
layer = 'mycentroids_gdelt'
wms = web_map_services.contents[layer]

#Build the map center from the centroid layer's bounding box
lon = (wms.boundingBox[0] + wms.boundingBox[2]) / 2.
lat = (wms.boundingBox[1] + wms.boundingBox[3]) / 2.
center = [lat, lon]

m = folium.Map(location=center, zoom_start=3)

name = wms.title
centroids = folium.raster_layers.WmsTileLayer(
    url=url,
    name=name,
    fmt='image/png',
    transparent=True,
    layers=layer,
    overlay=True,
    COLORSCALERANGE='1.2,28',
)
centroids.add_to(m)

layer = 'myhulls_gdelt'
wms = web_map_services.contents[layer]

name = wms.title
hulls = folium.raster_layers.WmsTileLayer(
    url=url,
    name=name,
    fmt='image/png',
    transparent=True,
    layers=layer,
    overlay=True,
    COLORSCALERANGE='1.2,28',
)
hulls.add_to(m)
m
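
If you want to keep the rendered map outside the notebook, a folium map can be saved as a standalone HTML file (the filename here is just an example):


In [ ]:
# Optional: write the map to a self-contained HTML file
m.save('kmeans_gdelt_map.html')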
