Spark to DocumentDB Connector

Connecting Apache Spark to Azure DocumentDB helps you solve fast-moving data science problems faster, because Azure DocumentDB can quickly persist and retrieve your data. The Spark to DocumentDB connector makes it easier to handle scenarios including (but not limited to):

- blazing-fast IoT scenarios
- updatable columns when performing analytics
- push-down predicate filtering
- advanced analytics against fast-changing data

All of these run against a geo-replicated, managed document store with guaranteed SLAs for consistency, availability, low latency, and throughput.

The Spark to DocumentDB connector uses the Azure DocumentDB Java SDK. The data flow is as follows:

  1. The Spark master node connects to the DocumentDB gateway node to obtain the partition map. The user specifies only the Spark and DocumentDB connections. Connecting to the respective master and gateway nodes is transparent to the user.
  2. The partition map is returned to the Spark master node. The query can then be parsed to determine which DocumentDB partitions (and their locations) need to be accessed.
  3. This information is sent to the Spark worker nodes.
  4. The Spark worker nodes then connect directly to the DocumentDB partitions. They extract the needed data and bring it back to the Spark partitions on the worker nodes.
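The four steps above can be sketched as a toy model in plain Scala. Everything in it is hypothetical:

- `PartitionInfo`, `partitionsToRead`, and `assignToWorkers` are illustrative names. They are not the DocumentDB SDK or connector API.
- The hash-based routing on a single partition key is an assumption. It is there only to show why a filter on the partition key can narrow the read to fewer partitions.

```scala
// Toy model of the read flow described above. All names and the hashing
// scheme are hypothetical; this is not the DocumentDB SDK or connector API.
object ReadFlowSketch {
  // Step 1: one entry of the partition map the driver obtains from the gateway.
  final case class PartitionInfo(id: Int, endpoint: String)

  // Hypothetical routing: a partition-key value maps to exactly one partition by hash.
  def partitionFor(key: String, partitions: Vector[PartitionInfo]): PartitionInfo =
    partitions(Math.floorMod(key.hashCode, partitions.size))

  // Step 2: on the driver, decide which partitions the query must touch.
  // An equality filter on the partition key needs one partition; otherwise fan out to all.
  def partitionsToRead(keyFilter: Option[String], partitions: Vector[PartitionInfo]): Vector[PartitionInfo] =
    keyFilter match {
      case Some(k) => Vector(partitionFor(k, partitions))
      case None    => partitions
    }

  // Steps 3-4: hand partitions to workers round-robin; each worker would then
  // read from its partitions' endpoints directly rather than through the driver.
  def assignToWorkers(parts: Vector[PartitionInfo], workers: Int): Map[Int, Vector[PartitionInfo]] =
    parts.zipWithIndex
      .groupBy { case (_, i) => i % workers }
      .map { case (w, ps) => w -> ps.map(_._1) }
}
```

With four partitions and two workers, an unfiltered query fans out to all four partitions, split two per worker. An equality filter on the partition key touches a single partition.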

In [1]:
%%configure
{ "jars": ["wasb:///example/jars/azure-documentdb-1.10.0.jar","wasb:///example/jars/azure-documentdb-spark-0.0.2-SNAPSHOT.jar"],
  "conf": {
    "spark.jars.packages": "graphframes:graphframes:0.3.0-spark2.0-s_2.11",   
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
}


Current session configs: {u'jars': [u'wasb:///example/jars/azure-documentdb-1.10.0.jar', u'wasb:///example/jars/azure-documentdb-spark-0.0.2-SNAPSHOT.jar'], u'kind': 'spark', u'conf': {u'spark.jars.packages': u'graphframes:graphframes:0.3.0-spark2.0-s_2.11', u'spark.jars.excludes': u'org.scala-lang:scala-reflect'}}
No active sessions.

In [2]:
// Import Spark to DocumentDB Connector
import com.microsoft.azure.documentdb.spark.schema._
import com.microsoft.azure.documentdb.spark._
import com.microsoft.azure.documentdb.spark.config.Config

// Connect to DocumentDB Database
//   Replace <your-master-key> with your account's master key; never publish a real key
val readConfig2 = Config(Map("Endpoint" -> "https://doctorwho.documents.azure.com:443/",
"Masterkey" -> "<your-master-key>",
"Database" -> "DepartureDelays",
"preferredRegions" -> "Central US;East US 2;", // regions to read from, in order of preference
"Collection" -> "flights_pcoll",
"SamplingRatio" -> "1.0")) // fraction of documents sampled to infer the schema


Starting Spark application
ID  YARN Application ID             Kind   State  Spark UI  Driver log  Current session?
2   application_1491505321562_0020  spark  idle   Link      Link
SparkSession available as 'spark'.
readConfig2: com.microsoft.azure.documentdb.spark.config.Config = com.microsoft.azure.documentdb.spark.config.ConfigBuilder$$anon$1@4848afe

In [3]:
// Load the DocumentDB collection as a DataFrame and register it as the temp view `c`
val coll = spark.sqlContext.read.DocumentDB(readConfig2)
coll.createOrReplaceTempView("c")

Query 1: Flights departing from Seattle (first 100 returned)


In [4]:
// Define the query; Spark evaluates it lazily when the view is queried
val top100 = spark.sql("SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA' LIMIT 100")
top100.createOrReplaceTempView("top100")

In [5]:
%%sql
select * from top100 limit 10


date delay distance origin destination
0 1011425 92 1495 SEA ORD
1 1010820 -10 1495 SEA ORD
2 1011205 -3 1495 SEA ORD
3 1010600 0 1495 SEA ORD
4 1021425 298 1495 SEA ORD
5 1020820 83 1495 SEA ORD
6 1021205 41 1495 SEA ORD
7 1020600 2 1495 SEA ORD
8 1031425 33 1495 SEA ORD
9 1030820 2 1495 SEA ORD

Query 2: Flights departing from Seattle


In [6]:
// Define the query; Spark evaluates it lazily when the view is queried
val originSEA = spark.sql("SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'")
originSEA.createOrReplaceTempView("originSEA")

Determine the number of flights departing from Seattle (in this dataset)


In [7]:
%%sql
select count(1) from originSEA


count(1)
0 23078

Total delay grouped by destination

Beyond simple counts, Spark SQL over DocumentDB data also supports aggregations such as GROUP BY.


In [8]:
%%sql
select destination, sum(delay) as TotalDelays 
from originSEA 
group by destination 
order by sum(delay) desc limit 10


destination TotalDelays
0 SFO 22293
1 DEN 13645
2 ORD 10041
3 LAX 9359
4 LAS 8719
5 OAK 8461
6 PHX 8299
7 DFW 7167
8 IAH 6431
9 ANC 6316
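The GROUP BY query above corresponds roughly to the following plain-Scala collection pipeline. It is a sketch over an in-memory `Seq`, not a description of how Spark executes the query. The `Flight` case class is a hypothetical stand-in for a row of `originSEA`.

```scala
// Plain-Scala analogue of:
//   select destination, sum(delay) as TotalDelays from originSEA
//   group by destination order by sum(delay) desc limit n
object GroupBySketch {
  // Hypothetical stand-in for one row of originSEA
  final case class Flight(destination: String, delay: Int)

  def totalDelays(flights: Seq[Flight], n: Int): Seq[(String, Int)] =
    flights
      .groupBy(_.destination)                                // GROUP BY destination
      .map { case (dst, fs) => dst -> fs.map(_.delay).sum }  // SUM(delay)
      .toSeq
      .sortBy { case (_, total) => -total }                  // ORDER BY SUM(delay) DESC
      .take(n)                                               // LIMIT n
}
```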

Get the first five destination airports (alphabetically) for flights departing from Seattle


In [9]:
%%sql
select distinct destination from originSEA order by destination limit 5


destination
0 ABQ
1 ANC
2 ATL
3 AUS
4 BOS

Top 5 destination cities from Seattle by total early-departure time (sum of negative delays, i.e., flights that left early)


In [10]:
%%sql
select destination, sum(delay) 
from originSEA
where delay < 0 
group by destination 
order by sum(delay) limit 5


destination sum(delay)
0 LAX -4679
1 SFO -3733
2 PHX -3089
3 LAS -2999
4 PDX -2697

Calculate the approximate median delay of early flights (delay < 0) by destination city, for flights departing from Seattle


In [11]:
%%sql
select destination, percentile_approx(delay, 0.5) as median_delay 
from originSEA
where delay < 0 
group by destination 
order by percentile_approx(delay, 0.5)


destination median_delay
0 CVG -8.000000
1 JAC -7.500000
2 OMA -7.090909
3 HDN -7.000000
4 PDX -6.900000
5 FAT -6.454545
6 LGB -6.375000
7 SBA -6.333333
8 COS -5.900000
9 BUR -5.645161
10 SNA -5.568966
11 OGG -5.354167
12 DCA -5.119048
13 IAD -5.038462
14 MIA -4.875000
15 ONT -4.750000
16 MCI -4.714286
17 SAN -4.653226
18 PSP -4.650000
19 STL -4.636364
20 PHX -4.614035
21 KOA -4.562500
22 SJC -4.500000
23 KTN -4.500000
24 FLL -4.500000
25 IAH -4.486111
26 DFW -4.481818
27 LIH -4.473684
28 CLT -4.458333
29 SFO -4.452000
30 LAX -4.445355
31 JFK -4.411765
32 MSP -4.408333
33 OAK -4.400000
34 BOS -4.357143
35 LAS -4.338583
36 SLC -4.316901
37 GEG -4.209677
38 ORD -4.196721
39 DEN -4.186170
40 TUS -4.100000
41 FAI -4.068182
42 HNL -4.051282
43 EWR -3.979167
44 SAT -3.944444
45 AUS -3.944444
46 DTW -3.907407
47 SMF -3.902778
48 ANC -3.538095
49 PHL -3.478261
50 JNU -3.470588
51 MKE -3.450000
52 ATL -3.435897
53 MCO -3.406250
54 RNO -2.818182
55 ABQ -2.800000
56 MDW -2.571429
57 CLE -2.000000
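`percentile_approx` returns an approximate percentile. For comparison on small data, the sketch below computes an exact median per destination over in-memory rows. It applies the same `delay < 0` filter as the cell above.

This is illustrative only:

- The `(destination, delay)` tuple input and the `MedianSketch` name are assumptions.
- Exact results can differ slightly from Spark's approximate ones.

```scala
object MedianSketch {
  // Exact median of a non-empty sample: the middle value, or the mean of the
  // two middle values when the count is even.
  def median(xs: Seq[Double]): Double = {
    require(xs.nonEmpty, "median of an empty sample is undefined")
    val sorted = xs.sorted.toVector
    val n = sorted.size
    if (n % 2 == 1) sorted(n / 2)
    else (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0
  }

  // Median of early flights (delay < 0) per destination, sorted ascending,
  // mirroring the %%sql cell but computed exactly.
  def medianEarlyDelayByDestination(rows: Seq[(String, Int)]): Seq[(String, Double)] =
    rows
      .filter { case (_, delay) => delay < 0 }
      .groupBy { case (dst, _) => dst }
      .map { case (dst, rs) => dst -> median(rs.map(_._2.toDouble)) }
      .toSeq
      .sortBy(_._2)
}
```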

Query 3: Access all data (~1.39M rows), or only data from key airports (e.g., SEA, SFO, SJC, JFK, ATL)


In [12]:
// Restrict to flights departing from key airports (also filtering out test data); evaluated lazily
val departureDelays = spark.sql("SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin IN ('SEA', 'SFO', 'SJC', 'JFK', 'LAX', 'LAS', 'BOS', 'IAD', 'ORD', 'DFW', 'MSP', 'DTW', 'DEN', 'ATL')")
departureDelays.createOrReplaceTempView("departureDelays")

In [13]:
// Mark the DataFrame for caching; count() runs the query and materializes the cache
departureDelays.cache()
departureDelays.count()


res20: Long = 547350

Using GraphFrames for Apache Spark to run graph queries

Use GraphFrames to build a graph from the flights data stored in DocumentDB.

Prepare the data


In [14]:
// Set File Paths
val airportsnaFilePath = "wasb://data@doctorwhostore.blob.core.windows.net/airport-codes-na.txt"

// Obtain airport information
val airportsna = spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferschema", "true").option("delimiter", "\t").load(airportsnaFilePath)
airportsna.createOrReplaceTempView("airports_na")

// Available IATA codes from the departuredelays sample dataset
val tripIATA = spark.sql("select distinct iata from (select distinct origin as iata from departureDelays union all select distinct destination as iata from departureDelays) a")
tripIATA.createOrReplaceTempView("tripIATA")

// Only include airports with at least one trip in the departureDelays dataset
val airports = spark.sql("select f.IATA, f.City, f.State, f.Country from airports_na f join tripIATA t on t.IATA = f.IATA")
airports.createOrReplaceTempView("airports")
airports.cache()

// Build `departureDelays_geo` DataFrame
//   `date` encodes MMddHHmm; read back as a number it loses the month's leading zero
//   (e.g. 1011425), so left-pad it to 8 characters before slicing out the parts
val departureDelays_geo = spark.sql("""
  select cast(f.date as int) as tripid,
         cast(concat('2014-', substr(f.ds, 1, 2), '-', substr(f.ds, 3, 2), ' ',
                     substr(f.ds, 5, 2), ':', substr(f.ds, 7, 2), ':00') as timestamp) as localdate,
         cast(f.delay as int) as delay, cast(f.distance as int) as distance,
         f.origin as src, f.destination as dst,
         o.city as city_src, d.city as city_dst, o.state as state_src, d.state as state_dst
  from (select t.*, lpad(cast(t.date as string), 8, '0') as ds from departureDelays t) f
  join airports o on o.iata = f.origin
  join airports d on d.iata = f.destination
""")

// Create Temporary View and cache
departureDelays_geo.createOrReplaceTempView("departureDelays_geo")
departureDelays_geo.cache()


res37: departureDelays_geo.type = [tripid: int, localdate: timestamp ... 8 more fields]
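The `date` values shown earlier (for example `1011425`) encode `MMddHHmm`. Because the field is apparently stored as a number, the month's leading zero is dropped. Below is a plain-Scala sketch of decoding such a value, assuming the year 2014 as the SQL above does. `TripDateSketch` is a hypothetical helper, not part of the notebook.

```scala
import java.time.LocalDateTime

object TripDateSketch {
  // Decode a MMddHHmm value into a LocalDateTime (year assumed, default 2014).
  // Left-pad to 8 digits first, so 1011425 is read as "01011425".
  def toLocalDateTime(date: Int, year: Int = 2014): LocalDateTime = {
    val s = f"$date%08d"
    LocalDateTime.of(
      year,
      s.substring(0, 2).toInt, // month
      s.substring(2, 4).toInt, // day
      s.substring(4, 6).toInt, // hour
      s.substring(6, 8).toInt  // minute
    )
  }
}
```

For example, `TripDateSketch.toLocalDateTime(1011425)` yields `2014-01-01T14:25`.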

In [15]:
// Check number of flights
departureDelays_geo.count()


res39: Long = 541524

Build the GraphFrame


In [16]:
// import graphframes package
import org.graphframes._

// Create Vertices (airports) and Edges (flights)
val tripVertices = airports.withColumnRenamed("IATA", "id").distinct()
val tripEdges = departureDelays_geo.select("tripid", "delay", "src", "dst", "city_dst", "state_dst")

// Cache Vertices and Edges
tripEdges.cache()
tripVertices.cache()

// Build tripGraph GraphFrame
//   Vertices are airports; edges are trips (flights) between them
val tripGraph = GraphFrame(tripVertices, tripEdges)


tripGraph: org.graphframes.GraphFrame = GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 4 more fields])

Determine the number of airports and trips


In [17]:
// Number of airports
tripGraph.vertices.count()


res51: Long = 262

In [18]:
// Number of flights
tripGraph.edges.count()


res53: Long = 541524

Which routes departing SEA have the highest average delay (among delayed flights)?


In [19]:
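import org.apache.spark.sql.functions.desc

// Average delay per route from SEA, counting only delayed flights (delay > 0)
val flightDelays = tripGraph.edges.filter("src = 'SEA' and delay > 0").groupBy("src", "dst").avg("delay").sort(desc("avg(delay)"))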
flightDelays.createOrReplaceTempView("flightDelays")

In [20]:
%%sql
select * from flightDelays order by `avg(delay)` desc limit 10


src dst avg(delay)
0 SEA PHL 55.666667
1 SEA COS 43.538462
2 SEA FAT 43.038462
3 SEA LGB 39.397059
4 SEA IAD 37.733333
5 SEA MIA 37.325581
6 SEA SFO 36.502104
7 SEA SBA 36.482759
8 SEA JFK 35.031250
9 SEA ORD 33.603352

Which airport is the most important (in terms of connections)?

Note: this is based on the filtered dataset. Each airport's degree counts every inbound and outbound flight.


In [21]:
val airportConnections = tripGraph.degrees.sort(desc("degree"))
airportConnections.createOrReplaceTempView("airportConnections")

In [22]:
%%sql
select id, degree from airportConnections order by degree desc limit 10


id degree
0 ATL 102816
1 DFW 81487
2 ORD 77315
3 LAX 75984
4 DEN 66622
5 SFO 55732
6 LAS 46773
7 BOS 33805
8 SEA 33474
9 MSP 31527
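`tripGraph.degrees` counts, for each vertex, the edges that touch it in either direction. Every flight is its own edge. An airport's degree is therefore its number of departures plus arrivals in the filtered data, not its number of distinct connected airports.

Below is a small plain-Scala sketch of the same count over `(src, dst)` pairs. The `DegreeSketch` name is hypothetical.

```scala
object DegreeSketch {
  // Degree = number of edges touching a vertex, counting both the src and dst
  // end of every edge; parallel edges (repeat flights) each count separately.
  def degrees(edges: Seq[(String, String)]): Seq[(String, Int)] =
    edges
      .flatMap { case (src, dst) => Seq(src, dst) }
      .groupBy(identity)
      .map { case (id, hits) => id -> hits.size }
      .toSeq
      .sortBy { case (id, d) => (-d, id) } // highest degree first, ties by id
}
```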

Direct flights between Seattle and San Jose?


In [23]:
val filteredPaths = tripGraph.bfs.fromExpr("id = 'SEA'").toExpr("id = 'SJC'").maxPathLength(1).run()
filteredPaths.show()


+--------------------+--------------------+--------------------+
|                from|                  e0|                  to|
+--------------------+--------------------+--------------------+
|[SEA,Seattle,WA,USA]|[1010600,-2,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1012030,-4,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1011215,-6,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1011855,-3,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1010710,-1,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1020600,2,SEA,SJ...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1022030,-3,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1021600,-2,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1021215,-9,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1021855,-1,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1020710,-9,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1030600,-5,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1032030,-1,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1031600,-7,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1031215,-3,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1031855,-1,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1030710,-5,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1040600,4,SEA,SJ...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1042030,-2,SEA,S...|[SJC,San Jose,CA,...|
|[SEA,Seattle,WA,USA]|[1041215,-4,SEA,S...|[SJC,San Jose,CA,...|
+--------------------+--------------------+--------------------+
only showing top 20 rows

Direct flights between San Jose and Atlanta?


In [24]:
val filteredPaths = tripGraph.bfs.fromExpr("id = 'SJC'").toExpr("id = 'ATL'").maxPathLength(1).run()
filteredPaths.show()


+--------------------+--------------------+--------------------+
|                from|                  e0|                  to|
+--------------------+--------------------+--------------------+
|[SJC,San Jose,CA,...|[1012259,-9,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1022259,25,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1032259,15,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1042259,42,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1052259,40,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1062255,59,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1082255,47,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1092255,12,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1102255,7,SJC,AT...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1122255,23,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1132255,-7,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1152255,-4,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1162255,-9,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1172255,-7,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1192255,-5,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1202255,-4,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1222255,15,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1232255,19,SJC,A...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1242255,-11,SJC,...|[ATL,Atlanta,GA,USA]|
|[SJC,San Jose,CA,...|[1262255,-9,SJC,A...|[ATL,Atlanta,GA,USA]|
+--------------------+--------------------+--------------------+
only showing top 20 rows

There are no direct flights from San Francisco to Buffalo in this dataset, but how about one layover?


In [25]:
val filteredPaths = tripGraph.bfs.fromExpr("id = 'SFO'").toExpr("id = 'BUF'").maxPathLength(2).run()
filteredPaths.show()


+--------------------+--------------------+-------------------+--------------------+--------------------+
|                from|                  e0|                 v1|                  e1|                  to|
+--------------------+--------------------+-------------------+--------------------+--------------------+
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1010635,-6,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1011059,13,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1011427,19,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1020635,-4,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1021059,0,BOS,BU...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1021427,194,BOS,...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1030635,0,BOS,BU...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1031059,0,BOS,BU...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1031427,0,BOS,BU...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1040635,16,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1041552,96,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1050635,1,BOS,BU...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1051059,48,BOS,B...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1051427,443,BOS,...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1060635,0,BOS,BU...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1061059,294,BOS,...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1061427,0,BOS,BU...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1070730,0,BOS,BU...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1071730,0,BOS,BU...|[BUF,Buffalo,NY,USA]|
|[SFO,San Francisc...|[1010700,0,SFO,BO...|[BOS,Boston,MA,USA]|[1080710,0,BOS,BU...|[BUF,Buffalo,NY,USA]|
+--------------------+--------------------+-------------------+--------------------+--------------------+
only showing top 20 rows
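GraphFrames' `bfs` returns only the shortest matching paths. That is why the result above contains only two-hop paths: this data has no direct SFO to BUF edge.

The sketch below mimics that behaviour for `maxPathLength(2)` on an in-memory list of flights. It is a simplification of the real search, with no expression or edge filters. `LayoverSketch` and its `Flight` case class are hypothetical.

```scala
object LayoverSketch {
  // Hypothetical edge: one flight from src to dst
  final case class Flight(tripId: Int, src: String, dst: String)

  // Simplified stand-in for bfs.fromExpr(...).toExpr(...).maxPathLength(2):
  // if any direct flight exists, return only those (shortest paths win);
  // otherwise return every (first leg, second leg) pair through one hub.
  def paths(flights: Seq[Flight], from: String, to: String): Seq[Seq[Flight]] = {
    val direct = flights.filter(f => f.src == from && f.dst == to)
    if (direct.nonEmpty) direct.map(Seq(_))
    else {
      val bySrc = flights.groupBy(_.src)
      for {
        leg1 <- bySrc.getOrElse(from, Seq.empty[Flight])
        if leg1.dst != from
        leg2 <- bySrc.getOrElse(leg1.dst, Seq.empty[Flight])
        if leg2.dst == to
      } yield Seq(leg1, leg2)
    }
  }
}
```

Like the notebook's query, this sketch ignores departure times, so a second leg may depart before the first one lands. A realistic itinerary search would also filter on `localdate`.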

What is the most common transfer point (by number of two-leg flight combinations)?


In [26]:
val commonTransferPoint = filteredPaths.groupBy("v1.id", "v1.City").count().orderBy(desc("count"))
commonTransferPoint.createOrReplaceTempView("commonTransferPoint")

In [27]:
%%sql
select * from commonTransferPoint limit 10


id City count
0 JFK New York 1233728
1 ORD Chicago 1088283
2 ATL Atlanta 285383
3 LAS Las Vegas 275091
4 BOS Boston 238576
5 IAD Washington DC 78200
6 DTW Detroit 17940
7 MSP Minneapolis 954
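These counts are large because each row of `filteredPaths` is one (first leg, second leg) combination. The path search pairs every first leg with every second leg regardless of schedule. So, for a given hub, the number of rows is:

- the number of flights from SFO to the hub,
- multiplied by the number of flights from the hub to BUF.

Below is a plain-Scala sketch of that calculation over `(src, dst)` pairs. `TransferPointSketch` is a hypothetical helper.

```scala
object TransferPointSketch {
  // Rows per hub for one-layover paths from -> hub -> to:
  // (#flights from -> hub) * (#flights hub -> to), dropping hubs with no path.
  def pathsPerHub(edges: Seq[(String, String)], from: String, to: String): Seq[(String, Long)] = {
    val legCount: Map[(String, String), Long] =
      edges.groupBy(identity).map { case (e, es) => e -> es.size.toLong }
    val hubs = edges.collect { case (`from`, hub) if hub != to && hub != from => hub }.distinct
    hubs
      .map(hub => hub -> legCount.getOrElse((from, hub), 0L) * legCount.getOrElse((hub, to), 0L))
      .filter(_._2 > 0)
      .sortBy { case (hub, n) => (-n, hub) } // most combinations first
  }
}
```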
