0. Introduction

In this tutorial, we use Spark Streaming to stream data from a Cloudant database and process this continuously received stream of data using Spark SQL.

Our processing pipeline goes through three stages:

  1. The _changes feed is streamed from the given Cloudant database using CloudantReceiver. CloudantReceiver receives the _changes feed of the database, extracts individual JSON documents from the feed, and stores these documents in Spark's memory for processing by Spark Streaming.
  2. Spark Streaming breaks up this continuous stream of documents into batches. Each batch is a separate RDD and, in our case, represents the set of documents collected within a 10-second window. This sequence of batches (that is, a sequence of RDDs) is called a discretized stream, or DStream.
  3. Each RDD of the DStream is processed using Spark SQL.

|                1                  | -> |            2               | -> |          3             |
|_changes feed ->... doc3 doc2 doc1 | -> |... [doc4 doc3] [doc2 doc1] | -> |... [pBatch2] [pBatch1] |
|      CloudantReceiver             | -> | Spark Streaming: DStream   | -> |      Spark SQL         |
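
To make stage 2 more concrete, here is a minimal sketch in plain Scala (no Spark involved) of how a continuous sequence of documents can be cut into fixed 10-second batches. The names Arrival and discretize are our own, chosen for illustration; this is a simulation of the idea, not how Spark Streaming is implemented. Note also that Spark emits a batch for every interval, including empty ones, while this sketch only returns intervals that contain documents.

```scala
// A received document together with its arrival time in milliseconds (hypothetical type).
final case class Arrival(timeMs: Long, doc: String)

// Group arrivals into consecutive fixed-length intervals, keyed by the interval start time.
def discretize(arrivals: Seq[Arrival], batchMs: Long): Seq[(Long, Seq[String])] =
  arrivals
    .groupBy(a => a.timeMs / batchMs * batchMs) // start of the interval the arrival falls into
    .toSeq
    .sortBy(_._1)
    .map { case (start, batch) => (start, batch.sortBy(_.timeMs).map(_.doc)) }

// Four made-up arrivals spread over twenty seconds.
val arrivals = Seq(
  Arrival(1000L, "doc1"),
  Arrival(7000L, "doc2"),
  Arrival(12000L, "doc3"),
  Arrival(19000L, "doc4")
)

// Print one line per batch, using a 10-second batch length.
discretize(arrivals, batchMs = 10000L).foreach { case (start, docs) =>
  println(s"batch starting at $start ms: ${docs.mkString("[", " ", "]")}")
}
```

With these sample arrivals, doc1 and doc2 fall into the first batch and doc3 and doc4 into the second, which mirrors the grouping shown in the diagram.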

In the steps below, we:

  1. Initialize StreamingContext and DStream
  2. Define processing of DStream using Spark SQL
  3. Start the processing, and stop it after some time

1. Initializing DStream

1.1 Provide the details of your Cloudant account in the properties map:

  • cloudant.host - the fully qualified account https URL
  • cloudant.username - the Cloudant account username
  • cloudant.password - the Cloudant account password
  • database - the database whose changes you want to receive

In [1]:
import org.apache.spark.streaming.{ Seconds, StreamingContext, Time }
import org.apache.spark.{ SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import java.util.concurrent.atomic.AtomicLong

import com.cloudant.spark.CloudantReceiver


// Connection settings for CloudantReceiver; replace the xxx placeholders with your own values
val properties = Map(
    "cloudant.host" -> "xxx.cloudant.com",
    "cloudant.username" -> "xxx",
    "cloudant.password" -> "xxx",
    "database" -> "election2016"
)
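
If you would rather not type the password into a notebook cell, one option is to read the account details from environment variables. The sketch below is plain Scala; the helper names (requiredEnv, cloudantProperties) and the variable names (CLOUDANT_HOST and so on) are assumptions made for illustration and are not defined by the Cloudant connector. Only the map keys come from the list above.

```scala
// Read a required setting, failing with a clear message if it is absent.
// The default argument is evaluated lazily, so the error is raised only when the key is missing.
def requiredEnv(name: String, env: Map[String, String]): String =
  env.getOrElse(name, sys.error(s"Missing environment variable: $name"))

// Build the properties map from an environment (defaults to the real process environment).
// The CLOUDANT_* variable names are hypothetical.
def cloudantProperties(env: Map[String, String] = sys.env): Map[String, String] = Map(
  "cloudant.host" -> requiredEnv("CLOUDANT_HOST", env),
  "cloudant.username" -> requiredEnv("CLOUDANT_USERNAME", env),
  "cloudant.password" -> requiredEnv("CLOUDANT_PASSWORD", env),
  "database" -> env.getOrElse("CLOUDANT_DATABASE", "election2016")
)

// Demonstration with an explicit map instead of the real environment.
val sampleEnv = Map(
  "CLOUDANT_HOST" -> "xxx.cloudant.com",
  "CLOUDANT_USERNAME" -> "xxx",
  "CLOUDANT_PASSWORD" -> "xxx"
)
val exampleProperties = cloudantProperties(sampleEnv)
println(exampleProperties("database"))
```

In the notebook, calling cloudantProperties() without arguments would read the real environment and could replace the hard-coded map, provided the variables are set for the notebook's kernel.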

1.2 Initialize StreamingContext and DStream

  • Initialize a StreamingContext with a 10-second batch interval
  • Create a DStream of database changes using CloudantReceiver

In [2]:
val ssc = new StreamingContext(sc, Seconds(10))
val changesDStream = ssc.receiverStream(new CloudantReceiver(properties))

2. Define processing of DStreams

  • Get SQLContext
  • For every batch:
    • Create a dataframe rawTweetsDF from the JSON documents of the batch
    • If the batch is not empty, create a tweetsDF dataframe with the fields gender, state, and polarity
    • Calculate and display the cumulative count of tweets

In [3]:
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

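// Running total across batches. The function passed to foreachRDD is executed on the driver,
// so a driver-side AtomicLong is enough to accumulate the counts.
val curTotalCount = new AtomicLong(0)
changesDStream.foreachRDD((rdd: RDD[String], time: Time) => {
    println(s"========= $time =========")
    // Infer a schema from the JSON documents of this batch
    val rawTweetsDF = sqlContext.read.json(rdd)
    
    // A batch without documents yields an empty schema; only the header line is printed for it
    if (!rawTweetsDF.schema.isEmpty) {
        rawTweetsDF.show(10)
        
        // Keep only the nested fields of interest
        val tweetsDF = rawTweetsDF.select($"cde.author.gender", 
                $"cde.author.location.state",
                $"cde.content.sentiment.polarity")
        tweetsDF.show(10)
        
        curTotalCount.getAndAdd(tweetsDF.count())
        println("Current total count:" + curTotalCount)
    }
})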
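
The cumulative count can be illustrated without Spark. The following sketch, in plain Scala, simulates a sequence of batch sizes and accumulates them in an AtomicLong in the same way as the cell above. The batch sizes are made up for illustration (a large first batch, two empty batches, then 300 new documents); the names runningTotal, batchCounts, and reported are our own.

```scala
import java.util.concurrent.atomic.AtomicLong

// Driver-side running total, as in the notebook cell.
val runningTotal = new AtomicLong(0)

// Hypothetical number of documents in each successive 10-second batch.
val batchCounts = Seq(1200L, 0L, 0L, 300L)

// Empty batches are skipped; each non-empty batch adds its size to the total.
val reported = batchCounts.filter(_ > 0).map(n => runningTotal.addAndGet(n))

reported.foreach(total => println(s"Current total count: $total"))
```

Under these assumed batch sizes, the total is reported only for the two non-empty batches, first as 1200 and then as 1500, while the empty batches contribute nothing.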

3. Start receiving and processing of data

  • Start StreamingContext
  • Allow processing to run for 300 secs
  • Manually stop processing

All previous instructions were just initializations and definitions; nothing happens until we start the StreamingContext. After the start, data is received and processed. Since a DStream is continuous, processing does not stop until we stop it manually.

While this processing is running for 300 seconds, we can go back to the Python notebook and load more data into the database. These new changes will be picked up by Spark Streaming, processed, and displayed below. Thus, the steps for demonstrating the dynamic nature of Spark Streaming are the following:

  1. Run the cell below in the current Scala notebook.
  2. Then go back to the Python notebook and run the following two cells. First run:
    query = "#election2016"
    count = 300
    
    and then run the following cell:
    TtC = TwitterToCloudant()
    TtC.count = count
    TtC.query_twitter(properties, None, query, 0)
    

In [4]:
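// Start receiving and processing data
ssc.start()
// Let the stream run for 300 seconds
Thread.sleep(300000L)
// Stop streaming; passing true also stops the underlying SparkContext
ssc.stop(true)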


========= 1473700520000 ms =========
+--------------------+--------------------+--------------------+--------------------+--------------------+
|                 _id|                _rev|                 cde|         cdeInternal|             message|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|19e10ed0d84ca4804...|1-e01013f3b419d3c...|[[unknown,[null,n...|[null,WrappedArra...|[[995mu,1041,94,5...|
|19e10ed0d84ca4804...|1-faa818605292480...|[[male,[Salt Lake...|[null,WrappedArra...|[[Daniel Burton,3...|
|19e10ed0d84ca4804...|1-ef96819a7a98d1b...|[[male,[Monticell...|[null,WrappedArra...|[[Steven Kurlande...|
|19e10ed0d84ca4804...|1-9c2f0a4b09ea675...|[[null,[,United S...|[null,WrappedArra...|[[AZ After Party,...|
|19e10ed0d84ca4804...|1-d8d702846ed578c...|[[male,[,,],[,unk...|[null,WrappedArra...|[[Mormon Democrat...|
|19e10ed0d84ca4804...|1-bb2f38a4ced7969...|[[unknown,[,Unite...|[null,WrappedArra...|[[utahpolitics,1,...|
|19e10ed0d84ca4804...|1-0a880a2c012457c...|[[unknown,[Adelai...|[null,WrappedArra...|[[Kerry Seebohm,9...|
|19e10ed0d84ca4804...|1-85dfdf787bc125e...|[[unknown,[Köln,G...|[null,WrappedArra...|[[Awale Howle,162...|
|19e10ed0d84ca4804...|1-643e24d6fbda555...|[[male,[BELLE,Uni...|[null,WrappedArra...|[[DR.BROWN-DEAN,3...|
|19e10ed0d84ca4804...|1-020f864315fe3d6...|[[male,[Elizabeth...|[[true,null,false...|[[Mr. Huesken,164...|
+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 10 rows

+-------+---------------+--------+
| gender|          state|polarity|
+-------+---------------+--------+
|unknown|           null| NEUTRAL|
|   male|           Utah| NEUTRAL|
|   male|       New York|POSITIVE|
|   null|        Arizona|    null|
|   male|               | NEUTRAL|
|unknown|           Utah| NEUTRAL|
|unknown|South Australia|NEGATIVE|
|unknown|               |POSITIVE|
|   male|       Missouri|NEGATIVE|
|   male|   Pennsylvania| NEUTRAL|
+-------+---------------+--------+
only showing top 10 rows

Current total count:1200
========= 1473700530000 ms =========
========= 1473700540000 ms =========
========= 1473700550000 ms =========
+--------------------+--------------------+--------------------+--------------------+--------------------+
|                 _id|                _rev|                 cde|         cdeInternal|             message|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|39fb4703881bd46a4...|1-9f0eb0d3e8309ac...|[[unknown,[null,n...|[null,WrappedArra...|[[ekajoyce,179,25...|
|39fb4703881bd46a4...|1-d8d702846ed578c...|[[male,[,,],[,unk...|[null,WrappedArra...|[[Mormon Democrat...|
|39fb4703881bd46a4...|1-9c2f0a4b09ea675...|[[null,[,United S...|[null,WrappedArra...|[[AZ After Party,...|
|39fb4703881bd46a4...|1-bb2f38a4ced7969...|[[unknown,[,Unite...|[null,WrappedArra...|[[utahpolitics,1,...|
|39fb4703881bd46a4...|1-682d3282aa29a80...|[[unknown,[Accra,...|[null,WrappedArra...|[[Afoobu,218,621,...|
|39fb4703881bd46a4...|1-85dfdf787bc125e...|[[unknown,[Köln,G...|[null,WrappedArra...|[[Awale Howle,162...|
|39fb4703881bd46a4...|1-e01013f3b419d3c...|[[unknown,[null,n...|[null,WrappedArra...|[[995mu,1041,94,5...|
|39fb4703881bd46a4...|1-10c4d0c52932bee...|[[male,[,United S...|[[true,null,null,...|[[Chuck Nellis,33...|
|39fb4703881bd46a4...|1-7669e192db75074...|[[unknown,[NATION...|[null,WrappedArra...|[[Not On This Wat...|
|39fb4703881bd46a4...|1-faa818605292480...|[[male,[Salt Lake...|[null,WrappedArra...|[[Daniel Burton,3...|
+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 10 rows

+-------+--------------+--------+
| gender|         state|polarity|
+-------+--------------+--------+
|unknown|          null| NEUTRAL|
|   male|              | NEUTRAL|
|   null|       Arizona|    null|
|unknown|          Utah| NEUTRAL|
|unknown|              | NEUTRAL|
|unknown|              |POSITIVE|
|unknown|          null| NEUTRAL|
|   male|North Carolina| NEUTRAL|
|unknown|      Maryland|NEGATIVE|
|   male|          Utah| NEUTRAL|
+-------+--------------+--------+
only showing top 10 rows

Current total count:1500
========= 1473700560000 ms =========
========= 1473700570000 ms =========
========= 1473700580000 ms =========
========= 1473700590000 ms =========
========= 1473700600000 ms =========
========= 1473700610000 ms =========
========= 1473700620000 ms =========
========= 1473700630000 ms =========
========= 1473700640000 ms =========
========= 1473700650000 ms =========
========= 1473700660000 ms =========
========= 1473700670000 ms =========
========= 1473700680000 ms =========
========= 1473700690000 ms =========
========= 1473700700000 ms =========
========= 1473700710000 ms =========
========= 1473700720000 ms =========
========= 1473700730000 ms =========
========= 1473700740000 ms =========
========= 1473700750000 ms =========
========= 1473700760000 ms =========
========= 1473700770000 ms =========
========= 1473700780000 ms =========
========= 1473700790000 ms =========
========= 1473700800000 ms =========
========= 1473700810000 ms =========
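
Each batch header in the output above is the batch time in milliseconds since the Unix epoch. As a small sketch in plain Scala, the helper below (batchInstant is our own name, not part of Spark) converts such a value into a readable UTC timestamp, and counts how many batch intervals separate the two non-empty batches shown above.

```scala
import java.time.Instant

// Hypothetical helper: convert a batch time in epoch milliseconds to a UTC instant.
def batchInstant(batchTimeMs: Long): Instant = Instant.ofEpochMilli(batchTimeMs)

// Batch times of the two non-empty batches, copied from the output above.
val firstBatchMs = 1473700520000L
val secondBatchMs = 1473700550000L

println(batchInstant(firstBatchMs)) // prints 2016-09-12T17:15:20Z

// With a 10-second batch interval, the two batches are three intervals apart.
val intervalsApart = (secondBatchMs - firstBatchMs) / 10000L
println(intervalsApart) // prints 3
```

Inside foreachRDD, the same millisecond value is available as time.milliseconds, so the header could be printed in this readable form if preferred.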
