The purpose of this notebook is to give an all-in-one demo of streaming data from the meetup.com RSVP API, through a local Spark Streaming job, and into declarative widgets in a dashboard layout. You can peek at the meetup.com stream here.
This example is a Scala adaptation of this notebook from jupyter_dashboards.
On your first visit to this notebook, we recommend that you execute one cell at a time as you read along. Later, if you just want to see the demo, select Cell > Run All from the menu bar. Once you've run all of the cells, select View > View Dashboard and then click on the Stream toggle to start the data stream.
Note
We've condensed all of the demo logic into a single notebook for educational purposes. If you want to turn this into a scalable, multi-tenant dashboard, you'll want to separate the stream processing portions from the dashboard view. That way, multiple dashboard instances can pull from the same processed data stream instead of recomputing it.
In Toree, declarativewidgets must be initialized by adding the JAR that contains the Scala implementation and then calling initWidgets. This must take place very close to the top of the notebook.
In [ ]:
%%html
<span id="__jar_url__"></span>
<script>
(function(){
var thisUrl = this.location.toString();
var jarUrl = thisUrl.substring(0,thisUrl.indexOf("/notebooks"))+"/nbextensions/urth_widgets/urth-widgets.jar";
document.getElementById("__jar_url__").innerHTML = jarUrl;
})();
</script>
Note: The output of the cell above is the URL to use below.
In [ ]:
%AddJar http://localhost:8888/nbextensions/urth_widgets/urth-widgets.jar
In [ ]:
import urth.widgets._
initWidgets
With the frontend widgets in mind, we'll now set up our Spark Streaming job to fulfill their data requirements. In this section, we'll define a set of functions that act on a StreamingContext or on RDDs from that context.
In [ ]:
%AddDeps eu.piotrbuda scalawebsocket_2.10 0.1.1 --transitive
In [ ]:
import scalawebsocket.WebSocket
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
class WebSocketReceiver(url: String, storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER) extends Receiver[play.api.libs.json.JsValue](storageLevel) {
@volatile private var webSocket: WebSocket = _
def onStart() {
try{
val newWebSocket = WebSocket().open(url).onTextMessage({ msg: String => parseJson(msg) })
setWebSocket(newWebSocket)
} catch {
case e: Exception => restart("Error starting WebSocket stream", e)
}
}
def onStop() {
setWebSocket(null)
}
private def setWebSocket(newWebSocket: WebSocket) = synchronized {
if (webSocket != null) {
webSocket.shutdown()
}
webSocket = newWebSocket
}
private def parseJson(jsonStr: String): Unit = {
val json: play.api.libs.json.JsValue = play.api.libs.json.Json.parse(jsonStr)
store(json)
}
}
We then put all our functionality into an object that is marked as Serializable. This is a useful technique for avoiding serialization problems when interacting with Spark. Also note that anything we do not want to serialize, such as the StreamingContext and the SQLContext, should be marked @transient.
The rest of the object consists of methods for starting and stopping the streaming application, as well as functions that define the streaming flow.
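To see why @transient matters, here is a minimal sketch that uses plain Java serialization rather than Spark. The names HeavyContext, Holder, and TransientDemo are hypothetical stand-ins and are not part of this notebook; HeavyContext plays the role of a driver-only resource such as a StreamingContext.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable, driver-only resource.
class HeavyContext(val name: String)

// Hypothetical holder, playing the role that MeetupApp plays in the notebook.
class Holder(val label: String) extends Serializable {
  // Without @transient, writeObject would throw NotSerializableException
  // because HeavyContext does not implement Serializable.
  @transient var context: HeavyContext = new HeavyContext("driver-only")
}

object TransientDemo {
  // Serializes a value to bytes and reads it back, as a closure shipped
  // to another JVM would be.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After a round trip, the ordinary field survives while the transient field comes back as null, so code that runs on executors should not rely on transient members being populated.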
In [ ]:
case class TopicCount(topic: String, count: Int)
object MeetupApp extends Serializable {
import play.api.libs.json._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.Logging
import org.apache.spark.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.rdd.RDD
import urth.widgets.WidgetChannels.channel
import org.apache.spark.sql.SQLContext
@transient var ssc:StreamingContext = _
@transient val sqlContext:SQLContext = new SQLContext(sc)
import sqlContext.implicits._
var topic_filter = ""
//Resetting the values on the data channels
channel("status").set("streaming", false)
channel("stats").set("topics", Map())
channel("stats").set("venues", List())
def create_streaming_context(sample_rate: Int): StreamingContext = {
return new StreamingContext(sc, Seconds(sample_rate))
}
/**
Creates a StreamingContext that reads RSVP events from a WebSocketReceiver.
Creates the events, topics, and venues DStreams, setting the widget channel
publishing functions to iterate over RDDs in each. Starts the stream processing.
*/
def start_stream():Unit = {
ssc = create_streaming_context(5)
ssc.checkpoint("/tmp/meetups.checkpoint")
val events = get_events(ssc, sample_event)
get_topics(events, get_topic_counts)
get_venues(events, aggregate_venues)
ssc.start()
channel("status").set("streaming", true)
}
/**
Stops the streaming context, which also stops the WebSocket receiver. The
underlying SparkContext is left running so a new stream can be started later.
*/
def shutdown_stream():Unit = {
ssc.stop(false)
channel("status").set("streaming", false)
}
/**
Receives events from the WebSocket receiver. Retains only those events that have at
least one topic exactly matching the current topic_filter. Sends event
RDDs to the for_each function. Returns the event DStream.
*/
def get_events(ssc: StreamingContext, for_each: (RDD[play.api.libs.json.JsValue]) => Unit): org.apache.spark.streaming.dstream.DStream[play.api.libs.json.JsValue] = {
val all_events = ssc.receiverStream( new WebSocketReceiver("ws://stream.meetup.com/2/rsvps"))
// Filter set of event
val events = all_events.filter(retain_event)
// Send event data to a widget channel. This will be covered below.
events.foreachRDD(for_each)
return events
}
/**
Returns true if the user defined topic filter is blank or if at least one
group topic in the event exactly matches the user topic filter string.
*/
def retain_event(event: play.api.libs.json.JsValue):Boolean = {
val topics = (event \ "group" \ "group_topics").as[play.api.libs.json.JsArray].value
val isEmpty = topic_filter.trim == ""
// exists returns false for an empty topic list, whereas reduce would throw
val containsTopic = topics.exists(topic => topic_filter.
equals((topic\"urlkey").as[play.api.libs.json.JsString].value)
)
isEmpty || containsTopic
}
/*
Takes an RDD from the event DStream. Takes one event from the RDD.
Substitutes a placeholder photo if the member who RSVPed does not
have one. Publishes the event metadata on the meetup channel.
*/
def sample_event(rdd: org.apache.spark.rdd.RDD[play.api.libs.json.JsValue]):Unit = {
try {
val event = rdd.take(1)(0)
// use a fallback photo for those members without one
val default_event: play.api.libs.json.JsObject = play.api.libs.json.Json.parse("""{
"member" : {
"photo" : "http://photos4.meetupstatic.com/img/noPhoto_50.png"
}
}
""").as[play.api.libs.json.JsObject]
val fixed_event = default_event ++ (event).as[play.api.libs.json.JsObject]
channel("meetups").set("meetup", fixed_event.value)
} catch {
case _: Exception => print("No data to sample")
}
}
/**
Pulls group topics from meetup events. Counts each one once and updates
the global topic counts seen since stream start. Sends topic count RDDs
to the for_each function. Returns nothing new.
*/
def get_topics(events:DStream[JsValue], for_each: (RDD[((String,String),Int)]) => Unit) = {
//Extract the group topic url keys and "namespace" them with the current topic filter
val topics = events.
flatMap( (event: JsValue) => {
(event \ "group" \ "group_topics").as[JsArray].value
}).map((topic: JsValue) => {
val filter = if(topic_filter.equals("")) {
"*"
} else {
topic_filter
}
((filter, (topic \ "urlkey").as[JsString].value), 1)
})
val topic_counts = topics.updateStateByKey(update_topic_counts)
// Send topic data to a widget channel. This will be covered below.
topic_counts.foreachRDD(for_each)
}
/**
Sums the number of times a topic has been seen in the current sampling
window. Then adds that to the number of times the topic has been
seen in the past. Returns the new sum.
*/
def update_topic_counts(new_values: Seq[Int], last_sum: Option[Int]): Option[Int] = {
Some(new_values.sum + last_sum.getOrElse(0))
}
/**
Takes an RDD from the topic DStream. Takes the top 25 topics by occurrence
and publishes them as a Spark DataFrame on the stats channel.
*/
def get_topic_counts(rdd: RDD[((String,String),Int)]){
//counts = rdd.takeOrdered(25, key=lambda x: -x[1])
val filterStr = if (topic_filter.equals("")) "*" else topic_filter
/*
keep only those matching current filter
and sort in descending order, taking top 25
*/
val countDF = rdd.filter((x:((String,String),Int)) => filterStr.equals(x._1._1)).
map(tuple => TopicCount(tuple._1._2, tuple._2)).toDF().
sort($"count".desc).limit(25)
channel("stats").set("topics", countDF)
}
/**
Pulls venue metadata from meetup events if it exists. Sends the venue
DStream to the calculate function. Returns nothing new.
*/
def get_venues(events:DStream[JsValue], calculate: (DStream[JsValue]) => Unit):Unit = {
val venues = events.
filter(hasVenue).
map((event:JsValue) => {event \ "venue"})
calculate(venues)
}
/**
Checks if there is a venue in the JSON object
*/
def hasVenue(event:JsValue):Boolean = {
(event \ "venue") match {
case e:play.api.libs.json.JsUndefined => false
case _ => true
}
}
/**
Aggregating the venues by lat/lon
*/
def aggregate_venues(venues:DStream[JsValue]): Unit = {
//build a (lat, long, mag) tuple
val venue_loc = venues.map((venue:JsValue) => {
(
(venue \ "lat").asOpt[Int].getOrElse(0),
(venue \ "lon").asOpt[Int].getOrElse(0)
)
}).
filter((latLon:(Int,Int)) => {latLon != (0,0)}).
map((latLon:(Int,Int)) => {
((""+latLon._1+","+latLon._2), (latLon._1, latLon._2, 1))
})
val venue_loc_counts = venue_loc.updateStateByKey(update_venue_loc_counts)
venue_loc_counts.map{
_._2
}.foreachRDD((rdd:RDD[(Int,Int,Int)]) => {
val venue_data = rdd.collect()
if( !venue_data.isEmpty ){
val total = venue_data.reduce((a,b)=> (0, 0, a._3+b._3))._3 * 1.0
val venue_data_list= venue_data.map( (x:(Int,Int,Int)) => List(x._1,x._2,x._3/total)).toList
channel("stats").set("venues", venue_data_list)
}
})
}
/**
Function to count the lat, lon
*/
def update_venue_loc_counts(new_values: Seq[(Int,Int,Int)], last_sum: Option[(Int,Int,Int)]): Option[(Int,Int,Int)] = {
if( new_values.isEmpty )
last_sum
else{
val partial = new_values.reduce((a,b)=> (a._1, a._2, a._3+b._3))
Some((partial._1, partial._2, partial._3 + last_sum.getOrElse((0,0,0))._3))
}
}
}
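The running totals in MeetupApp come from updateStateByKey, which invokes the update function per key with the values seen in the current batch plus the previously stored state. The following is a minimal sketch of that folding behaviour in plain Scala, without Spark. TopicCountSketch and its replay helper are hypothetical names introduced only for illustration.

```scala
object TopicCountSketch {
  // Same shape as the function passed to updateStateByKey above:
  // the new values for one key in this batch, plus the previous state.
  def updateTopicCounts(newValues: Seq[Int], lastSum: Option[Int]): Option[Int] =
    Some(newValues.sum + lastSum.getOrElse(0))

  // Hypothetical helper: replays the batches for a single key in order,
  // starting from no state, and returns the final state.
  def replay(batches: Seq[Seq[Int]]): Option[Int] =
    batches.foldLeft(Option.empty[Int])((state, batch) => updateTopicCounts(batch, state))
}
```

For example, a topic seen twice in the first batch, not at all in the second, and once in the third ends with a state of Some(3); an empty batch leaves the previous total unchanged.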
Now that we have a streaming job defined, we can create a series of interactive areas that control the job and display the data being produced.
In [ ]:
val start_stream = () => {
MeetupApp.start_stream()
}:Unit
val shutdown_stream = () => {
MeetupApp.shutdown_stream()
}:Unit
In [ ]:
%%html
<link rel="import" href="urth_components/paper-toggle-button/paper-toggle-button.html"
is="urth-core-import" package="PolymerElements/paper-toggle-button">
<template is="urth-core-bind" channel="status">
<urth-core-function id="start" ref="start_stream"></urth-core-function>
<urth-core-function id="shutdown" ref="shutdown_stream"></urth-core-function>
<paper-toggle-button checked="{{streaming}}" onChange="this.checked ? start.invoke() : shutdown.invoke()">Stream</paper-toggle-button>
</template>
<style is="custom-style">
paper-toggle-button {
--default-primary-color: green;
}
</style>
In [ ]:
%%html
<link rel="import" href="urth_components/urth-viz-chart/urth-viz-chart.html" is="urth-core-import">
<template is="urth-core-bind" channel="stats">
<urth-viz-chart type='bar' datarows='[[topics.data]]' columns='[[topics.columns]]' rotatelabels='30'></urth-viz-chart>
</template>
Next we create an <urth-core-function> that binds the value of a <paper-input> widget to a Scala function that sets a variable on MeetupApp. The function sets a string that we'll use to filter the incoming events down to only those pertaining to a certain topic.
Notice that the <link> tag here is different from the one we specified for <urth-viz-chart> above. <urth-viz-chart> is already loaded within the notebook, but here we are using a third-party Polymer element that needs to be downloaded first. To handle that automatically, we specify is="urth-core-import" and set the bower package name as the package attribute value.
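The filter string is applied by retain_event in MeetupApp: an event is kept when the filter is blank or when one of its topic url keys matches the filter exactly. Here is a simplified sketch of that rule, assuming the url keys have already been extracted from the event JSON into plain strings. TopicFilterSketch and retain are hypothetical names used only for this illustration.

```scala
object TopicFilterSketch {
  // Hypothetical, simplified predicate: url keys are plain strings here
  // instead of JSON values pulled from an RSVP event.
  def retain(topicFilter: String, urlKeys: Seq[String]): Boolean =
    topicFilter.trim.isEmpty || urlKeys.exists(_ == topicFilter)
}
```

Note that the match is exact, so a filter of "hik" does not retain an event whose only topic is "hiking".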
In [ ]:
val set_topic_filter = (value: String) => {
MeetupApp.topic_filter = value
}
In [ ]:
%%html
<link rel="import" href="urth_components/paper-input/paper-input.html"
is="urth-core-import" package="PolymerElements/paper-input">
<template is="urth-core-bind" channel="filter" id="filter-input">
<urth-core-function auto
id="set_topic_filter"
ref="set_topic_filter"
arg-value="{{topic_filter}}">
</urth-core-function>
<paper-input label="Filter" value="{{topic_filter}}"></paper-input>
</template>
In [ ]:
%%html
<link rel="import" href="urth_components/paper-card/paper-card.html"
is="urth-core-import" package="PolymerElements/paper-card">
<style is="custom-style">
paper-card.meetups-card {
max-width: 400px;
width: 100%;
--paper-card-header: {
height: 100px;
border-bottom: 1px solid #e8e8e8;
};
--paper-card-header-image: {
height: 80px;
width: 80px !important;
margin: 10px;
border-radius: 50px;
width: auto;
border: 10px solid white;
box-shadow: 0 0 1px 1px #e8e8e8;
};
--paper-card-header-image-text: {
left: auto;
right: 0px;
width: calc(100% - 130px);
text-align: right;
text-overflow: ellipsis;
overflow: hidden;
};
}
.meetups-card .card-content a {
display: block;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}
</style>
<template is="urth-core-bind" channel="meetups" id="meetup-card">
<paper-card
class="meetups-card"
heading="[[meetup.member.member_name]]"
image="[[meetup.member.photo]]">
<div class="card-content">
<p><a href="[[meetup.event.event_url]]" target="_blank">[[meetup.event.event_name]]</a></p>
<p>[[getPrettyTime(meetup.event.time)]]</p>
</div>
</paper-card>
</template>
<!-- see https://github.com/PolymerElements/iron-validator-behavior/blob/master/demo/index.html -->
<script>
(function() {
var dateStringOptions = {weekday:'long', year:'numeric', month: 'long', hour:'2-digit', minute:'2-digit', day:'numeric'};
var locale = navigator.language || navigator.browserLanguage || navigator.systemLanguage || navigator.userLanguage;
var scope = document.querySelector('template#meetup-card');
scope.getPrettyTime = function(timestamp) {
try {
console.log('The date is', timestamp)
var d = new Date(timestamp);
return d.toLocaleDateString(locale, dateStringOptions);
} catch(e){
return ''
}
}
})();
</script>
Finally, we add a WebGL globe showing the location of meetup venues to which users are RSVPing in the stream. On the globe we render bars to represent the number of recent RSVPs in a geographic area.
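The venues value bound to the globe is the list that aggregate_venues publishes on the stats channel: one row per location, holding the latitude, the longitude, and that location's share of all RSVPs counted so far. A minimal sketch of that normalization in plain Scala follows. VenueMagnitudeSketch and toGlobeRows are hypothetical names, and the row layout is simply the one this notebook produces, not a documented contract of the globe element.

```scala
object VenueMagnitudeSketch {
  // Each input triple is (lat, lon, count). Each output row is
  // List(lat, lon, count / total), mirroring aggregate_venues above.
  def toGlobeRows(venues: Seq[(Int, Int, Int)]): List[List[Double]] = {
    val total = venues.map(_._3).sum.toDouble
    if (total == 0.0) Nil // nothing to plot yet
    else venues.map { case (lat, lon, count) =>
      List(lat.toDouble, lon.toDouble, count / total)
    }.toList
  }
}
```

Because each magnitude is a fraction of the total, the bar heights are relative: a location's bar shrinks as other locations accumulate RSVPs.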
In [ ]:
%%html
<link rel="import" href="urth_components/webgl-globe/webgl-globe.html"
is="urth-core-import" package="http://github.com/ibm-et/webgl-globe.git">
<template is="urth-core-bind" channel="stats">
<webgl-globe data="[[venues]]"></webgl-globe>
</template>
Before toggling the stream on/off switch, we should switch to dashboard view. Otherwise, we'll need to scroll up and down this notebook to see the widgets updating. For convenience, this notebook already contains metadata to position our widgets in a grid layout.
Select View > View Dashboard from the menu bar to see the dashboard view now. Then toggle the stream switch in the top right of the dashboard to begin stream processing. To return to the regular notebook view, select View > Notebook.
If you want to arrange the notebook cells differently, select View > Layout Dashboard. Then, hover your mouse over the main notebook / dashboard area. When you do, you'll see icons appear that allow you to:
Save the notebook to save your changes to the layout within the notebook file itself.
Note
In a fresh notebook, the dashboard will only show cells with non-empty output. All other cells can be found in the Hidden section at the bottom of the dashboard layout page. You can quickly add all cell outputs to the dashboard, or remove all of them, using the show / hide icons that appear in the notebook toolbar when you are in layout mode.