In this tutorial, we will use Spark Streaming to stream data from a Cloudant database and process this continuously received stream of data with Spark SQL.
Our processing pipeline goes through three stages:
1. CloudantReceiver receives the _changes feed of the database, extracts individual JSON documents from the feed, and stores these documents in Spark's memory for processing by Spark Streaming.
2. Spark Streaming groups the received documents into the micro-batches of a DStream.
3. Spark SQL processes each batch.

|                  1                  |    |              2              |    |            3            |
| _changes feed -> ... doc3 doc2 doc1 | -> | ... [doc4 doc3] [doc2 doc1] | -> | ... [pBatch2] [pBatch1] |
|          CloudantReceiver           | -> |  Spark Streaming: DStream   | -> |        Spark SQL        |
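The exact implementation of CloudantReceiver is provided by the connector and is not shown here, but conceptually it follows Spark Streaming's custom Receiver API: it opens the _changes feed in onStart and hands every extracted document to Spark with store. Below is only a minimal sketch of such a receiver, with a hypothetical fetchNextChanges helper standing in for the actual HTTP handling:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative sketch only -- not the actual com.cloudant.spark.CloudantReceiver code.
class ChangesFeedReceiverSketch(properties: Map[String, String])
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {

  override def onStart(): Unit = {
    // Receive on a separate thread so that onStart returns immediately.
    new Thread("Changes Feed Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // The receive loop below checks isStopped(), so nothing else to do here.
  }

  private def receive(): Unit = {
    while (!isStopped()) {
      // Hypothetical helper: read the next chunk of the _changes feed and
      // return the JSON documents it contains.
      val docs: Seq[String] = fetchNextChanges()
      // Hand each document to Spark; Spark Streaming groups stored records
      // into the RDDs of the DStream.
      docs.foreach(doc => store(doc))
    }
  }

  // Placeholder for the actual _changes request against properties("cloudant.host").
  private def fetchNextChanges(): Seq[String] = Seq.empty
}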
In the steps below, we define the Cloudant connection properties, create a DStream backed by CloudantReceiver, define the Spark SQL processing applied to each batch of received documents, and finally start the StreamingContext.
In [ ]:
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import java.util.concurrent.atomic.AtomicLong

import com.cloudant.spark.CloudantReceiver

// Connection details of the Cloudant database holding the tweets.
// Replace the xxx placeholders with your own host, credentials, and database name.
val properties = Map(
  "cloudant.host" -> "xxx.cloudant.com",
  "cloudant.username" -> "xxx",
  "cloudant.password" -> "xxx",
  "database" -> "election2016"
)
In [ ]:
// StreamingContext with a 10 second batch interval: every 10 seconds the
// DStream delivers the documents received so far as a new RDD.
val ssc = new StreamingContext(sc, Seconds(10))
val changesDStream = ssc.receiverStream(new CloudantReceiver(properties))
In [ ]:
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Running total of tweets processed across all batches.
val curTotalCount = new AtomicLong(0)

changesDStream.foreachRDD((rdd: RDD[String], time: Time) => {
  println(s"========= $time =========")

  // Parse the raw JSON documents of this batch into a DataFrame.
  val rawTweetsDF = sqlContext.read.json(rdd)

  // An empty schema means no documents arrived in this batch.
  if (!rawTweetsDF.schema.isEmpty) {
    rawTweetsDF.show(10)

    // Keep only the enriched author and sentiment fields of interest.
    val tweetsDF = rawTweetsDF.select($"cde.author.gender",
      $"cde.author.location.state",
      $"cde.content.sentiment.polarity")
    tweetsDF.show(10)

    curTotalCount.getAndAdd(tweetsDF.count())
    println("Current total count: " + curTotalCount)
  }
})
All previous instructions were just initializations and definitions; nothing happens until we start the StreamingContext. After the start, data will be received and processed. Since the DStream is continuous, it will not stop until we manually stop the processing.
While this processing runs for 300 seconds, we can go back to the Python notebook and load more data into the database. These new changes will be picked up by Spark Streaming, processed, and displayed below. The steps for demonstrating the dynamic nature of Spark Streaming are as follows:
query = "#election2016"
count = 300
TtC = TwitterToCloudant()
TtC.count = count
TtC.query_twitter(properties, None, query, 0)
Keep iterating through loads of tweets into the database and watch how the stream below picks up the changes. Every 10 seconds we fetch the next set of tweets available at that point, so the analysis results will change accordingly; for example, you can see different states showing up at the top of the list.
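As a concrete example of such an analysis, a ranking of states could be produced inside the foreachRDD block above by aggregating the tweetsDF of each batch. This is just a sketch of one possible analysis; it assumes the state column produced by the select above:

// Possible addition inside the foreachRDD block above:
// count the tweets per state in the current batch and show the top states.
val stateCounts = tweetsDF
  .filter($"state".isNotNull)
  .groupBy($"state")
  .count()
  .orderBy($"count".desc)
stateCounts.show(5)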
In [ ]:
// Start the streaming computation, let it run for 300 seconds,
// then stop the StreamingContext and the underlying SparkContext.
ssc.start()
Thread.sleep(300000L)
ssc.stop(true)
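A small variation, if preferred: instead of sleeping on the driver thread, the StreamingContext can wait with a timeout, which also returns early if the context is stopped for another reason.

// Equivalent alternative to the Thread.sleep above.
ssc.start()
ssc.awaitTerminationOrTimeout(300000L)
ssc.stop(true)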
In [ ]: