In this notebook we are going to use the IBM Apache Spark and IBM Cloudant Bluemix services to process and persist data from the Meetup RSVP stream. On the backend the IBM Apache Spark service uses the Spark Kernel, which provides an interface that allows clients to interact with a Spark cluster. Clients can send libraries and snippets of code that are interpreted and run against a Spark context. This notebook should be run on IBM Bluemix using the IBM Apache Spark and IBM Cloudant services available in the IBM Bluemix Catalog.
This demonstration notebook depends on the cloudant-spark and scalawebsocket libraries. These Scala dependencies/jars are added to our environment with the AddJar magic from the Spark Kernel, which makes the specified jars available to both the Spark Kernel and the Spark cluster.
In [1]:
%AddJar http://central.maven.org/maven2/eu/piotrbuda/scalawebsocket_2.10/0.1.1/scalawebsocket_2.10-0.1.1.jar
%AddJar https://dl.dropboxusercontent.com/u/19043899/cloudant-spark.jar
%AddJar http://central.maven.org/maven2/com/ning/async-http-client/1.8.0/async-http-client-1.8.0.jar
%AddJar http://central.maven.org/maven2/com/typesafe/scalalogging-log4j_2.10/1.1.0/scalalogging-log4j_2.10-1.1.0.jar
The IBM Bluemix Apache Spark service notebook comes with a Spark context ready to use, but we need to modify its configuration to add built-in support for IBM Cloudant. Note that for demo purposes we set the Spark master to run in local mode; by default the IBM Spark service runs in cluster mode. Update HOST, USERNAME, and PASSWORD below with the credentials of the IBM Bluemix Cloudant service that this demo depends on. You can get these credentials from the Palette on the right by clicking the Data Source option. If your data source does not exist yet, add it with the Add Source button; if it already exists, use the "Insert to code" button to add the connection information to the notebook.
In [2]:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
// Start from the configuration of the notebook's pre-created Spark context
val conf = sc.getConf
// Credentials for the IBM Cloudant service (replace with the values from your data source)
conf.set("cloudant.host", "HOST")
conf.set("cloudant.username", "USERNAME")
conf.set("cloudant.password", "PASSWORD")
// Ship the jars added with %AddJar (plus the system classpath) to the Spark executors
conf.setJars(ClassLoader.getSystemClassLoader.asInstanceOf[java.net.URLClassLoader].getURLs.map(_.toString).toSet.toSeq ++ kernel.interpreter.classLoader.asInstanceOf[java.net.URLClassLoader].getURLs.map(_.toString).toSeq)
// Allow a second context alongside the pre-created one and run it in local mode for the demo
conf.set("spark.driver.allowMultipleContexts", "true")
conf.set("spark.master", "local[*]")
val scCloudant = new SparkContext(conf)
scCloudant.getConf.getAll.foreach(println)
In [3]:
// Persist a DataFrame to the named IBM Cloudant database via the cloudant-spark connector
def writeToDatabase(databaseName: String, df: DataFrame) = {
  df.write.format("com.cloudant.spark").save(databaseName)
}
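Before wiring this helper into the stream, it can be exercised with a small hand-built DataFrame. The snippet below is only an illustrative sketch: the test_db database name is hypothetical, and depending on the connector version the database may need to exist in your Cloudant account before the save succeeds.
val sqlContext = new SQLContext(scCloudant)
import sqlContext.implicits._
// Hypothetical smoke test: persist two rows to a Cloudant database named "test_db"
val testDF = Seq(("spark", 1), ("cloudant", 2)).toDF("word", "count")
writeToDatabase("test_db", testDF)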
To consume the Meetup RSVP stream we first create a custom WebSocketReceiver that extends Spark Streaming's Receiver and implements the required onStart and onStop methods.
In [4]:
import org.apache.spark.storage.StorageLevel
import scalawebsocket.WebSocket
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.Logging
import org.json4s._
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._
// Case classes mirroring the structure of a Meetup RSVP JSON message
case class MeetupEvent(event_id: String, event_name: Option[String], event_url: Option[String], time: Long)
case class MeetupGroupTopics(topic_name: Option[String], urlkey: Option[String])
case class MeetupGroup(group_id: Long, group_name: String, group_city: Option[String], group_country: Option[String], group_state: Option[String], group_urlname: Option[String], group_lat: Option[String], group_lon: Option[String], group_topics: List[MeetupGroupTopics])
case class MeetupMember(member_id: Long, member_name: Option[String], other_services: Option[String], photo: Option[String])
case class MeetupVenue(venue_id: Long, venue_name: Option[String], lat: Option[String], lon: Option[String])
case class MeetupRsvp(rsvp_id: Long, response: String, guests: Int, mtime: Long, visibility: String, event: MeetupEvent, group: MeetupGroup, member: MeetupMember, venue: MeetupVenue)
class WebSocketReceiver(url: String, storageLevel: StorageLevel) extends Receiver[MeetupRsvp](storageLevel) with Logging {

  private var webSocket: WebSocket = _

  def onStart() {
    try {
      logInfo("Connecting to WebSocket: " + url)
      // Parse every text frame received on the websocket and hand it to Spark
      val newWebSocket = WebSocket().open(url).onTextMessage({ msg: String => parseJson(msg) })
      setWebSocket(newWebSocket)
      logInfo("Connected to WebSocket: " + url)
    } catch {
      case e: Exception => restart("Error starting WebSocket stream", e)
    }
  }

  def onStop() {
    setWebSocket(null)
    logInfo("WebSocket receiver stopped")
  }

  private def setWebSocket(newWebSocket: WebSocket) = synchronized {
    if (webSocket != null) {
      webSocket.shutdown()
    }
    webSocket = newWebSocket
  }

  // Convert the raw JSON text of a websocket message into a MeetupRsvp and store it
  private def parseJson(jsonStr: String): Unit = {
    try {
      implicit lazy val formats = DefaultFormats
      val json = parse(jsonStr)
      val rsvp = json.extract[MeetupRsvp]
      store(rsvp)
    } catch {
      case e: Exception => logWarning("Unable to parse RSVP message: " + e.getMessage)
    }
  }
}
This function creates a streaming context with a ten second batch interval. Because our custom WebSocketReceiver already parses the text content of the websocket into JSON and extracts a MeetupRsvp from it, each RDD in the stream contains ready-to-use MeetupRsvp instances. For each RDD we take the group portion of every RSVP and filter for groups whose group_state equals TX (Texas). Lastly we convert those MeetupGroup records to a DataFrame and use our writeToDatabase function to persist them to IBM Cloudant. As this is a quick demo we set the timeout of the streaming context to one minute so we can move on to our analysis of the data in IBM Cloudant.
In [5]:
def persistStream(conf: SparkConf) = {
  // Create a streaming context with a ten second batch interval
  val ssc = new StreamingContext(conf, Seconds(10))
  val stream = ssc.receiverStream[MeetupRsvp](new WebSocketReceiver("ws://stream.meetup.com/2/rsvps", StorageLevel.MEMORY_ONLY_SER))

  stream.foreachRDD((rdd: RDD[MeetupRsvp], time: Time) => {
    val sqlContext = new SQLContext(rdd.sparkContext)
    import sqlContext.implicits._

    // Keep only the group portion of each RSVP and restrict to groups located in Texas
    val df = rdd.map(data => data.group)
      .filter(_.group_state.getOrElse("").equals("TX"))
      .toDF()

    // Persist non-empty batches to the meetup_group database in Cloudant
    if (df.collect().length > 0) {
      writeToDatabase("meetup_group", df)
      df.show()
    }
  })

  ssc.start()
  ssc.awaitTerminationOrTimeout(60000)
}
persistStream(conf)
Out[5]:
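With RSVP groups flowing into Cloudant, the same cloudant-spark connector can be used to read the data back for analysis. The following is a minimal sketch, assuming the meetup_group database now exists in your Cloudant account and that your connector version supports loading a database by name; the temp table name and the query are illustrative only.
val sqlContext = new SQLContext(scCloudant)
// Load the meetup_group database from Cloudant into a DataFrame
val groupsDF = sqlContext.read.format("com.cloudant.spark").load("meetup_group")
// Register a temp table and count persisted RSVPs per Texas city
groupsDF.registerTempTable("meetup_group")
sqlContext.sql("SELECT group_city, COUNT(*) AS rsvp_count FROM meetup_group GROUP BY group_city").show()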