This notebook shows how you can use Spark Streaming to acquire and process data from the RSVP API, then display it in an actionable dashboard using declarative widgets. In contrast with our other streaming demo, this notebook provides less tutorial instruction, but has a more realistic use case:
You have a product or service that you want to promote. One way of doing this is through local meetups. There are already some meetups affiliated with your offering, but you want to increase attendance. You could search for other meetups related to your own and message attendees. However, you worry about spamming people with irrelevant and untimely information.
Instead, you create a notebook that continuously identifies people actively RSVPing for meetups near to your own and with themes similar to yours. You also code this notebook to track conversions, people whom you contact that later RSVP for your meetup. You deploy this notebook as a dashboard and actively monitor it. You pick and choose who to contact from the identified attendees in a timely manner, while they're thinking about other meetups. You track successful conversions to see what contact strategies work and which do not.
On your first visit to this notebook, you'll need to at least put your API key in the configuration section. We recommend that you execute one cell at a time as you read along. Later, if you just want to see the demo, select Cell > Run All from the menu bar. Once you've run all of the cells, select View > View Dashboard and then click on the Stream toggle to start the data stream.
Table of Contents
We've condensed all of the demo logic into a single notebook for educational purposes. If you want to turn this into a scalable, multi-tenant dashboard, you'll want to separate the stream processing portions from the dashboard view. That way, multiple dashboard instances can pull from the same processed data stream instead of recomputing it.
In [ ]:
%matplotlib inline
In [ ]:
import shutil
import tempfile
import os
import time
import json
import sys
from tornado.websocket import websocket_connect
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from functools import reduce, partial
from urth.widgets.widget_channels import channel
The Meetup API requires authentication. You can get your API key here. Enter the key below:
In [ ]:
The topic on which to search for our upcoming meetups.
In [ ]:
PROD_TOPIC = 'bluemix'
How often (in seconds) to update the upcoming meetups list.
In [ ]:
To determine potential matches, define a relevant topic list.
In [ ]:
RELEVANT_TOPICS = ['paas', 'cloud', 'cloud-computing', 'cloud-foundry', 'devops', 'internet-of-things', 'data-analytics', 'saas-software-as-a-service', 'iaas-infrastructure-as-a-service', 'openstack', 'docker', 'nodejs']
Threshold (in km). Candidates farther away than this distance from one of the upcoming meetups are discarded:
In [ ]:
Span of time over which to calculate change in meetups RSVP counts.
In [ ]:
MU_CHANGE_CUTOFF = timedelta(weeks=1)
MU_CHANGE_LABEL = 'Change in last week'
For demo purposes, define this to True
. The notebook will randomly select to "contact" a candidate from the list.
In [ ]:
DEMO = False
The code in this section queries the meetup API for your upcoming meetups using a topic string of interest (e.g., bluemix
). These are the meetups at which you're trying to increase attendance. The code here polls the API open events endpoint on a long interval to show the increase / decrease of RSVPs at your meetups.
We don't take advantage of Spark streaming for these actions because the data is small and rarely changing.
In [ ]:
import requests
from html.parser import HTMLParser
import tornado.ioloop
In [ ]:
# list of Meetup Event records; result of search on PROD_TOPIC keywords
upcoming_meetups = []
# dict indexed by Meetup Event ID; each entry is a list of
# [datetime, total count of "yes" RSVPs, total count of conversions]
rsvp_count_history = {}
# list of tuples containing the "yes" RSVP counts for all of the
# upcoming meetups, as well as the total conversions
total_rsvp_count = []
In [ ]:
mu_events_url = ""
params = {'topic':PROD_TOPIC, 'key':API_KEY}
loop = tornado.ioloop.IOLoop.current()
In [ ]:
class MLStripper(HTMLParser):
def __init__(self):
self.fed = []
def handle_data(self, d):
def get_data(self):
return ''.join(self.fed)
def strip_tags(html):
s = MLStripper()
return s.get_data()
def get_venue(x):
if 'venue' not in x:
return '-'
v = x['venue']
city = v['city']
if v['country'] == 'us':
return '{}, {}'.format(city, v['state'])
return '{} ({})'.format(city, v['country'].upper())
def map_mus(x):
x['description'] = strip_tags(x['description']) if 'description' in x else ''
x['group']['url'] = "{}".format(x['group']['urlname'])
x['venue_loc'] = get_venue(x)
return x
def update_meetups_values():
global upcoming_meetups
global rsvp_count_history
global total_rsvp_count
now =
cutoff = now - MU_CHANGE_CUTOFF
for mu in upcoming_meetups:
id = mu['id']
prev_conv_count = 0 # running total for conversions
if id not in rsvp_count_history:
rsvp_count_history[id] = []
# carry previous conversions count forward
prev_conv_count = rsvp_count_history[id][-1][2]
# store the RSVP count for each event at this time
rsvp_count_history[id].append([now, mu['yes_rsvp_count'], prev_conv_count])
# cleanup old rsvp history values
rsvp_count_history[id] = list(filter(lambda x: x[0] > cutoff, rsvp_count_history[id]))
# save change in rsvp count over time span
mu['change_over_time'] = rsvp_count_history[id][-1][1] - rsvp_count_history[id][0][1]
# update the total number of RSVPs for all of upcoming meetups
reduce(lambda x, y: x + y['yes_rsvp_count'], upcoming_meetups, 0),
reduce(lambda x, y: x + y[-1][2], rsvp_count_history.values(), 0)
# cleanup old values
list(filter(lambda x: x[0] > cutoff, total_rsvp_count))
def refresh_meetups_list():
Update the data for the upcoming meetup events. Once invoked, will run every
PROD_TIMER seconds.
Publishes data on the 'meetups' and 'plots' channels.
global upcoming_meetups
# request updated meetups data
r = requests.get(mu_events_url, params=params)
upcoming_meetups = list(map(map_mus, r.json()['results']))
# update values, comparing old vs new data
# reschedule
loop.call_later(PROD_TIMER, refresh_meetups_list)
# broadcast the updated list
channel('meetups').set('list', upcoming_meetups)
fig = plot_total_rsvps(total_rsvp_count)
channel('plots').set('total_rsvp_plot', fig)
Here we define functions for identifying users that may also be interested in attending your meetups. The code here works relies on a Spark streaming context hooked to the RSVP stream. It selects candidates assocated with RSVPs that meet all of these conditions:
In [ ]:
conversions = [];
def process_rsvps(ssc, queue):
'''Initiates processing of the RSVPs stream.'''
msgs = ssc.textFileStream(queue)
# Each event is a JSON blob. Parse it. Filter it.
rsvps = json_str: json.loads(json_str))
In [ ]:
def process_candidates(rsvps):
Generate a list of candidates from the incoming stream, filtering on
relevant topics and distance.
filtered_rsvps = filter_rsvps(rsvps)
candidate_rsvps = compute_closeness(filtered_rsvps)
member_data = get_members(candidate_rsvps)
In [ ]:
def filter_rsvps(rsvps):
Filter out RSVPs that don't have our relevant topics,
contain PROD_TOPIC or are not "yes" responses.
return rsvps.filter(topic_filter)
def topic_filter(rsvp):
return (rsvp['response'] == 'yes' and
all(topic['urlkey'] != PROD_TOPIC for topic in rsvp['group']['group_topics']) and
any(topic['urlkey'] in RELEVANT_TOPICS for topic in rsvp['group']['group_topics']))
In [ ]:
# from
import math
def distance_on_unit_sphere(lat1, long1, lat2, long2):
'''Returns distance (in km) between two points on globe'''
# Convert latitude and longitude to
# spherical coordinates in radians.
degrees_to_radians = math.pi/180.0
# phi = 90 - latitude
phi1 = (90.0 - lat1)*degrees_to_radians
phi2 = (90.0 - lat2)*degrees_to_radians
# theta = longitude
theta1 = long1*degrees_to_radians
theta2 = long2*degrees_to_radians
# Compute spherical distance from spherical coordinates.
# For two locations in spherical coordinates
# (1, theta, phi) and (1, theta', phi')
# cosine( arc length ) =
# sin phi sin phi' cos(theta-theta') + cos phi cos phi'
# distance = rho * arc length
cos = (math.sin(phi1)*math.sin(phi2)*math.cos(theta1 - theta2) +
arc = math.acos( cos )
# Remember to multiply arc by the radius of the earth
# in your favorite set of units to get length.
#return arc
return arc * 6371 # radius earth = 6,371 km
In [ ]:
def compute_closeness(rsvps):
Compute how close the candidate is to our list of upcoming events. If they
are farther away than DIST_THRESHOLD, filter them out.
return (rsvps
.filter(lambda x: x['cmu_dist'] < DIST_THRESHOLD))
def add_closest_mu(rsvp):
# save closest upcoming meetup
cmu = None
cmu_dist = sys.maxsize
# loop through our upcoming events, looking for closest
for mu in upcoming_meetups:
m = mu['group']
r = rsvp['group']
dist = distance_on_unit_sphere(m['group_lat'], m['group_lon'], r['group_lat'], r['group_lon'])
if dist < cmu_dist:
cmu = mu
cmu_dist = dist
# store this data in the RSVP record
rsvp['cmu'] = cmu
rsvp['cmu_dist'] = cmu_dist
return rsvp
In [ ]:
def get_members(rsvps):
'''Create a member record from the RSVP record.'''
def map_rsvp_to_member(rsvp):
member = ({
'id': rsvp['member']['member_id'],
'member': rsvp['member'],
'cmu': ({
'id': rsvp['cmu']['id'],
'name': rsvp['cmu']['name'],
'event_url': rsvp['cmu']['event_url']
'cmu_dist': rsvp['cmu_dist']
# use a fallback photo for those members without one
if 'photo' not in member['member'] or member['member']['photo'] is None:
member['member']['photo'] = ''
return member
In [ ]:
def gen_candidate_list(rsvps):
'''Publish the candidates list (member records), which notifies the UI.'''
def update_cand_list(rsvps_rdd):
channel('candidates').set('cand_list', rsvps_rdd.collect())
We keep track of candidates that we have contacted. If we later see that such a candidate has made an RSVP for one of our upcoming events, we count this as a "conversion".
In [ ]:
def process_conversions(rsvps):
'''Check the RSVP stream for "converted" members, updating our running total.'''
conversion_stream = rsvps.filter(lambda x: x['response'] == 'yes' and
any(topic['urlkey'] == PROD_TOPIC for topic in x['group']['group_topics']) and
x['member']['member_id'] in conversions)
def rdd_update_conversions_count(rdd):
for rsvp in rdd.collect():
muid = rsvp['event']['event_id']
if muid in rsvp_count_history:
# increment conversions counter in *latest* entry
rsvp_count_history[muid][-1][2] += 1
# TODO what do we do if `rsvp_count_history` doesn't have `muid`?
In [ ]:
def candidate_contacted(id):
Callback which can be used by the UI to update the list of candidates
we have contacted.
Works in conjunction with the <template> which follows.
In [ ]:
<template is="urth-core-bind">
<urth-core-function id="candidateContacted" ref="candidate_contacted" arg-id="{{id}}"></urth-core-function>
In this section we import some basic widets and use them to define custom widgets for displaying meetup information in tabular form. These widgets could be defined outside the notebook and imported here to keep the notebook slimmer, but in the interest of having everything in one spot, they're done inline.
In [ ]:
<link rel="import" href="urth_components/iron-resizable-behavior/iron-resizable-behavior.html"
is="urth-core-import" package="PolymerElements/iron-resizable-behavior">
<link rel="import" href="urth_components/iron-list/iron-list.html"
is="urth-core-import" package="PolymerElements/iron-list">
<link rel="import" href="urth_components/paper-toggle-button/paper-toggle-button.html"
is="urth-core-import" package="PolymerElements/paper-toggle-button#v1.0.10">
<link rel="import" href="urth_components/paper-material/paper-material.html"
is="urth-core-import" package="PolymerElements/paper-material">
<link rel="import" href="./dynamic-list.html">
<link rel="import" href="./upcoming-meetups.html">
List local files here so they will be accessible when deployed:
In [ ]:
<template is="urth-core-bind" channel="meetups" id="meetups-list-tmpl">
<upcoming-meetups items="[[list]]" change-label="[[change_label]]"/>
In [ ]:
<template is="urth-core-bind" channel="meetups">
.topics-desc {
font-size: larger;
.topics-desc span {
font-style: italic;
<div class="topics-desc">
Showing upcoming meetups for the <span>[[upcoming_label]]</span> topic. Meetup candidates are generated using the following topic filters: <span>[[cand_topics_label]]</span>
In [ ]:
def plot_total_rsvps(list):
Create a stacked plot where the total is the number of RSVPs over a period of time,
and the top layer are "converted" RSVPs.
X = [i[0] for i in list]
Y = [[i[1] - i[2] for i in list], [i[2] for i in list]]
fig, ax = plt.subplots(1,1,figsize=(10,7))
ax.stackplot(X, *Y, baseline='zero', colors=['#2fa6e4','#82d1f5'])
ax.set_title('Total RSVPs for Upcoming Meetups')
return fig
In [ ]:
<template is="urth-core-bind" channel="plots">
<img src={{total_rsvp_plot}}>
We have to put code for configuring the frontend widgets after their definition because widgets cannot yet pull existing data off of channel when instantiated. When issue 35 is resolved, we can move this code up nearer to the configuration section.
In [ ]:
channel('meetups').set('change_label', MU_CHANGE_LABEL)
channel('meetups').set('upcoming_label', PROD_TOPIC)
channel('meetups').set('cand_topics_label', ', '.join(RELEVANT_TOPICS))
This widget creates a fancy table view showing candidates for your meetups. It can operate in one of two modes:
In [ ]:
<template is="urth-core-bind" channel="candidates">
<dynamic-list items="[[cand_list]]" id-prop="id" heading="Meetup Candidates"></dynamic-list>
In [ ]:
def create_streaming_context(checkpoint_dir, sample_rate):
Creates a new SparkContext and SparkStreamingContext. Done in a function
to allow repeated start/stop of the streaming. Returns the streaming
context instance.
:param checkpoint_dir: Directory to use to track Spark job state
:param sample_rate: Stream sampling rate in seconds
# create a local SparkContext to start using as many CPUs as we can
sc = SparkContext('local[*]')
# wrap it in a StreamingContext that collects from the stream
ssc = StreamingContext(sc, sample_rate)
# Setup a checkpoint directory to keep total counts over time.
ssc.checkpoint(os.path.join(checkpoint_dir, '.checkpoint'))
return ssc
In [ ]:
class FileRingReceiver(object):
Hack around lack of custom DStream receivers in Python:
Create a ring buffer of UTF-8 text files on disk.
def __init__(self, max_batches=10):
self.queue = tempfile.mkdtemp()
self.batch_count = 0
self.max_batches = max_batches
def __del__(self):
def put(self, text):
# ignore sentinels
if text is None: return
with open(os.path.join(self.queue, str(self.batch_count)), 'w', encoding='utf-8') as f:
if self.batch_count >= self.max_batches:
oldest = str(self.batch_count - self.max_batches)
os.remove(os.path.join(self.queue, str(oldest)))
self.batch_count += 1
def destroy(self):
shutil.rmtree(self.queue, ignore_errors=True)
In [ ]:
conn_future = None
ssc = None
receiver = None
In [ ]:
def start_stream():
Creates a websocket client that pumps events into a ring buffer queue. Creates
a SparkStreamContext that reads from the queue. Creates the events, topics, and
venues DStreams, setting the widget channel publishing functions to iterate over
RDDs in each. Starts the stream processing.
global conn_future
global ssc
global receiver
receiver = FileRingReceiver(max_batches=100)
conn_future = websocket_connect('ws://', on_message_callback=receiver.put)
ssc = create_streaming_context(receiver.queue, 5)
process_rsvps(ssc, receiver.queue)
def shutdown_stream():
Shuts down the websocket, stops the streaming context, and cleans up the file ring.
global conn_future
global ssc
global receiver
In [ ]:
<template is="urth-core-bind">
<urth-core-function id="streamFunc" ref="start_stream"></urth-core-function>
<urth-core-function id="shutdownFunc" ref="shutdown_stream"></urth-core-function>
<style is="custom-style">
paper-toggle-button {
--default-primary-color: green;
paper-toggle-button:hover {
cursor: pointer;
.toggle-btn-container {
margin: 1em 0;
text-align: right;
#stream-label {
font-size: larger;
margin: 0;
padding: 0 0.5em;
<div class="toggle-btn-container">
<paper-toggle-button id="stream-btn"></paper-toggle-button>
<label id="stream-label">Stream</label>
$('#stream-btn').on('change', function() {
if ($(this).attr('checked')) {
// start streaming
console.warn('Starting Spark Streaming');
} else {
// stop streaming
console.warn('Stopping Spark Streaming');
In order to run a standalone demo, the following code will (if DEMO
is set to True
) periodically simulate a mouse click on the contact button for a random candidate in the list. This will make it so that the rest of the code considers this as a "conversion". Over time, as "converted" members also sign up for one of the upcoming events in our list, the plot will show two different colors, the top one signifying the number of "converted" candidates.
In [ ]:
<template is="urth-core-bind" channel="demo" id="demo-tmpl">
var startDemoMode = function(e) {
if ( !== 'demo' || e.detail.key !== 'on') {
console.log('STARTING DEMO MODE');
setInterval(function() {
var list = document.querySelector('dynamic-list');
if (list) {
}, 60000);
demoTmpl.removeEventListener('channelSetItem', startDemoMode);
var demoTmpl = document.getElementById('demo-tmpl');
demoTmpl.register(demoTmpl, '*');
demoTmpl.addEventListener('channelSetItem', startDemoMode);
In [ ]:
if DEMO:
channel('demo').set('on', 1)