In [ ]:
// Imports we'll need
import org.apache.spark.sql.SQLTypes
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.locationtech.geomesa.spark.jts._
// Register all udfs
// (hands this session's SQLContext to SQLTypes.init; presumably this is what makes
// the geometry types and st_* functions usable in the cells below — confirm against GeoMesa docs)
SQLTypes.init(spark.sqlContext)
// Print the Spark version reported by the SparkContext this kernel is attached to
println(sc.version)
In [ ]:
// Build the full AIS DataFrame through the "geomesa" Spark data source.
// TODO: substitute your own blob container, storage account and path in fs.path
val df = spark.read
  .format("geomesa")
  .options(Map(
    "fs.path" -> "wasbs://<blob container name>@<storage account>.blob.core.windows.net/<path>",
    "geomesa.feature" -> "marinecadastre-ais-csv"
  ))
  .load()
In [ ]:
// Show the schema
// (prints the column names and types of the loaded AIS DataFrame; the filters
// further down refer to "geom", "BaseDateTime" and "VesselName")
df.printSchema()
In [ ]:
// Total number of records
// NOTE(review): count() is a Spark action, so this cell runs a job over the whole
// unfiltered DataFrame — presumably slow on the full dataset; confirm before re-running.
df.count()
In [ ]:
%Truncation off
// Filter for ships within 50km of Galveston on 7th July 2017 and show the query plan
val lon = -95.013398
val lat = 29.2335042
val dist = 50000 // search radius; 50000 corresponds to the 50km above, i.e. metres
// The day is selected as the half-open window
//   [2017-07-07 00:00:00, 2017-07-08 00:00:00)
// so that records stamped exactly at midnight and records in the final second of
// the day are kept. (The previous strict `> 00:00:00` / `< 23:59:59` bounds
// silently dropped both.)
// NOTE(review): the literals are shifted with from_utc_timestamp(..., "Z"), i.e. a
// zero offset, so they are presumably interpreted in the Spark session time zone —
// confirm that this matches how BaseDateTime is stored.
val ships = df
  .where(st_within($"geom", st_bufferPoint(st_makePoint(lon, lat), dist)))
  .where($"BaseDateTime" >= from_utc_timestamp(lit("2017-07-07 00:00:00"), "Z"))
  .where($"BaseDateTime" < from_utc_timestamp(lit("2017-07-08 00:00:00"), "Z"))
// extended = true prints the logical plans as well as the physical plan
ships.explain(true)
In [ ]:
// Mark the filtered set for caching and immediately run an action on it, so later
// cells can reuse the cached rows; the row count is this cell's result.
ships.cache().count()
In [ ]:
%%dataframe --limit 1000
ships
In [ ]:
// Narrow the area/day subset down to a single vessel, matched by exact name
val name = "YELLOW ROSE"
val interesting = ships.filter(col("VesselName") === name)
// Number of position reports found for that vessel
interesting.count()
In [ ]:
// Plot where vessel of interest has been
import org.locationtech.geomesa.jupyter._
// Point layer built from the vessel-of-interest rows, with "__fid__" passed as the id column.
// NOTE(review): the StyleOptions arguments look like (stroke colour, fill colour,
// fill opacity) — confirm against the GeoMesa jupyter API.
val voi = L.DataFrameLayerPoint(interesting, "__fid__", L.StyleOptions("#000000", "#FF0000", 0.50))
// Background map: OpenStreetMap tiles requested over WMS from the heigit.org service
val osm = L.WMSLayer("osm_auto:all", geoserverURL = "https://maps.heigit.org/osm-wms/service/")
// Search area: circle built from the same lon/lat/dist values used in the spatial filter
val aoi = L.Circle(lon, lat, dist, L.StyleOptions("#000000", "#FFFF00", 0.15))
// Render the layers to HTML and display it in the notebook output.
// NOTE(review): (lat, lon) and 8 are presumably the map centre and zoom level, and the
// Seq order presumably the draw order (base map first, vessel points last) — confirm.
kernel.display.html(L.render(Seq(osm, aoi, voi), (lat, lon), 8))
In [ ]:
// Quantise locations of voi using geohashes
// NOTE(review): precision is handed straight to st_geoHash; presumably a bit count
// (35 bits = 7 base-32 characters) — confirm against the GeoMesa docs.
val precision = 35
val interestingGhs = interesting
  .select(st_geoHash($"geom", precision).as("gh"), $"VesselName")
// Similarly, quantise locations of all ships
val shipsGhs = ships
  .select(st_geoHash($"geom", precision).as("gh"), $"VesselName")
// Count occurrences of ships in proximity to our voi
// NOTE: have to use .as(...) since they have the same schema so column names clash
// TODO: should also add a time criterion
// The geohash equality is given to join(...) directly as the join condition, so the
// plan is an explicit equi-join against the broadcast side rather than an
// unconditioned join that relies on the optimizer pushing a later filter into it.
// NOTE(review): interestingGhs is not de-duplicated, so each ship row is counted once
// per voi row that shares its geohash — confirm that this weighting is intended.
// NOTE(review): rows whose VesselName is null never satisfy =!= and are dropped.
val suspects = shipsGhs.repartition(20).as("ships")
  .join(broadcast(interestingGhs.as("interesting")), $"ships.gh" === $"interesting.gh")
  .where($"ships.VesselName" =!= $"interesting.VesselName") // don't want to include self
  .groupBy($"ships.VesselName")
  .count()
  .orderBy(desc("count"))
// For more precision, use something like this:
// .where(st_within($"ships.geom", st_bufferPoint($"interesting.geom", near)))
// or st_distanceSpheroid
// But need to look carefully at partitioning to avoid O(n^2) in the general case
In [ ]:
%%dataframe --limit 100
suspects
In [ ]: