(1) Streaming Meetups to IBM Cloudant using Spark

In this notebook we are going to use the IBM Apache Spark and IBM Cloudant Bluemix services to process and persist data from the Meetup RSVP stream. On the backend, the IBM Apache Spark service uses the Spark Kernel, which provides an interface that allows clients to interact with a Spark cluster. Clients can send libraries and snippets of code that are interpreted and run against a Spark context. This notebook should be run on IBM Bluemix using the IBM Apache Spark and IBM Cloudant services available in the IBM Bluemix Catalog.

Prerequisites

  • IBM Bluemix account - Sign up for IBM Bluemix
  • IBM Spark service instance - Create an IBM Apache Spark service. Note that once the IBM Spark service is created you can create a notebook on top of it.
  • IBM Cloudant service instance - Create an IBM Cloudant NoSQL DB service. Note that once the IBM Cloudant service is created you can create a meetup_group database by launching the IBM Cloudant service, navigating to the Databases tab within the dashboard, and selecting Create Database.
  • IBM Cloudant service credentials - Once in a notebook, use the Data Source option on the right Palette to add a data source. After the data source is configured and linked to the created notebook you can use the Insert to code link, which adds metadata about the data source to your notebook. Keep track of the hostname, username, and password; they are used for configuration in step (2) below, as sketched just after this list.
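
The exact cell generated by Insert to code depends on the notebook tooling, so the snippet below is only an illustrative sketch of the three values you need to capture; the val names and placeholder values are hypothetical, not generated code.

// Illustrative placeholders only -- substitute the credentials of your own
// IBM Cloudant service; these become HOST, USERNAME, and PASSWORD in step (2).
val cloudantHost = "your-account.cloudant.com"
val cloudantUsername = "your-username"
val cloudantPassword = "your-password"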

1. Add Dependencies/Jars

To run this demonstration notebook we use the cloudant-spark and scalawebsocket libraries along with their supporting dependencies. These Scala dependencies/jars are added to our environment using the AddJar magic from the Spark Kernel, which adds the specified jars to both the Spark Kernel and the Spark cluster.

  • scalawebsocket - Used for streaming data
  • async-http-client - Used for streaming data
  • cloudant-spark - Allows us to perform Spark SQL queries against RDDs backed by IBM Cloudant
  • scalalogging-log4j - Logging mechanism

In [1]:
%AddJar http://central.maven.org/maven2/eu/piotrbuda/scalawebsocket_2.10/0.1.1/scalawebsocket_2.10-0.1.1.jar
%AddJar https://dl.dropboxusercontent.com/u/19043899/cloudant-spark.jar
%AddJar http://central.maven.org/maven2/com/ning/async-http-client/1.8.0/async-http-client-1.8.0.jar
%AddJar http://central.maven.org/maven2/com/typesafe/scalalogging-log4j_2.10/1.1.0/scalalogging-log4j_2.10-1.1.0.jar


Using cached version of scalawebsocket_2.10-0.1.1.jar
Using cached version of cloudant-spark.jar
Using cached version of async-http-client-1.8.0.jar
Using cached version of scalalogging-log4j_2.10-1.1.0.jar

2. Initialize Spark context with IBM Cloudant configurations

The IBM Bluemix Apache Spark service notebook comes with a Spark context ready to use, but we need to modify its configuration to enable built-in support for IBM Cloudant. Note that for demo purposes we set the Spark master to run in local mode, while by default the IBM Spark service runs in cluster mode. Update HOST, USERNAME, and PASSWORD below with the credentials of your IBM Bluemix Cloudant service, which this demo depends on. You can get these credentials from the Palette on the right by clicking on the Data Source option. If your data source does not exist, add it using the Add Source button; if it already exists, use the "Insert to code" button to add the information to the notebook.


In [2]:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

val conf = sc.getConf

// Cloudant connection settings consumed by the cloudant-spark library.
// Replace HOST, USERNAME, and PASSWORD with your service credentials.
conf.set("cloudant.host", "HOST")
conf.set("cloudant.username", "USERNAME")
conf.set("cloudant.password", "PASSWORD")

// Ship the jars loaded via %AddJar (and the kernel classpath) to the executors.
conf.setJars(ClassLoader.getSystemClassLoader.asInstanceOf[java.net.URLClassLoader].getURLs.map(_.toString).toSet.toSeq ++ kernel.interpreter.classLoader.asInstanceOf[java.net.URLClassLoader].getURLs.map(_.toString).toSeq)

// Allow a second context alongside the kernel-provided one and run locally for the demo.
conf.set("spark.driver.allowMultipleContexts", "true")
conf.set("spark.master", "local[*]")

val scCloudant = new SparkContext(conf)
scCloudant.getConf.getAll.foreach(println)

3. Write to the IBM Cloudant Bluemix service

Using the cloudant-spark library we can seamlessly interact with the meetup_group database in our IBM Cloudant Bluemix service through the abstraction of Spark DataFrames.


In [3]:
// Persist the contents of a DataFrame to the given IBM Cloudant database.
def writeToDatabase(databaseName: String, df: DataFrame) = {
    df.write.format("com.cloudant.spark").save(databaseName)
}
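
The same DataFrame abstraction works in the other direction. The read path is not exercised in this notebook, so the sketch below is an assumption about the cloudant-spark API rather than a tested cell: it loads the meetup_group database back into a DataFrame using the same "com.cloudant.spark" data source used for writes.

// Sketch only: assumes the cloudant-spark connector also supports reads
// through the "com.cloudant.spark" data source used in writeToDatabase above.
val cloudantSqlContext = new SQLContext(scCloudant)
val groupsDf = cloudantSqlContext.read.format("com.cloudant.spark").load("meetup_group")
groupsDf.filter(groupsDf("group_city") === "Austin").show()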

4. Create the WebSocketReceiver for our Streaming Context

First we must create a custom WebSocketReceiver that extends Receiver and implements the required onStart and onStop functions.


In [4]:
import org.apache.spark.storage.StorageLevel
import scalawebsocket.WebSocket
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.Logging

import org.json4s._
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._

case class MeetupEvent(event_id: String, event_name: Option[String], event_url: Option[String], time: Long)
case class MeetupGroupTopics(topic_name: Option[String], urlkey: Option[String])
case class MeetupGroup(group_id: Long, group_name: String, group_city: Option[String], group_country: Option[String], group_state: Option[String], group_urlname: Option[String], group_lat: Option[String], group_lon: Option[String], group_topics: List[MeetupGroupTopics])
case class MeetupMember(member_id: Long, member_name: Option[String], other_services: Option[String], photo: Option[String])
case class MeetupVenue(venue_id: Long, venue_name: Option[String], lat: Option[String], lon: Option[String])
case class MeetupRsvp(rsvp_id: Long, response: String, guests: Int, mtime: Long, visibility: String, event: MeetupEvent, group: MeetupGroup, member: MeetupMember, venue: MeetupVenue)


class WebSocketReceiver(url: String, storageLevel: StorageLevel) extends Receiver[MeetupRsvp](storageLevel) with Logging {
  private var webSocket: WebSocket = _
  
  def onStart() {
    try{
      logInfo("Connecting to WebSocket: " + url)
      val newWebSocket = WebSocket().open(url).onTextMessage({ msg: String => parseJson(msg) })
      setWebSocket(newWebSocket)
      logInfo("Connected to: WebSocket" + url)
    } catch {
      case e: Exception => restart("Error starting WebSocket stream", e)
    }
  }

  def onStop() {
    setWebSocket(null)
    logInfo("WebSocket receiver stopped")
  }

  private def setWebSocket(newWebSocket: WebSocket) = synchronized {
    if (webSocket != null) {
      webSocket.shutdown()
    }
    webSocket = newWebSocket
  }

  private def parseJson(jsonStr: String): Unit = {
    try {
      implicit lazy val formats = DefaultFormats
      val json = parse(jsonStr)
      val rsvp = json.extract[MeetupRsvp]
      store(rsvp)
    } catch {
      // Ignore messages that cannot be parsed into a MeetupRsvp
      case e: Exception => e.getMessage()
    }
  }
}

5. Persist the Meetup stream to IBM Cloudant

This function creates a streaming context with a 10-second batch interval. Because of our implementation in the custom WebSocketReceiver, the text content of each websocket message has already been parsed as JSON and extracted into a MeetupRsvp. For each RDD of MeetupRsvp records in the stream we map each MeetupRsvp to its MeetupGroup, filter for groups whose group_state equals "TX" (Texas), convert the result to a DataFrame, and use our writeToDatabase function to persist it to IBM Cloudant. As this is a quick demo we set the timeout of the streaming context to one minute so we can move on to our analysis of the data in IBM Cloudant.


In [5]:
def persistStream(conf: SparkConf) = {
    // Streaming context with a 10-second batch interval
    val ssc = new StreamingContext(conf, Seconds(10))
    val stream = ssc.receiverStream[MeetupRsvp](new WebSocketReceiver("ws://stream.meetup.com/2/rsvps", StorageLevel.MEMORY_ONLY_SER))
    stream.foreachRDD((rdd: RDD[MeetupRsvp], time: Time) => {
      val sqlContext = new SQLContext(rdd.sparkContext)
      import sqlContext.implicits._
      // Keep only the group information for RSVPs from Texas
      val df = rdd.map(data => {
        data.group
      }).filter(_.group_state.getOrElse("").equals("TX")).toDF()
      if(df.collect().length > 0) {
        writeToDatabase("meetup_group", df)
        df.show()
      }
    })
    ssc.start()
    // Quick demo: stop after one minute
    ssc.awaitTerminationOrTimeout(60000)
}

persistStream(conf)


Use dbName=meetup_group, indexName=null, jsonstore.rdd.partitions=5, jsonstore.rdd.maxInPartition=-1, jsonstore.rdd.minInPartition=10, jsonstore.rdd.requestTimeout=100000,jsonstore.rdd.concurrentSave=-1,jsonstore.rdd.bulkSize=1
+--------+--------------------+----------+-------------+-----------+--------------------+---------+---------+--------------------+
|group_id|          group_name|group_city|group_country|group_state|       group_urlname|group_lat|group_lon|        group_topics|
+--------+--------------------+----------+-------------+-----------+--------------------+---------+---------+--------------------+
| 3448252|Network After Wor...|    Austin|           us|         TX|NetworkAfterWork-...|    30.27|   -97.74|List([Professiona...|
+--------+--------------------+----------+-------------+-----------+--------------------+---------+---------+--------------------+

Out[5]:
false
Use dbName=meetup_group, indexName=null, jsonstore.rdd.partitions=5, jsonstore.rdd.maxInPartition=-1, jsonstore.rdd.minInPartition=10, jsonstore.rdd.requestTimeout=100000,jsonstore.rdd.concurrentSave=-1,jsonstore.rdd.bulkSize=1
+--------+--------------------+----------+-------------+-----------+----------------+---------+---------+--------------------+
|group_id|          group_name|group_city|group_country|group_state|   group_urlname|group_lat|group_lon|        group_topics|
+--------+--------------------+----------+-------------+-----------+----------------+---------+---------+--------------------+
| 2672242|OpenStack Austin ...|    Austin|           us|         TX|OpenStack-Austin|     30.4|   -97.75|List([Virtualizat...|
+--------+--------------------+----------+-------------+-----------+----------------+---------+---------+--------------------+

Use dbName=meetup_group, indexName=null, jsonstore.rdd.partitions=5, jsonstore.rdd.maxInPartition=-1, jsonstore.rdd.minInPartition=10, jsonstore.rdd.requestTimeout=100000,jsonstore.rdd.concurrentSave=-1,jsonstore.rdd.bulkSize=1
+--------+--------------------+----------+-------------+-----------+--------------------+---------+---------+--------------------+
|group_id|          group_name|group_city|group_country|group_state|       group_urlname|group_lat|group_lon|        group_topics|
+--------+--------------------+----------+-------------+-----------+--------------------+---------+---------+--------------------+
|11106032|Holistic & Energy...|Richardson|           us|         TX|Holistic-Energy-H...|    32.96|   -96.75|List([Alternative...|
+--------+--------------------+----------+-------------+-----------+--------------------+---------+---------+--------------------+

Use dbName=meetup_group, indexName=null, jsonstore.rdd.partitions=5, jsonstore.rdd.maxInPartition=-1, jsonstore.rdd.minInPartition=10, jsonstore.rdd.requestTimeout=100000,jsonstore.rdd.concurrentSave=-1,jsonstore.rdd.bulkSize=1
+--------+--------------------+----------+-------------+-----------+-------------+---------+---------+--------------------+
|group_id|          group_name|group_city|group_country|group_state|group_urlname|group_lat|group_lon|        group_topics|
+--------+--------------------+----------+-------------+-----------+-------------+---------+---------+--------------------+
| 3282852|Dallas Agile Lead...|    Irving|           us|         TX|   Dallas-ALN|    32.85|   -96.96|List([Self-Improv...|
+--------+--------------------+----------+-------------+-----------+-------------+---------+---------+--------------------+