0. Introduction

In this tutorial, we will use Spark Streaming to stream data from a Cloudant database and process this continuously received stream of data using Spark SQL.

Our processing pipeline goes through three stages:

  1. The _changes feed is streamed from the given Cloudant database using CloudantReceiver. CloudantReceiver receives the database's _changes feed, extracts individual JSON documents from it, and stores these documents in Spark's memory for processing by Spark Streaming.
  2. Spark Streaming breaks this continuous stream of documents into batches. Each batch is a separate RDD and, in our case, represents the set of documents collected within a 10-second window. This sequence of batches, or sequence of RDDs, is what is called a discretized stream, or DStream.
  3. Each RDD of the DStream is processed using Spark SQL.
|                1                  | -> |            2               |    |          3             |
|_changes feed ->... doc3 doc2 doc1 | -> |... [doc4 doc3] [doc2 doc1] | -> |... [pBatch2] [pBatch1] |
|      CloudantReceiver             | -> | Spark Streaming: DStream   | -> |      Spark SQL         |

In the steps below, we:

  1. Initialize StreamingContext and DStream
  2. Define processing of DStream using Spark SQL
  3. Start the processing, and stop it after some time.

1. Initializing DStream

1.1 Provide the details of your Cloudant account in the properties map:

  • cloudant.host - the fully qualified account https URL
  • cloudant.username - the Cloudant account username
  • cloudant.password - the Cloudant account password
  • database - the database whose changes you want to receive

In [ ]:
import org.apache.spark.streaming.{ Seconds, StreamingContext, Time }
import org.apache.spark.{ SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import java.util.concurrent.atomic.AtomicLong

import com.cloudant.spark.CloudantReceiver


val properties = Map(
    "cloudant.host"-> "xxx.cloudant.com", 
    "cloudant.username"-> "xxx",
    "cloudant.password"-> "xxx",
    "database"-> "election2016"
)
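
Hard-coding credentials is fine for a quick experiment, but in a shared notebook you may prefer to pull them from environment variables. A minimal sketch, assuming variables named CLOUDANT_HOST, CLOUDANT_USER, and CLOUDANT_PASSWORD exist in the notebook's environment (these names are our own, not part of the connector):

    // Hypothetical alternative: build the same properties map from environment
    // variables instead of embedding credentials in the notebook.
    val properties = Map(
        "cloudant.host"     -> sys.env.getOrElse("CLOUDANT_HOST", "xxx.cloudant.com"),
        "cloudant.username" -> sys.env.getOrElse("CLOUDANT_USER", "xxx"),
        "cloudant.password" -> sys.env.getOrElse("CLOUDANT_PASSWORD", "xxx"),
        "database"          -> "election2016"
    )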

1.2 Initialize StreamingContext and DStream

  • Initialize a StreamingContext with a 10-second batch interval
  • Create a DStream of database changes using CloudantReceiver

In [ ]:
val ssc = new StreamingContext(sc, Seconds(10))
val changesDStream = ssc.receiverStream(new CloudantReceiver(properties))
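
Note that a receiver such as CloudantReceiver permanently occupies one core, so the application needs more cores than receivers for batch processing to make progress. In this notebook `sc` is already provided; the sketch below only illustrates how you might configure a context yourself when running outside the notebook (the application name and variable names are illustrative):

    // Illustrative only: not meant to be run in this notebook, where `sc` already exists.
    // `local[2]` gives one core to the CloudantReceiver and one to batch processing.
    val conf = new SparkConf()
        .setAppName("CloudantChangesStreaming")  // illustrative application name
        .setMaster("local[2]")
    val standaloneSc  = new SparkContext(conf)
    val standaloneSsc = new StreamingContext(standaloneSc, Seconds(10))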

2. Define processing of the DStream

  • Get SQLContext
  • For every batch:
    • Create a DataFrame rawTweetsDF from the JSON documents of the batch
    • Create a tweetsDF DataFrame with the fields gender, state, and polarity
    • Calculate and display the cumulative count of tweets

In [ ]:
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Cumulative count of tweets across all batches processed so far
val curTotalCount = new AtomicLong(0)

changesDStream.foreachRDD((rdd: RDD[String], time: Time) => {
    println(s"========= $time =========")
    // Each element of the RDD is one JSON document from the _changes feed
    val rawTweetsDF = sqlContext.read.json(rdd)

    // Skip batches in which no changes were received
    if (!rawTweetsDF.schema.isEmpty) {
        rawTweetsDF.show(10)

        // Keep only the fields we are interested in
        val tweetsDF = rawTweetsDF.select($"cde.author.gender",
                $"cde.author.location.state",
                $"cde.content.sentiment.polarity")
        tweetsDF.show(10)

        curTotalCount.getAndAdd(tweetsDF.count())
        println("Current total count: " + curTotalCount)
    }
})
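
The cell above only reports a cumulative count. Section 3 mentions watching different states rise to the top of the list; one way to surface that is to rank states per batch. A minimal sketch of lines that could be added inside the if block above, right after tweetsDF.show(10) (this extension is ours, not part of the original tutorial code; it assumes the tweetsDF defined above):

    // Hypothetical addition to the if block above: rank states by the number
    // of tweets in the current batch, most frequent states first.
    val stateCounts = tweetsDF
        .groupBy("state")
        .count()
        .orderBy($"count".desc)
    stateCounts.show(10)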

3. Start receiving and processing data

  • Start StreamingContext
  • Allow the processing to run for 300 seconds
  • Manually stop processing

All previous instructions were just initializations and definitions; nothing will happen until we start the StreamingContext. After the start, data will be received and processed. Since a DStream is continuous, the processing will not stop until we stop it manually.

While this processing runs for 300 seconds, we can go back to the Python notebook and load more data into the database. These new changes will be picked up by Spark Streaming, processed, and displayed below. The steps for demonstrating the dynamic nature of Spark Streaming are as follows:

  1. Run the cell below in the current Scala notebook.
  2. Then go back to the Python notebook and run the following two cells. First run:
    query = "#election2016"
    count = 300
    
    and then run:
    TtC = TwitterToCloudant()
    TtC.count = count
    TtC.query_twitter(properties, None, query, 0)
    

Keep iterating through these loads of tweets into the database and watch how the stream below picks up the changes. Every 10 seconds we fetch the next set of tweets available at that point. The analysis results change accordingly, and you can see, for example, different states showing up at the top of the list.


In [ ]:
ssc.start()
// Let the stream run for 300 seconds, then stop it (and the SparkContext)
Thread.sleep(300000L)
ssc.stop(true)
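
Thread.sleep followed by an immediate stop is the simplest approach. If you prefer to let an in-flight batch finish before shutting down, Spark Streaming also offers awaitTerminationOrTimeout combined with a graceful stop; a sketch of the same 300-second run:

    // Alternative shutdown: block for up to 300 seconds, then stop the streaming
    // context gracefully so the batch currently being processed can complete.
    ssc.start()
    ssc.awaitTerminationOrTimeout(300000L)
    ssc.stop(stopSparkContext = true, stopGracefully = true)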
