Start by importing pixiedust. If all bootstrap and install steps were run correctly, you should see output below indicating that the pixiedust database was opened successfully with no errors. Depending on the version of pixiedust that gets installed, it may ask you to update; if so, run the upgrade cell below first.
In [ ]:
#!pip install --user --upgrade pixiedust
In [ ]:
import pixiedust
import geowave_pyspark
import os  # used below for os.environ['HOSTNAME']
Pixiedust also allows us to monitor Spark job progress directly from the notebook. Simply run the cell below, and any time a Spark job is run from the notebook you should see incremental progress shown in the output. NOTE: If this call fails or produces an error, it is often just a linking issue between pixiedust and Python the first time pixiedust is imported. Restart the kernel and rerun the cells to fix the error.
In [ ]:
pixiedust.enableJobMonitor()
In [ ]:
# Print Spark info and create sql_context
print('Spark Version: {0}'.format(sc.version))
print('Python Version: {0}'.format(sc.pythonVer))
print('Application Name: {0}'.format(sc.appName))
print('Application ID: {0}'.format(sc.applicationId))
print('Spark Master: {0}'.format(sc.master))
NOTE: Depending on cluster size, the copy can sometimes fail. This appears to be a race condition in the copy command when downloading the files from S3, and it can cause the following import-into-Accumulo command to fail. You can check the Accumulo tables through the monitor on port 9995 of the EMR cluster, or by listing them from the shell as shown after the import cell below. There should be 5 tables after importing.
In [ ]:
%%bash
s3-dist-cp -D mapreduce.task.timeout=60000000 --src=s3://geowave-gpx-data/gpx --dest=hdfs://$HOSTNAME:8020/tmp/
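Because the copy can fail silently, it can be worth confirming that the GPX files actually landed in HDFS before running the import. A minimal sanity check, assuming the same destination path used in the copy above:
In [ ]:
%%bash
hdfs dfs -ls /tmp/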
In [ ]:
%%bash
/opt/accumulo/bin/accumulo shell -u root -p secret -e "importtable geowave.germany_gpx_SPATIAL_IDX /tmp/spatial"
/opt/accumulo/bin/accumulo shell -u root -p secret -e "importtable geowave.germany_gpx_GEOWAVE_METADATA /tmp/metadata"
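As an optional follow-up check, the same Accumulo shell used above can list the tables in the instance; after a successful import the geowave.germany_gpx tables should appear among them. A small sketch reusing the credentials from the import commands:
In [ ]:
%%bash
/opt/accumulo/bin/accumulo shell -u root -p secret -e "tables"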
In [ ]:
%%bash
# clear out potential old runs
geowave store clear kmeans_gpx
geowave store rm kmeans_gpx
geowave store clear germany_gpx_accumulo
geowave store rm germany_gpx_accumulo
# configure geowave connection params for the named stores "germany_gpx_accumulo" and "kmeans_gpx"
geowave store add germany_gpx_accumulo --gwNamespace geowave.germany_gpx -t accumulo --zookeeper $HOSTNAME:2181 --instance accumulo --user root --password secret
geowave store add kmeans_gpx --gwNamespace geowave.kmeans -t accumulo --zookeeper $HOSTNAME:2181 --instance accumulo --user root --password secret
In [ ]:
%%bash
geowave store clear kmeans_gpx
In [ ]:
# Pull core GeoWave datastore classes
hbase_options_class = sc._jvm.org.locationtech.geowave.datastore.hbase.cli.config.HBaseRequiredOptions
accumulo_options_class = sc._jvm.org.locationtech.geowave.datastore.accumulo.cli.config.AccumuloRequiredOptions
query_options_class = sc._jvm.org.locationtech.geowave.core.store.query.QueryOptions
byte_array_class = sc._jvm.org.locationtech.geowave.core.index.ByteArrayId
# Pull core GeoWave Spark classes from jvm
geowave_rdd_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveRDD
rdd_loader_class = sc._jvm.org.locationtech.geowave.analytic.spark.GeoWaveRDDLoader
rdd_options_class = sc._jvm.org.locationtech.geowave.analytic.spark.RDDOptions
sf_df_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.SimpleFeatureDataFrame
kmeans_runner_class = sc._jvm.org.locationtech.geowave.analytic.spark.kmeans.KMeansRunner
datastore_utils_class = sc._jvm.org.locationtech.geowave.core.store.util.DataStoreUtils
spatial_encoders_class = sc._jvm.org.locationtech.geowave.analytic.spark.sparksql.GeoWaveSpatialEncoders
spatial_encoders_class.registerUDTs()
In [ ]:
#setup input datastore
input_store = accumulo_options_class()
input_store.setInstance('accumulo')
input_store.setUser('root')
input_store.setPassword('secret')
input_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')
input_store.setGeowaveNamespace('geowave.germany_gpx')
#Setup output datastore
output_store = accumulo_options_class()
output_store.setInstance('accumulo')
output_store.setUser('root')
output_store.setPassword('secret')
output_store.setZookeeper(os.environ['HOSTNAME'] + ':2181')
output_store.setGeowaveNamespace('geowave.kmeans')
#Create an instance of the runner
kmeans_runner = kmeans_runner_class()
input_store_plugin = input_store.createPluginOptions()
output_store_plugin = output_store.createPluginOptions()
In [ ]:
#set the appropriate properties
#We want it to execute using the existing JavaSparkContext wrapped by python.
kmeans_runner.setSparkSession(sc._jsparkSession)
kmeans_runner.setAdapterId('gpxpoint')
kmeans_runner.setNumClusters(8)
kmeans_runner.setInputDataStore(input_store_plugin)
kmeans_runner.setOutputDataStore(output_store_plugin)
kmeans_runner.setCqlFilter("BBOX(geometry, 13.3, 52.45, 13.5, 52.5)")
kmeans_runner.setCentroidTypeName('mycentroids')
kmeans_runner.setHullTypeName('myhulls')
kmeans_runner.setGenerateHulls(True)
kmeans_runner.setComputeHullData(True)
#execute the kmeans runner
kmeans_runner.run()
In [ ]:
# Create the dataframe and get a rdd for the output of kmeans
# Grab adapter and setup query options for rdd load
adapter_id = byte_array_class('mycentroids')
query_adapter = datastore_utils_class.getDataAdapter(output_store_plugin, adapter_id)
query_options = query_options_class(query_adapter)
# Create RDDOptions for loader
rdd_options = rdd_options_class()
rdd_options.setQueryOptions(query_options)
output_rdd = rdd_loader_class.loadRDD(sc._jsc.sc(), output_store_plugin, rdd_options)
# Create a SimpleFeatureDataFrame from the GeoWaveRDD
sf_df = sf_df_class(spark._jsparkSession)
sf_df.init(output_store_plugin, adapter_id)
df = sf_df.getDataFrame(output_rdd)
# Convert Java DataFrame to Python DataFrame
import pyspark.mllib.common as convert
py_df = convert._java2py(sc, df)
py_df.createOrReplaceTempView('mycentroids')
df = spark.sql("select * from mycentroids")
display(df)
Using pixiedust's built-in map visualization we can display data on a map, assuming the dataframe has simple numeric latitude and longitude columns; the next cell builds those from the point geometry of the kmeans output.
You will also need an access token from whichever map renderer you choose to use with pixiedust (Mapbox or Google). Follow the instructions in the token help on how to create and use the access token.
In [ ]:
# Convert the point geometry into lat/lon columns and create a new dataframe from those.
import pyspark
def parseRow(row):
    lat = row.geom.y
    lon = row.geom.x
    return pyspark.sql.Row(lat=lat, lon=lon, ClusterIndex=row.ClusterIndex)
row_rdd = df.rdd
new_rdd = row_rdd.map(lambda row: parseRow(row))
new_df = new_rdd.toDF()
display(new_df)
If you have more complex data to visualize, pixiedust may not be the best option.
The Kmeans hull generation outputs polygons that would be difficult for pixiedust to display without creating a special plugin.
Instead, we can use another map renderer to visualize our data. For the Kmeans hulls we will use folium. Folium allows us to easily add WMS layers to our notebook, and we can combine that with GeoWave's GeoServer functionality to render the hulls and centroids.
In [ ]:
# Create the dataframe and get a rdd for the output of kmeans
# Grab adapter and setup query options for rdd load
adapter_id = byte_array_class('myhulls')
query_adapter = datastore_utils_class.getDataAdapter(output_store_plugin, adapter_id)
query_options = query_options_class(query_adapter)
# Use GeoWaveRDDLoader to load an RDD
rdd_options = rdd_options_class()
rdd_options.setQueryOptions(query_options)
output_rdd_hulls = rdd_loader_class.loadRDD(sc._jsc.sc(), output_store_plugin, rdd_options)
# Create a SimpleFeatureDataFrame from the GeoWaveRDD
sf_df_hulls = sf_df_class(spark._jsparkSession)
sf_df_hulls.init(output_store_plugin, adapter_id)
df_hulls = sf_df_hulls.getDataFrame(output_rdd_hulls)
# Convert Java DataFrame to Python DataFrame
import pyspark.mllib.common as convert
py_df_hulls = convert._java2py(sc, df_hulls)
# Create a sql table view of the hulls data
py_df_hulls.createOrReplaceTempView('myhulls')
# Run SQL Query on Hulls data
df_hulls = spark.sql("select * from myhulls order by Density")
display(df_hulls)
Folium provides an easy way to visualize Leaflet maps in Jupyter notebooks. When the data is too complicated or too big to work within the simple framework pixiedust provides for map display, we can instead turn to GeoServer and WMS to render our layers. First we configure GeoServer, then set up WMS layers for folium to display the kmeans results on the map.
In [ ]:
%%bash
# set up geoserver
geowave config geoserver "$HOSTNAME:8000"
# add the centroids layer
geowave gs layer add kmeans_gpx -id mycentroids
geowave gs style set mycentroids --styleName point
# add the hulls layer
geowave gs layer add kmeans_gpx -id myhulls
geowave gs style set myhulls --styleName line
In [ ]:
import owslib
from owslib.wms import WebMapService
url = "http://" + os.environ['HOSTNAME'] + ":8000/geoserver/geowave/wms"
web_map_services = WebMapService(url)
# print the layers available from the wms service
print('\n'.join(web_map_services.contents.keys()))
In [ ]:
import folium
#grab wms info for centroids
layer = 'mycentroids'
wms = web_map_services.contents[layer]
#build center of map off centroid bbox
lon = (wms.boundingBox[0] + wms.boundingBox[2]) / 2.
lat = (wms.boundingBox[1] + wms.boundingBox[3]) / 2.
center = [lat, lon]
m = folium.Map(location=center, zoom_start=10)
name = wms.title
centroids = folium.raster_layers.WmsTileLayer(
    url=url,
    name=name,
    fmt='image/png',
    transparent=True,
    layers=layer,
    overlay=True,
    COLORSCALERANGE='1.2,28',
)
centroids.add_to(m)
layer = 'myhulls'
wms = web_map_services.contents[layer]
name = wms.title
hulls = folium.raster_layers.WmsTileLayer(
    url=url,
    name=name,
    fmt='image/png',
    transparent=True,
    layers=layer,
    overlay=True,
    COLORSCALERANGE='1.2,28',
)
hulls.add_to(m)
m