Introduction

This notebook demonstrates using GeoMesa FileSystem with Apache Spark in Scala, reading data stored in Azure Blob Storage.

In this fictional scenario, we want to find all vessels that have been in proximity to our notional vessel of interest.


In [ ]:
// Imports we'll need
import org.apache.spark.sql.SQLTypes
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.locationtech.geomesa.spark.jts._

// Register all udfs
SQLTypes.init(spark.sqlContext) 

println(sc.version)

Load & filter data


In [ ]:
// Load overall AIS dataframe
// TODO: replace the <...> placeholders in fs.path with your own values
val df = spark.read
  .format("geomesa")
  .option("fs.path","wasbs://<blob container name>@<storage account>.blob.core.windows.net/<path>")
  .option("geomesa.feature", "marinecadastre-ais-csv")
  .load()

In [ ]:
// Show the schema
df.printSchema()

In [ ]:
// Total number of records
df.count()

In [ ]:
%Truncation off

// Filter for ships within 50km of Galveston on 7th July 2017 and show the query plan
val lon = -95.013398
val lat = 29.2335042
val dist = 50000
val ships = df
    .where(st_within($"geom", st_bufferPoint(st_makePoint(lon, lat), dist)))  
    .where($"BaseDateTime" >= from_utc_timestamp(lit("2017-07-07 00:00:00"), "Z"))
    .where($"BaseDateTime" < from_utc_timestamp(lit("2017-07-08 00:00:00"), "Z"))  // half-open window covers the whole day
ships.explain(true)
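
As a sanity check on the radius filter, a single coordinate can be tested against the 50 km circle without Spark. The sketch below is only an illustration: `RadiusCheck`, `haversineMetres` and `withinRadius` are hypothetical names that are not part of GeoMesa, and the haversine formula assumes a spherical Earth, so results near the edge of the circle may differ slightly from what `st_bufferPoint` selects.

```scala
object RadiusCheck {
  // Mean Earth radius in metres (spherical approximation, an assumption of this sketch)
  val EarthRadiusM = 6371008.8

  // Great-circle distance in metres between two lon/lat points given in degrees
  def haversineMetres(lon1: Double, lat1: Double, lon2: Double, lat2: Double): Double = {
    val dLat = math.toRadians(lat2 - lat1)
    val dLon = math.toRadians(lon2 - lon1)
    val a = math.pow(math.sin(dLat / 2), 2) +
      math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) *
        math.pow(math.sin(dLon / 2), 2)
    2 * EarthRadiusM * math.asin(math.sqrt(a))
  }

  // True if (lon, lat) lies within radiusM metres of the centre point
  def withinRadius(lon: Double, lat: Double,
                   centreLon: Double, centreLat: Double, radiusM: Double): Boolean =
    haversineMetres(lon, lat, centreLon, centreLat) <= radiusM
}

// Same centre and radius as the filter above
val centreLon = -95.013398
val centreLat = 29.2335042
val nearby = RadiusCheck.withinRadius(-95.0, 29.3, centreLon, centreLat, 50000.0)
val oneDegreeNorth = RadiusCheck.withinRadius(centreLon, centreLat + 1.0, centreLon, centreLat, 50000.0)
```

Here `nearby` is a point a few kilometres from the centre, while `oneDegreeNorth` is more than 100 km away and so falls outside the circle.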

In [ ]:
// Cache for performance 
ships.cache()
ships.count()

In [ ]:
%%dataframe --limit 1000
ships

Find Vessel of Interest



In [ ]:
// Filter for specific vessel of interest
val name = "YELLOW ROSE"
val interesting = ships.where($"VesselName" === name)
interesting.count()

In [ ]:
// Plot where vessel of interest has been

import org.locationtech.geomesa.jupyter._

val voi = L.DataFrameLayerPoint(interesting, "__fid__", L.StyleOptions("#000000", "#FF0000", 0.50))
val osm = L.WMSLayer("osm_auto:all", geoserverURL = "https://maps.heigit.org/osm-wms/service/")
val aoi = L.Circle(lon, lat, dist, L.StyleOptions("#000000", "#FFFF00", 0.15))

kernel.display.html(L.render(Seq(osm, aoi, voi), (lat, lon), 8))

Look for "suspicious" vessels


In [ ]:
// Quantise locations of voi using geohashes
// NOTE: precision is understood to be in bits (35 bits = a 7-character base-32 geohash)
val precision = 35
val interestingGhs = interesting
    .select(st_geoHash($"geom", precision).as("gh"), $"VesselName")
    

// Similarly, quantise locations of all ships
val shipsGhs = ships
    .select(st_geoHash($"geom", precision).as("gh"), $"VesselName")

// Count occurrences of ships in proximity to our voi
// NOTE: have to use .as(...) since they have the same schema so column names clash
// TODO: should also add a time criterion
val suspects = shipsGhs.repartition(20).as("ships")
    .join(broadcast(interestingGhs.as("interesting")))
    .where($"ships.gh" === $"interesting.gh")
    .where($"ships.VesselName" =!= $"interesting.VesselName")  // don't want to include self
    .groupBy($"ships.VesselName")
    .count()
    .orderBy(desc("count"))

// For more precision, use something like this (where near is a distance in metres):
//    .where(st_within($"ships.geom", st_bufferPoint($"interesting.geom", near)))
// or st_distanceSpheroid
// But this needs careful partitioning to avoid an O(n^2) comparison in the general case
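
To get a feel for what "proximity" means at a given geohash precision, the cell size can be estimated directly. The sketch below assumes the standard geohash layout, in which bits alternate between longitude and latitude starting with longitude, and uses a rough constant of 111,320 m per degree. `GeoHashCell` and its methods are hypothetical helpers, not part of GeoMesa.

```scala
object GeoHashCell {
  // Assumption: bits alternate lon, lat, lon, ... so longitude gets the extra bit when bits is odd
  def lonBits(bits: Int): Int = (bits + 1) / 2
  def latBits(bits: Int): Int = bits / 2

  // Cell size in degrees as (width in longitude, height in latitude)
  def cellDegrees(bits: Int): (Double, Double) =
    (360.0 / math.pow(2, lonBits(bits)), 180.0 / math.pow(2, latBits(bits)))

  // Approximate cell size in metres at a given latitude as (east-west, north-south)
  def cellMetres(bits: Int, atLatDeg: Double): (Double, Double) = {
    val (dLon, dLat) = cellDegrees(bits)
    val metresPerDegree = 111320.0 // rough value, an assumption of this sketch
    (dLon * metresPerDegree * math.cos(math.toRadians(atLatDeg)), dLat * metresPerDegree)
  }
}

// Cell size for the 35-bit precision used above, at the latitude of the area of interest
val (cellWidthM, cellHeightM) = GeoHashCell.cellMetres(35, 29.2335042)
```

Under these assumptions a 35-bit cell near Galveston is on the order of 130 m east-west by 150 m north-south. Two vessels on either side of a cell boundary can be much closer than that and still not match, which is one reason the buffer-based comparison mentioned above is more precise.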

In [ ]:
%%dataframe --limit 100
suspects
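
The join above matches on location only, so a vessel that passed through the same cell hours apart from the vessel of interest still counts. One way to address the time criterion TODO is to bucket timestamps and match on the pair (cell, time bucket). The sketch below shows the idea on plain Scala collections rather than Spark. `CoLocation`, `Fix` and the sample values are made up for illustration, and the counting differs slightly from the join: it counts each other-vessel position report once, rather than once per matching report of the vessel of interest.

```scala
object CoLocation {
  // Hypothetical record: one position report reduced to a spatial cell id and a timestamp
  final case class Fix(vessel: String, cell: String, epochSeconds: Long)

  // Count, per vessel, the reports that share a (cell, time bucket) with the vessel of interest
  def suspects(all: Seq[Fix], voi: String, bucketSeconds: Long): Seq[(String, Int)] = {
    def key(f: Fix): (String, Long) = (f.cell, f.epochSeconds / bucketSeconds)
    val voiKeys: Set[(String, Long)] = all.filter(_.vessel == voi).map(key).toSet
    all.filter(f => f.vessel != voi && voiKeys.contains(key(f)))
      .groupBy(_.vessel)
      .map { case (v, fs) => (v, fs.size) }
      .toSeq
      .sortBy { case (v, n) => (-n, v) } // most frequent first, ties broken by name
  }
}

// Toy input with invented cell ids and timestamps
val fixes = Seq(
  CoLocation.Fix("YELLOW ROSE", "cell-1", 1000L),
  CoLocation.Fix("VESSEL A", "cell-1", 1100L),  // same cell, same 10-minute bucket
  CoLocation.Fix("VESSEL B", "cell-1", 90000L), // same cell, roughly a day later
  CoLocation.Fix("VESSEL C", "cell-2", 1050L)   // same time, different cell
)
val timeAwareSuspects = CoLocation.suspects(fixes, "YELLOW ROSE", 600L)
```

With this toy input only `VESSEL A` is reported. In Spark the same idea could be expressed by deriving a bucket column, for example with `floor(unix_timestamp($"BaseDateTime") / bucketSeconds)`, and adding it to the join condition alongside the geohash. Fixed buckets miss pairs that straddle a bucket boundary, so this is a coarse filter rather than an exact time window.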
