Connecting Apache Spark to Azure DocumentDB accelerates your ability to solve fast-moving data science problems, where data can be quickly persisted and retrieved using Azure DocumentDB. With the Spark to DocumentDB connector, you can more easily address scenarios including (but not limited to) blazing-fast IoT workloads, updateable columns when performing analytics, push-down predicate filtering, and advanced analytics and data science against fast-changing data, all backed by a geo-replicated managed document store with guaranteed SLAs for consistency, availability, low latency, and throughput.
The Spark to DocumentDB connector utilizes the Azure DocumentDB Java SDK. The flow in the notebook below is as follows: configure the connector, connect to the DocumentDB database and collection, and then query and analyze the flight data using Spark SQL and GraphFrames.
In [1]:
%%configure
{ "jars": ["wasb:///example/jars/azure-documentdb-1.10.0.jar","wasb:///example/jars/azure-documentdb-spark-0.0.2-SNAPSHOT.jar"],
"conf": {
"spark.jars.packages": "graphframes:graphframes:0.3.0-spark2.0-s_2.11",
"spark.jars.excludes": "org.scala-lang:scala-reflect"
}
}
In [2]:
// Import Spark to DocumentDB Connector
import com.microsoft.azure.documentdb.spark.schema._
import com.microsoft.azure.documentdb.spark._
import com.microsoft.azure.documentdb.spark.config.Config
// Connect to DocumentDB Database
val readConfig2 = Config(Map("Endpoint" -> "https://doctorwho.documents.azure.com:443/",
"Masterkey" -> "le1n99i1w5l7uvokJs3RT5ZAH8dc3ql7lx2CG0h0kK4lVWPkQnwpRLyAN0nwS1z4Cyd1lJgvGUfMWR3v8vkXKA==",
"Database" -> "DepartureDelays",
"preferredRegions" -> "Central US;East US 2;",
"Collection" -> "flights_pcoll",
"SamplingRatio" -> "1.0"))
In [3]:
// Create collection connection
val coll = spark.sqlContext.read.DocumentDB(readConfig2)
coll.createOrReplaceTempView("c")
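Before querying, it is worth confirming the schema the connector inferred from the sampled documents (per the SamplingRatio setting above). A quick check with the standard DataFrame API:

// Inspect the schema inferred from the DocumentDB collection
coll.printSchema()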
In [4]:
// Query the top 100 flights originating from Seattle (SEA)
val top100 = spark.sql("SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA' LIMIT 100")
top100.createOrReplaceTempView("top100")
In [5]:
%%sql
select * from top100 limit 10
In [6]:
// Query all flights originating from Seattle (SEA)
val originSEA = spark.sql("SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'")
originSEA.createOrReplaceTempView("originSEA")
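As an aside, the same query can be expressed with the DataFrame API instead of Spark SQL; given the connector's push-down predicate filtering, the origin filter should be evaluated by DocumentDB rather than in Spark. A minimal sketch (originSEAdf is just an illustrative name):

// Same query as above, expressed with the DataFrame API
val originSEAdf = coll
  .select("date", "delay", "distance", "origin", "destination")
  .filter("origin = 'SEA'")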
In [7]:
%%sql
select count(1) from originSEA
In [8]:
%%sql
select destination, sum(delay) as TotalDelays
from originSEA
group by destination
order by sum(delay) desc limit 10
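The same aggregation can also be written directly against the DataFrame; a sketch using the standard Spark SQL functions:

// Total delays by destination for flights originating from SEA
import org.apache.spark.sql.functions.{sum, desc}
val totalDelays = originSEA
  .groupBy("destination")
  .agg(sum("delay").as("TotalDelays"))
  .orderBy(desc("TotalDelays"))
totalDelays.show(10)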
In [9]:
%%sql
select distinct destination from originSEA order by destination limit 5
In [10]:
%%sql
select destination, sum(delay)
from originSEA
where delay < 0
group by destination
order by sum(delay) limit 5
In [11]:
%%sql
select destination, percentile_approx(delay, 0.5) as median_delay
from originSEA
where delay < 0
group by destination
order by percentile_approx(delay, 0.5)
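If a single approximate median over all of the filtered data is sufficient (rather than one per destination as above), DataFrameStatFunctions offers approxQuantile; a minimal sketch:

// Approximate median (0.5 quantile) of early departures, with 1% relative error
val medianEarly = originSEA.filter("delay < 0").stat.approxQuantile("delay", Array(0.5), 0.01)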
In [12]:
// Query flights originating from major airports (filtering out test data)
val departureDelays = spark.sql("SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin IN ('SEA', 'SFO', 'SJC', 'JFK', 'LAX', 'LAS', 'BOS', 'IAD', 'ORD', 'DFW', 'MSP', 'DTW', 'DEN', 'ATL')")
departureDelays.createOrReplaceTempView("departureDelays")
In [13]:
// Checking the number of rows (and cache)
departureDelays.cache()
departureDelays.count()
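Note that cache() is lazy: the count() above is the action that actually materializes the DataFrame in memory. Subsequent actions are served from the cache instead of re-querying DocumentDB, for example:

// This second count reads from the Spark cache rather than DocumentDB
departureDelays.count()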
In [14]:
// Set file path
val airportsnaFilePath = "wasb://data@doctorwhostore.blob.core.windows.net/airport-codes-na.txt"
// Obtain airport information
val airportsna = spark.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferschema", "true")
  .option("delimiter", "\t")
  .load(airportsnaFilePath)
airportsna.createOrReplaceTempView("airports_na")
// Available IATA codes from the departuredelays sample dataset
val tripIATA = spark.sql("select distinct iata from (select distinct origin as iata from departureDelays union all select distinct destination as iata from departureDelays) a")
tripIATA.createOrReplaceTempView("tripIATA")
// Only include airports with at least one trip from the departureDelays dataset
val airports = spark.sql("select f.IATA, f.City, f.State, f.Country from airports_na f join tripIATA t on t.IATA = f.IATA")
airports.createOrReplaceTempView("airports")
airports.cache()
// Build `departureDelays_geo` DataFrame
// The flight `date` column is encoded as MMddHHmm; rebuild it as a 2014 timestamp
val departureDelays_geo = spark.sql("select cast(f.date as int) as tripid, cast(concat('2014-', substr(cast(f.date as string), 1, 2), '-', substr(cast(f.date as string), 3, 2), ' ', substr(cast(f.date as string), 5, 2), ':', substr(cast(f.date as string), 7, 2), ':00') as timestamp) as `localdate`, cast(f.delay as int), cast(f.distance as int), f.origin as src, f.destination as dst, o.city as city_src, d.city as city_dst, o.state as state_src, d.state as state_dst from departuredelays f join airports o on o.iata = f.origin join airports d on d.iata = f.destination")
// Create Temporary View and cache
departureDelays_geo.createOrReplaceTempView("departureDelays_geo")
departureDelays_geo.cache()
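To sanity-check the localdate reconstruction, you can peek at a few rows:

// Verify the MMddHHmm date column was rebuilt into a proper timestamp
departureDelays_geo.select("tripid", "localdate", "src", "dst").show(5)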
In [15]:
// Check number of flights
departureDelays_geo.count()
In [16]:
// import graphframes package
import org.graphframes._
// Create Vertices (airports) and Edges (flights)
val tripVertices = airports.withColumnRenamed("IATA", "id").distinct()
val tripEdges = departureDelays_geo.select("tripid", "delay", "src", "dst", "city_dst", "state_dst")
// Cache Vertices and Edges
tripEdges.cache()
tripVertices.cache()
// Build tripGraph GraphFrame
// This GraphFrame is built from the airport vertices and flight edges defined above
val tripGraph = GraphFrame(tripVertices, tripEdges)
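In addition to the combined degrees used below, GraphFrames exposes in-degrees and out-degrees separately, distinguishing airports that receive many flights from those that originate many. A quick sketch:

// Airports ranked by incoming vs. outgoing flights
import org.apache.spark.sql.functions.desc
tripGraph.inDegrees.orderBy(desc("inDegree")).show(5)
tripGraph.outDegrees.orderBy(desc("outDegree")).show(5)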
In [17]:
// Number of airports
tripGraph.vertices.count()
In [18]:
// Number of flights
tripGraph.edges.count()
In [19]:
// Average delay of delayed flights departing SEA, by destination
import org.apache.spark.sql.functions.desc
val flightDelays = tripGraph.edges.filter("src = 'SEA' and delay > 0").groupBy("src", "dst").avg("delay").sort(desc("avg(delay)"))
flightDelays.createOrReplaceTempView("flightDelays")
In [20]:
%%sql
select * from flightDelays order by `avg(delay)` desc limit 10
In [21]:
val airportConnections = tripGraph.degrees.sort(desc("degree"))
airportConnections.createOrReplaceTempView("airportConnections")
In [22]:
%%sql
select id, degree from airportConnections order by degree desc limit 10
In [23]:
val filteredPaths = tripGraph.bfs.fromExpr("id = 'SEA'").toExpr("id = 'SJC'").maxPathLength(1).run()
filteredPaths.show()
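BFS with maxPathLength(1) is one way to check for direct flights; the same question can also be posed with GraphFrames motif finding, as in this sketch (directSEAtoSJC is an illustrative name):

// Motif finding: direct flights from SEA to SJC expressed as a graph pattern
val directSEAtoSJC = tripGraph.find("(a)-[e]->(b)").filter("a.id = 'SEA' and b.id = 'SJC'")
directSEAtoSJC.show()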
In [24]:
val filteredPaths = tripGraph.bfs.fromExpr("id = 'SJC'").toExpr("id = 'ATL'").maxPathLength(1).run()
filteredPaths.show()
No direct flights, but how about one layover?
In [25]:
val filteredPaths = tripGraph.bfs.fromExpr("id = 'SFO'").toExpr("id = 'BUF'").maxPathLength(2).run()
filteredPaths.show()
In [26]:
val commonTransferPoint = filteredPaths.groupBy("v1.id", "v1.City").count().orderBy(desc("count"))
commonTransferPoint.createOrReplaceTempView("commonTransferPoint")
In [27]:
%%sql
select * from commonTransferPoint limit 10