Streaming Meetups Dashboard

The purpose of this notebook is to give an all-in-one demo of streaming data from the meetup.com RSVP API, through a local Spark Streaming job, and into declarative widgets in a dashboard layout. You can peek at the meetup.com stream here.

This example is a Scala adaptation of this notebook from jupyter_dashboards.

On your first visit to this notebook, we recommend that you execute one cell at a time as you read along. Later, if you just want to see the demo, select Cell > Run All from the menu bar. Once you've run all of the cells, select View > View Dashboard and then click on the Stream toggle to start the data stream.

Table of Contents

  1. Initialize DeclarativeWidgets
  2. Define the Spark Streaming Job: create stream context, custom receiver, filter by topic, top topics, venue metadata
  3. Create a Dashboard Interface with Widgets: charts, interactive controls, globe
  4. Arrange the Dashboard Layout

Initialize DeclarativeWidgets

In Toree, declarativewidgets needs to be initialized by adding the JAR with the Scala implementation and calling initWidgets. This must take place very close to the top of the notebook.


In [ ]:
%%html
<span id="__jar_url__"></span>
<script>
(function(){
    var thisUrl = this.location.toString();
    var jarUrl = thisUrl.substring(0,thisUrl.indexOf("/notebooks"))+"/nbextensions/urth_widgets/urth-widgets.jar";
    document.getElementById("__jar_url__").innerHTML = jarUrl;
})();
</script>

In [ ]:
%AddJar http://localhost:8888/nbextensions/urth_widgets/urth-widgets.jar

In [ ]:
import urth.widgets._
initWidgets

Define the Spark Streaming Application

With the frontend widgets in mind, we'll now set up our Spark Streaming job to fulfill their data requirements. In this section, we'll define a set of functions that act on a StreamingContext or on RDDs from that context.

Install external dependencies


In [ ]:
%AddDeps eu.piotrbuda scalawebsocket_2.10 0.1.1 --transitive

Custom WebSocket Receiver

Here we create a custom receiver that connects to a WebSocket. This is how we will stream data out of the Meetup API.


In [ ]:
import scalawebsocket.WebSocket
import org.apache.spark.storage.StorageLevel    
import org.apache.spark.streaming.receiver.Receiver

class WebSocketReceiver(url: String, storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER) extends Receiver[play.api.libs.json.JsValue](storageLevel) {
    @volatile private var webSocket: WebSocket = _

    // Open the WebSocket connection and forward every text message to parseJson
    def onStart() {
        try {
          val newWebSocket = WebSocket().open(url).onTextMessage({ msg: String => parseJson(msg) })
          setWebSocket(newWebSocket)
        } catch {
          case e: Exception => restart("Error starting WebSocket stream", e)
        }
    }

    // Close the current WebSocket connection, if any
    def onStop() {
        setWebSocket(null)
    }

    // Swap in a new WebSocket, shutting down the previous one if present
    private def setWebSocket(newWebSocket: WebSocket) = synchronized {
        if (webSocket != null) {
          webSocket.shutdown()
        }
        webSocket = newWebSocket
    }

    // Parse an incoming JSON message and hand it to Spark via store()
    private def parseJson(jsonStr: String): Unit = {
        val json: play.api.libs.json.JsValue = play.api.libs.json.Json.parse(jsonStr)
        store(json)
    }
}

Spark Meetup Application

We then put all of our functionality into an object that is marked as Serializable. This is a great technique for avoiding serialization problems when interacting with Spark. Also note that anything we do not want to serialize, such as the StreamingContext and the SQLContext, should be marked @transient.

The rest of the object consists of methods for starting and stopping the streaming application, as well as functions that define the streaming flow.
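
Before diving into the full application, here is a minimal, standalone sketch of the pattern (the StreamDemo object and its keyword field are hypothetical, not part of this demo): a Serializable object whose non-serializable members are marked @transient, so that closures referencing the object can still be shipped to the executors.

import org.apache.spark.streaming.StreamingContext

object StreamDemo extends Serializable {
    // Not serializable: marked @transient so it is dropped when closures
    // referencing this object are serialized and sent to the executors.
    @transient var ssc: StreamingContext = _

    // Plain fields and methods like these serialize without trouble and
    // can safely be used inside RDD operations.
    var keyword = ""
    def matches(line: String): Boolean = keyword.trim.isEmpty || line.contains(keyword)
}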


In [ ]:
case class TopicCount(topic: String, count: Int)

object MeetupApp extends Serializable {    
    import play.api.libs.json._
    import org.apache.spark.storage.StorageLevel    
    import org.apache.spark.Logging
    import org.apache.spark.streaming._
    import org.apache.spark.sql.functions._
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.rdd.RDD
    import urth.widgets.WidgetChannels.channel
    import org.apache.spark.sql.SQLContext
    
    @transient var ssc:StreamingContext = _
    @transient val sqlContext:SQLContext = new SQLContext(sc)
    
    import sqlContext.implicits._
    
    var topic_filter = ""
    
    //Resetting the values on the data channels
    channel("status").set("streaming", false)
    channel("stats").set("topics", Map())
    channel("stats").set("venues", List())

    def create_streaming_context(sample_rate: Int): StreamingContext = {
        return new StreamingContext(sc, Seconds(sample_rate))
    }
    
    /**
        Creates a StreamingContext that reads events from the custom WebSocket
        receiver. Creates the events, topics, and venues DStreams, setting the
        widget channel publishing functions to iterate over RDDs in each.
        Starts the stream processing.
    */
    def start_stream():Unit = {
        ssc = create_streaming_context(5)
        ssc.checkpoint("/tmp/meetups.checkpoint")
        val events = get_events(ssc, sample_event)
        get_topics(events, get_topic_counts)
        get_venues(events, aggregate_venues)
        ssc.start()
        
        channel("status").set("streaming", true)
    }

    /**
        Stops the streaming context, which in turn shuts down the WebSocket receiver, and flips the streaming flag on the status channel.
    */
    def shutdown_stream():Unit = {
        ssc.stop(false)
        channel("status").set("streaming", false)        
    }
    
    /**
    Creates the event DStream from the WebSocket receiver. Retains only those
    events that have at least one topic exactly matching the current topic_filter.
    Sends event RDDs to the for_each function. Returns the event DStream.
    */
    def get_events(ssc: StreamingContext, for_each: (RDD[play.api.libs.json.JsValue]) => Unit): org.apache.spark.streaming.dstream.DStream[play.api.libs.json.JsValue] = {

        val  all_events = ssc.receiverStream( new WebSocketReceiver("ws://stream.meetup.com/2/rsvps"))

        // Filter set of event
        val events = all_events.filter(retain_event)

        // Send event data to a widget channel. This will be covered below.
        events.foreachRDD(for_each)

        return events
    }
    
    /**
    Returns true if the user defined topic filter is blank or if at least one
    group topic in the event exactly matches the user topic filter string.
    */
    def retain_event(event: play.api.libs.json.JsValue):Boolean = {
        val topics = (event \ "group" \ "group_topics").as[play.api.libs.json.JsArray].value
        val isEmpty = topic_filter.trim == ""
        // exists avoids the exception that reduce would throw for events with no topics
        val containsTopic = topics.exists(topic =>
                                   topic_filter.equals((topic \ "urlkey").as[play.api.libs.json.JsString].value)
                                )

        isEmpty || containsTopic   
    }
    
    /*
    Takes an RDD from the event DStream. Takes one event from the RDD.
    Substitutes a placeholder photo if the member who RSVPed does not
    have one. Publishes the event metadata on the meetup channel.
    */
    def sample_event(rdd: org.apache.spark.rdd.RDD[play.api.libs.json.JsValue]):Unit = {
        
        try {
            val event = rdd.take(1)(0)

            // use a fallback photo for those members without one
            val default_event: play.api.libs.json.JsObject = play.api.libs.json.Json.parse("""{
                 "member" : {
                     "photo" : "http://photos4.meetupstatic.com/img/noPhoto_50.png"
                 }
             }
            """).as[play.api.libs.json.JsObject]
            val fixed_event = default_event ++ (event).as[play.api.libs.json.JsObject] 

            channel("meetups").set("meetup", fixed_event.value)
        } catch {
          case _: Exception => print("No data to sample")
        }
    }
    
    /**
    Pulls group topics from meetup events. Counts each one once and updates
    the global topic counts seen since stream start. Sends topic count RDDs
    to the for_each function. Returns nothing new.
    */    
    def get_topics(events:DStream[JsValue], for_each: (RDD[((String,String),Int)]) => Unit) = {
        //Extract the group topic url keys and "namespace" them with the current topic filter
        val topics = events.
                        flatMap( (event: JsValue) => {
                          (event \ "group" \ "group_topics").as[JsArray].value
                        }).map((topic: JsValue) => {
                            val filter = if(topic_filter.equals("")) {
                                "*"
                            } else { 
                                topic_filter 
                            }
                            ((filter, (topic \ "urlkey").as[JsString].value), 1)      
                        })

        val topic_counts = topics.updateStateByKey(update_topic_counts)

        // Send topic data to a widget channel. This will be covered below.
        topic_counts.foreachRDD(for_each)
    }
    
    /**
    Sums the number of times a topic has been seen in the current sampling
    window. Then adds that to the number of times the topic has been
    seen in the past. Returns the new sum.
    */
    def update_topic_counts(new_values: Seq[Int], last_sum: Option[Int]): Option[Int] = {
        return Some((new_values :+ 0).reduce(_+_) + last_sum.getOrElse(0))
    }
        
    /**
    Takes an RDD from the topic DStream. Takes the top 25 topics by occurrence
    and publishes them as a Spark DataFrame on the stats channel.
    */
    def get_topic_counts(rdd: RDD[((String,String),Int)]){
        val filterStr = if (topic_filter.equals("")) "*" else topic_filter
        
        /*
         keep only those matching current filter
         and sort in descending order, taking top 25
        */
        val countDF = rdd.filter((x:((String,String),Int)) => filterStr.equals(x._1._1)).
                    map(tuple => TopicCount(tuple._1._2, tuple._2)).toDF().
                    sort($"count".desc).limit(25)
                        
        channel("stats").set("topics", countDF)

    }
    
    /**
    Pulls venue metadata from meetup events if it exists. Passes the venue
    DStream to the calculate function. Returns nothing new.
    */
    def get_venues(events:DStream[JsValue], calculate: (DStream[JsValue]) => Unit):Unit = {
        
        val venues = events.
            filter(hasVenue).
            map((event:JsValue) => {event \ "venue"})

        calculate(venues)
    }
    
    /**
    Checks if there is a venue in the JSON object
    */
    def hasVenue(event:JsValue):Boolean = {
        (event \ "venue") match {
            case e:play.api.libs.json.JsUndefined => false
            case _ => true
        }
    }
    
    /**
    Aggregates the venues by lat/lon and publishes normalized counts on the stats channel
    */
    def aggregate_venues(venues:DStream[JsValue]): Unit = {
        //build a (lat, long, mag) tuple
        val venue_loc = venues.map((venue:JsValue) => {
            ( 
                (venue \ "lat").asOpt[Int].getOrElse(0),
                (venue \ "lon").asOpt[Int].getOrElse(0)
            )
        }).
        filter((latLon:(Int,Int)) => {latLon != (0,0)}).
        map((latLon:(Int,Int)) => {
               ((""+latLon._1+","+latLon._2), (latLon._1, latLon._2, 1))
        })
        
        val venue_loc_counts = venue_loc.updateStateByKey(update_venue_loc_counts)
        
        venue_loc_counts.map{
            _._2
        }.foreachRDD((rdd:RDD[(Int,Int,Int)]) => {
            val venue_data = rdd.collect()
            
            if( !venue_data.isEmpty ){
                val total = venue_data.reduce((a,b)=> (0, 0, a._3+b._3))._3 * 1.0
                
                val venue_data_list= venue_data.map( (x:(Int,Int,Int)) => List(x._1,x._2,x._3/total)).toList
                
                channel("stats").set("venues", venue_data_list)
            }
        })
    }
    
    /**
    Maintains a running RSVP count for each lat/lon key across batches
    */
    def update_venue_loc_counts(new_values: Seq[(Int,Int,Int)], last_sum: Option[(Int,Int,Int)]): Option[(Int,Int,Int)] = {
        if( new_values.isEmpty )
            last_sum
        else{
            val partial = new_values.reduce((a,b)=> (a._1, a._2, a._3+b._3))
            Some((partial._1, partial._2, partial._3 + last_sum.getOrElse((0,0,0))._3))
        }
    }
}

Create a Dashboard Interface with Widgets

Now that we have a streaming job defined, we can create a series of interactive areas that control and display the data that is being produced.

Control to start and stop the Stream

Here we will use a switch widget and tie it to a function that can start and stop the stream job.


In [ ]:
val start_stream = () => {
    MeetupApp.start_stream()
}:Unit

val shutdown_stream = () => {
    MeetupApp.shutdown_stream()
}:Unit

In [ ]:
%%html
<link rel="import" href="urth_components/paper-toggle-button/paper-toggle-button.html"
    is="urth-core-import" package="PolymerElements/paper-toggle-button">
    
<template is="urth-core-bind" channel="status">
    <urth-core-function id="start" ref="start_stream"></urth-core-function>    
    <urth-core-function id="shutdown" ref="shutdown_stream"></urth-core-function>    
    <paper-toggle-button checked="{{streaming}}" onChange="this.checked ? start.invoke() : shutdown.invoke()">Stream</paper-toggle-button>
</template>

<style is="custom-style">
    paper-toggle-button {
        --default-primary-color: green;
    }
</style>

Topic Bar Chart

Here we insert a <urth-viz-chart> to show the top 25 meetup topics by occurrence in the stream. Take note of the <template> element. We use it to specify that the HTML within will make use of the stats channel. Data is placed on the stats channel by the get_topic_counts function defined above.


In [ ]:
%%html
<link rel="import" href="urth_components/urth-viz-chart/urth-viz-chart.html" is="urth-core-import">

<template is="urth-core-bind" channel="stats">
    <urth-viz-chart type='bar' datarows='[[topics.data]]' columns='[[topics.columns]]' rotatelabels='30'></urth-viz-chart>
</template>
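
If you want to exercise the chart binding before starting the stream, you can publish a small hand-built DataFrame on the same channel. A minimal sketch, reusing the TopicCount case class and the sqlContext defined in MeetupApp above (the topic names and counts here are made up):

import urth.widgets.WidgetChannels.channel
import MeetupApp.sqlContext.implicits._

// Two made-up topic counts, just to drive the datarows/columns binding above
val sampleTopics = Seq(TopicCount("scala", 42), TopicCount("spark", 37)).toDF()
channel("stats").set("topics", sampleTopics)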

Topic Filter

Next we create an <urth-core-function> that binds the value of a <paper-input> widget to a Scala function that sets a global variable. The function sets a string that we'll use to filter the incoming events to only those pertaining to a certain topic.

Notice that the <link> tag here is different from the one we specified above. <urth-viz-chart> is already loaded within the notebook, but here we are using a third-party Polymer element which needs to be downloaded first. To handle that automatically, we specify is="urth-core-import" and set the bower package name as the package attribute value.


In [ ]:
val set_topic_filter = (value: String) => {
    MeetupApp.topic_filter = value
}

In [ ]:
%%html
<link rel="import" href="urth_components/paper-input/paper-input.html"
    is="urth-core-import" package="PolymerElements/paper-input">
    
<template is="urth-core-bind" channel="filter" id="filter-input">
    <urth-core-function auto
        id="set_topic_filter"
        ref="set_topic_filter"
        arg-value="{{topic_filter}}">
    </urth-core-function>
        
    <paper-input label="Filter" value="{{topic_filter}}"></paper-input>
</template>

User Card

Now we add a simple <paper-card> element showing the name and photo of one user who RSVPed recently in the event stream. We add some custom styling and a bit of custom JavaScript in this case to format the datetime associated with the RSVP event.


In [ ]:
%%html
<link rel="import" href="urth_components/paper-card/paper-card.html"
    is="urth-core-import" package="PolymerElements/paper-card">

<style is="custom-style">
    paper-card.meetups-card {
        max-width: 400px;
        width: 100%;
        
        --paper-card-header: {
            height: 100px;
            border-bottom: 1px solid #e8e8e8;
        };

        --paper-card-header-image: {
            height: 80px;
            width: 80px !important;
            margin: 10px;
            border-radius: 50px;
            width: auto;
            border: 10px solid white;
            box-shadow: 0 0 1px 1px #e8e8e8;
        };
        
        --paper-card-header-image-text: {
            left: auto;
            right: 0px;
            width: calc(100% - 130px);
            text-align: right;
            text-overflow: ellipsis;
            overflow: hidden;
        };
    }
    
    .meetups-card .card-content a {
        display: block;
        overflow: hidden;
        text-overflow: ellipsis;
        white-space: nowrap;
    }
</style>

<template is="urth-core-bind" channel="meetups" id="meetup-card">
    <paper-card
            class="meetups-card"
            heading="[[meetup.member.member_name]]"
            image="[[meetup.member.photo]]">
        <div class="card-content">
            <p><a href="[[meetup.event.event_url]]" target="_blank">[[meetup.event.event_name]]</a></p>
            <p>[[getPrettyTime(meetup.event.time)]]</p>
        </div>
    </paper-card>
</template>

<!-- see https://github.com/PolymerElements/iron-validator-behavior/blob/master/demo/index.html -->
<script>
    (function() {
        var dateStringOptions = {weekday:'long', year:'numeric', month: 'long', hour:'2-digit', minute:'2-digit', day:'numeric'};
        var locale = navigator.language || navigator.browserLanguage || navigator.systemLanguage || navigator.userLanguage;

        var scope = document.querySelector('template#meetup-card');
        scope.getPrettyTime = function(timestamp) {
            try {
                console.log('The date is', timestamp)
                var d = new Date(timestamp);
                return d.toLocaleDateString(locale, dateStringOptions);
            } catch(e){
                return ''
            }
        }
    })();
</script>

Map Venues

Finally, we add a WebGL globe showing the location of meetup venues to which users are RSVPing in the stream. On the globe we render bars to represent the number of recent RSVPs in a geographic area.


In [ ]:
%%html
<link rel="import" href="urth_components/webgl-globe/webgl-globe.html"
  is="urth-core-import" package="http://github.com/ibm-et/webgl-globe.git">

<template is="urth-core-bind" channel="stats">
    <webgl-globe data=[[venues]]></webgl-globe>
</template>
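
As with the chart, you can exercise the globe without the stream by publishing data in the same shape that aggregate_venues produces: a list of [lat, lon, magnitude] entries whose magnitudes sum to 1.0. A minimal sketch with made-up coordinates:

import urth.widgets.WidgetChannels.channel

// Two made-up venue buckets; the third element is the normalized RSVP weight
channel("stats").set("venues", List(List(40.0, -74.0, 0.6), List(51.0, 0.0, 0.4)))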

Arrange the Dashboard Layout

Before toggling the stream on/off switch, we should switch to dashboard view. Otherwise, we'll need to scroll up and down this notebook to see the widgets updating. For convenience, this notebook already contains metadata to position our widgets in a grid layout.

Select View > View Dashboard from the menu bar to see the dashboard view now. Then toggle the stream switch in the top right of the dashboard to begin stream processing. To return to the regular notebook view, select View > Notebook.

If you want to arrange the notebook cells differently, select View > Layout Dashboard. Then, hover your mouse over the main notebook / dashboard area. When you do, you'll see icons appear that allow you to:

  • Drag cells to new locations
  • Resize cells
  • Show / hide cells in the dashboard view
  • Flip to editing mode for a cell

Save the notebook to save your changes to the layout within the notebook file itself.


In [ ]: