Meetup Outreach

This notebook shows how you can use Spark Streaming to acquire and process data from the meetup.com RSVP API, then display it in an actionable dashboard using declarative widgets. In contrast with our other streaming demo, this notebook provides less tutorial instruction, but has a more realistic use case:

You have a product or service that you want to promote. One way of doing this is through local meetups. There are already some meetups affiliated with your offering, but you want to increase attendance. You could search meetup.com for other meetups related to your own and message attendees. However, you worry about spamming people with irrelevant and untimely information.

Instead, you create a notebook that continuously identifies people actively RSVPing for meetups near your own and with themes similar to yours. You also code this notebook to track conversions: people you contact who later RSVP for your meetup. You deploy this notebook as a dashboard and actively monitor it. You pick and choose whom to contact from the identified attendees in a timely manner, while they're thinking about other meetups. You track successful conversions to see which contact strategies work and which do not.

On your first visit to this notebook, you'll need to at least put your meetup.com API key in the configuration section. We recommend that you execute one cell at a time as you read along. Later, if you just want to see the demo, select Cell > Run All from the menu bar. Once you've run all of the cells, select View > View Dashboard and then click on the Stream toggle to start the data stream.

Table of Contents

  1. Configuration
  2. Upcoming Meetups
  3. RSVPs Stream
    1. Conversions
  4. Widgets
    1. Meetups List
    2. Dashboard Heading
    3. Total RSVPs Plot
    4. Candidates List
  5. Setup Streaming
  6. Demo Mode

In [ ]:
%matplotlib inline

In [ ]:
import shutil
import tempfile
import os
import time
import json
import sys
from tornado.websocket import websocket_connect
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from functools import reduce, partial

from urth.widgets.widget_channels import channel

Configuration

The Meetup API requires authentication. You can get your API key here. Enter the key below:


In [ ]:
API_KEY = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'

The topic on which to search for our upcoming meetups.


In [ ]:
PROD_TOPIC = 'bluemix'

How often (in seconds) to update the upcoming meetups list.


In [ ]:
PROD_TIMER = 3600

To determine potential matches, define a relevant topic list.


In [ ]:
RELEVANT_TOPICS = ['paas', 'cloud', 'cloud-computing', 'cloud-foundry', 'devops', 'internet-of-things', 'data-analytics', 'saas-software-as-a-service', 'iaas-infrastructure-as-a-service', 'openstack', 'docker', 'nodejs']

Threshold (in km). Candidates farther away than this distance from one of the upcoming meetups are discarded:


In [ ]:
DIST_THRESHOLD = 50

Span of time over which to calculate change in meetups RSVP counts.


In [ ]:
MU_CHANGE_CUTOFF = timedelta(weeks=1)
MU_CHANGE_LABEL = 'Change in last week'

For demo purposes, set this to True. The notebook will then periodically pick a random candidate from the list and "contact" them.


In [ ]:
DEMO = False

Upcoming Meetups

The code in this section queries the meetup API for your upcoming meetups using a topic string of interest (e.g., bluemix). These are the meetups at which you're trying to increase attendance. The code here polls the meetup.com API open events endpoint on a long interval to show the increase / decrease of RSVPs at your meetups.

We don't take advantage of Spark streaming for these actions because the data is small and rarely changing.
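The increase / decrease figure comes from comparing the newest and oldest RSVP counts that fall inside a sliding time window. The following is a minimal standalone sketch of that idea, not the notebook's exact code; the helper name `prune_and_diff`, the tuple layout, and the sample history are assumptions made up for illustration.

```python
from datetime import datetime, timedelta

def prune_and_diff(history, now, window):
    """Drop samples older than `window` and return (kept, change).

    `history` is a chronological list of (timestamp, yes_rsvp_count)
    tuples. Both the helper and this layout are hypothetical.
    """
    cutoff = now - window
    kept = [(ts, count) for ts, count in history if ts > cutoff]
    # change = newest count minus the oldest count still inside the window
    change = kept[-1][1] - kept[0][1] if kept else 0
    return kept, change

now = datetime(2016, 1, 15, 12, 0)
history = [
    (now - timedelta(days=10), 12),  # older than one week, gets pruned
    (now - timedelta(days=5), 20),
    (now - timedelta(days=1), 26),
    (now, 31),
]
kept, change = prune_and_diff(history, now, timedelta(weeks=1))
print(len(kept), change)
```

Keeping only the samples inside the window means the reported change always covers at most the configured span, even when the notebook has been running for longer.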


In [ ]:
import requests
from html.parser import HTMLParser
import tornado.ioloop

In [ ]:
# list of Meetup Event records; result of search on PROD_TOPIC keywords
upcoming_meetups = []

# dict indexed by Meetup Event ID; each entry is a list of
# [datetime, total count of "yes" RSVPs, total count of conversions]
rsvp_count_history = {}

# list of tuples containing the "yes" RSVP counts for all of the
# upcoming meetups, as well as the total conversions
total_rsvp_count = []

In [ ]:
mu_events_url = "https://api.meetup.com/2/open_events"
params = {'topic':PROD_TOPIC, 'key':API_KEY}
loop = tornado.ioloop.IOLoop.current()

In [ ]:
class MLStripper(HTMLParser):
    def __init__(self):
        super().__init__()
        self.reset()
        self.fed = []
    def handle_data(self, d):
        self.fed.append(d)
    def get_data(self):
        return ''.join(self.fed)

def strip_tags(html):
    s = MLStripper()
    s.feed(html)
    return s.get_data()

def get_venue(x):
    if 'venue' not in x:
        return '-'

    v = x['venue']
    city = v['city']
    if v['country'] == 'us':
        return '{}, {}'.format(city, v['state'])
    else:
        return '{} ({})'.format(city, v['country'].upper())

def map_mus(x):
    x['description'] = strip_tags(x['description']) if 'description' in x else ''
    x['group']['url'] = "http://meetup.com/{}".format(x['group']['urlname'])
    x['venue_loc'] = get_venue(x)
    return x

def update_meetups_values():
    global upcoming_meetups
    global rsvp_count_history
    global total_rsvp_count

    now = datetime.now()
    cutoff = now - MU_CHANGE_CUTOFF
    
    for mu in upcoming_meetups:
        id = mu['id']
        prev_conv_count = 0 # running total for conversions
        if id not in rsvp_count_history:
            rsvp_count_history[id] = []
        else:
            # carry previous conversions count forward
            prev_conv_count = rsvp_count_history[id][-1][2]

        # store the RSVP count for each event at this time
        rsvp_count_history[id].append([now, mu['yes_rsvp_count'], prev_conv_count])
        # cleanup old rsvp history values
        rsvp_count_history[id] = list(filter(lambda x: x[0] > cutoff, rsvp_count_history[id]))

        # save change in rsvp count over time span
        mu['change_over_time'] = rsvp_count_history[id][-1][1] - rsvp_count_history[id][0][1]

    # update the total number of RSVPs for all of upcoming meetups
    total_rsvp_count.append((now,
            reduce(lambda x, y: x + y['yes_rsvp_count'], upcoming_meetups, 0),
            reduce(lambda x, y: x + y[-1][2], rsvp_count_history.values(), 0)
    ))
    # cleanup old values (assign in place; the filtered list was previously discarded)
    total_rsvp_count[:] = [x for x in total_rsvp_count if x[0] > cutoff]

def refresh_meetups_list():
    '''
    Update the data for the upcoming meetup events. Once invoked, will run every
    PROD_TIMER seconds.
    
    Publishes data on the 'meetups' and 'plots' channels.
    '''
    global upcoming_meetups

    # request updated meetups data
    r = requests.get(mu_events_url, params=params)
    upcoming_meetups = list(map(map_mus, r.json()['results']))
    # update values, comparing old vs new data
    update_meetups_values()
    # reschedule
    loop.call_later(PROD_TIMER, refresh_meetups_list)

    # broadcast the updated list
    channel('meetups').set('list', upcoming_meetups)
    fig = plot_total_rsvps(total_rsvp_count)
    channel('plots').set('total_rsvp_plot', fig)

RSVPs Stream

Here we define functions for identifying users who may also be interested in attending your meetups. The code relies on a Spark streaming context hooked to the meetup.com RSVP stream. It selects candidates associated with RSVPs that meet all of these conditions:

  1. The RSVP is affirmative
  2. The RSVP has at least one of the RELEVANT_TOPICS
  3. The RSVP is to a venue within DIST_THRESHOLD of one of your meetup venues
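The three conditions above can be sketched as a single predicate over one RSVP record. This is a hedged illustration only: `is_candidate`, `haversine_km`, the shortened topic list, and the sample coordinates are hypothetical, and the haversine formula is swapped in here as a common way to compute great-circle distance rather than the formula the notebook uses. As in the notebook, the group's coordinates stand in for the venue.

```python
import math

# Hypothetical values mirroring the notebook's configuration cells.
RELEVANT_TOPICS = ['cloud', 'docker', 'devops']
DIST_THRESHOLD = 50  # km

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km using the haversine formula."""
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dphi = p2 - p1
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(p1) * math.cos(p2) * math.sin(dlmb / 2) ** 2)
    # min() guards against rounding pushing sqrt(a) slightly above 1
    return 2 * 6371 * math.asin(min(1.0, math.sqrt(a)))

def is_candidate(rsvp, my_venues):
    """Return True if the RSVP meets all three conditions."""
    group = rsvp['group']
    keys = [t['urlkey'] for t in group['group_topics']]
    if rsvp['response'] != 'yes':                       # condition 1
        return False
    if not any(k in RELEVANT_TOPICS for k in keys):     # condition 2
        return False
    return any(                                         # condition 3
        haversine_km(lat, lon, group['group_lat'], group['group_lon'])
        < DIST_THRESHOLD
        for lat, lon in my_venues)

def make_rsvp(response, urlkey, lat, lon):
    """Build a sample record with only the fields the predicate reads."""
    return {'response': response,
            'group': {'group_topics': [{'urlkey': urlkey}],
                      'group_lat': lat, 'group_lon': lon}}

my_venues = [(40.7128, -74.0060)]  # sample venue: New York
nearby = make_rsvp('yes', 'cloud', 40.7357, -74.1724)    # Newark
faraway = make_rsvp('yes', 'cloud', 42.3601, -71.0589)   # Boston
print(is_candidate(nearby, my_venues), is_candidate(faraway, my_venues))
```

Checking the cheap conditions first means the distance calculation only runs for RSVPs that already match on response and topic.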

In [ ]:
conversions = []

def process_rsvps(ssc, queue):
    '''Initiates processing of the RSVPs stream.'''
    msgs = ssc.textFileStream(queue)
    
    # Each event is a JSON blob. Parse it. Filter it.
    rsvps = msgs.map(lambda json_str: json.loads(json_str))

    process_candidates(rsvps)
    process_conversions(rsvps)

In [ ]:
def process_candidates(rsvps):
    '''
    Generate a list of candidates from the incoming stream, filtering on
    relevant topics and distance.
    '''
    filtered_rsvps = filter_rsvps(rsvps)
    candidate_rsvps = compute_closeness(filtered_rsvps)
    member_data = get_members(candidate_rsvps)
    gen_candidate_list(member_data)

In [ ]:
def filter_rsvps(rsvps):
    '''
    Filter out RSVPs that don't have our relevant topics,
    contain PROD_TOPIC or are not "yes" responses.
    '''
    return rsvps.filter(topic_filter)

def topic_filter(rsvp):
    return (rsvp['response'] == 'yes' and
            all(topic['urlkey'] != PROD_TOPIC for topic in rsvp['group']['group_topics']) and
            any(topic['urlkey'] in RELEVANT_TOPICS for topic in rsvp['group']['group_topics']))

In [ ]:
# from http://www.johndcook.com/blog/python_longitude_latitude/
import math

def distance_on_unit_sphere(lat1, long1, lat2, long2):
    '''Returns distance (in km) between two points on globe'''
 
    # Convert latitude and longitude to 
    # spherical coordinates in radians.
    degrees_to_radians = math.pi/180.0
         
    # phi = 90 - latitude
    phi1 = (90.0 - lat1)*degrees_to_radians
    phi2 = (90.0 - lat2)*degrees_to_radians
         
    # theta = longitude
    theta1 = long1*degrees_to_radians
    theta2 = long2*degrees_to_radians
         
    # Compute spherical distance from spherical coordinates.
         
    # For two locations in spherical coordinates 
    # (1, theta, phi) and (1, theta', phi')
    # cosine( arc length ) = 
    #    sin phi sin phi' cos(theta-theta') + cos phi cos phi'
    # distance = rho * arc length
     
    cos = (math.sin(phi1)*math.sin(phi2)*math.cos(theta1 - theta2) + 
           math.cos(phi1)*math.cos(phi2))
    # clamp to [-1, 1]: rounding error can push cos slightly out of range
    arc = math.acos(max(-1.0, min(1.0, cos)))
 
    # Remember to multiply arc by the radius of the earth 
    # in your favorite set of units to get length.
    #return arc
    return arc * 6371  # radius earth = 6,371 km

In [ ]:
def compute_closeness(rsvps):
    '''
    Compute how close the candidate is to our list of upcoming events. If they
    are farther away than DIST_THRESHOLD, filter them out.
    '''
    return (rsvps
            .map(add_closest_mu)
            .filter(lambda x: x['cmu_dist'] < DIST_THRESHOLD))

def add_closest_mu(rsvp):
    # save closest upcoming meetup
    cmu = None
    cmu_dist = sys.maxsize
    # loop through our upcoming events, looking for closest
    for mu in upcoming_meetups:
        m = mu['group']
        r = rsvp['group']
        dist = distance_on_unit_sphere(m['group_lat'], m['group_lon'], r['group_lat'], r['group_lon'])
        if dist < cmu_dist:
            cmu = mu
            cmu_dist = dist
    # store this data in the RSVP record
    rsvp['cmu'] = cmu
    rsvp['cmu_dist'] = cmu_dist
    return rsvp

In [ ]:
def get_members(rsvps):
    '''Create a member record from the RSVP record.'''
    return rsvps.map(map_rsvp_to_member)

def map_rsvp_to_member(rsvp):
    member = ({
        'id': rsvp['member']['member_id'],
        'member': rsvp['member'],
        'cmu': ({
            'id': rsvp['cmu']['id'],
            'name': rsvp['cmu']['name'],
            'event_url': rsvp['cmu']['event_url']
        }),
        'cmu_dist': rsvp['cmu_dist']
    })
    # use a fallback photo for those members without one
    if 'photo' not in member['member'] or member['member']['photo'] is None:
        member['member']['photo'] = 'http://photos4.meetupstatic.com/img/noPhoto_50.png'
    return member

In [ ]:
def gen_candidate_list(rsvps):
    '''Publish the candidates list (member records), which notifies the UI.'''
    rsvps.foreachRDD(update_cand_list)

def update_cand_list(rsvps_rdd):
    channel('candidates').set('cand_list', rsvps_rdd.collect())

Conversions

We keep track of candidates that we have contacted. If we later see that such a candidate has made an RSVP for one of our upcoming events, we count this as a "conversion".
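As a rough standalone sketch of this bookkeeping, the snippet below counts a conversion only when a previously contacted member says yes to one of our events. It simplifies the notebook's approach: it uses a set of member ids and matches on event id, and the names `mark_contacted`, `record_rsvp`, and the sample ids are hypothetical.

```python
contacted = set()          # ids of members we have reached out to
conversions_by_event = {}  # event id -> number of converted RSVPs

def mark_contacted(member_id):
    """Remember that we contacted this member."""
    contacted.add(member_id)

def record_rsvp(rsvp, my_event_ids):
    """Count the RSVP as a conversion if a contacted member said yes
    to one of our own events. Returns True when it was counted."""
    event_id = rsvp['event']['event_id']
    if (rsvp['response'] == 'yes'
            and event_id in my_event_ids
            and rsvp['member']['member_id'] in contacted):
        conversions_by_event[event_id] = conversions_by_event.get(event_id, 0) + 1
        return True
    return False

def make_rsvp(member_id, event_id, response='yes'):
    """Build a sample record with only the fields used above."""
    return {'response': response,
            'event': {'event_id': event_id},
            'member': {'member_id': member_id}}

my_events = {'evt-1'}
mark_contacted(42)
first = record_rsvp(make_rsvp(42, 'evt-1'), my_events)   # contacted member
second = record_rsvp(make_rsvp(7, 'evt-1'), my_events)   # never contacted
print(first, second, conversions_by_event)
```

A set keeps the membership test constant-time and ignores duplicate contact clicks, which a plain list would store twice.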


In [ ]:
def process_conversions(rsvps):
    '''Check the RSVP stream for "converted" members, updating our running total.'''
    conversion_stream = rsvps.filter(lambda x: x['response'] == 'yes' and 
            any(topic['urlkey'] == PROD_TOPIC for topic in x['group']['group_topics']) and
            x['member']['member_id'] in conversions)
    conversion_stream.foreachRDD(rdd_update_conversions_count)

def rdd_update_conversions_count(rdd):
    for rsvp in rdd.collect():
        muid = rsvp['event']['event_id']
        if muid in rsvp_count_history:
            # increment conversions counter in *latest* entry
            rsvp_count_history[muid][-1][2] += 1
        # TODO what do we do if `rsvp_count_history` doesn't have `muid`?

In [ ]:
def candidate_contacted(id):
    '''
    Callback which can be used by the UI to update the list of candidates
    we have contacted.
    Works in conjunction with the <template> which follows.
    '''
    conversions.append(id)

In [ ]:
%%html
<template is="urth-core-bind">
    <urth-core-function id="candidateContacted" ref="candidate_contacted" arg-id="{{id}}"></urth-core-function>
</template>

Widgets

In this section we import some basic widgets and use them to define custom widgets for displaying meetup information in tabular form. These widgets could be defined outside the notebook and imported here to keep the notebook slimmer, but in the interest of having everything in one spot, they're done inline.


In [ ]:
%%html
<link rel="import" href="urth_components/iron-resizable-behavior/iron-resizable-behavior.html"
    is="urth-core-import" package="PolymerElements/iron-resizable-behavior">
<link rel="import" href="urth_components/iron-list/iron-list.html"
    is="urth-core-import" package="PolymerElements/iron-list">
<link rel="import" href="urth_components/paper-toggle-button/paper-toggle-button.html"
    is="urth-core-import" package="PolymerElements/paper-toggle-button#v1.0.10">
<link rel="import" href="urth_components/paper-material/paper-material.html"
    is="urth-core-import" package="PolymerElements/paper-material">
<link rel="import" href="./dynamic-list.html">
<link rel="import" href="./upcoming-meetups.html">

List local files here so they will be accessible when deployed:

dynamic-list.html
upcoming-meetups.html

Meetups List

This widget creates a fancy table view showing your meetups and the change in RSVPs to them over the configured window of time.


In [ ]:
%%html
<template is="urth-core-bind" channel="meetups" id="meetups-list-tmpl">
    <upcoming-meetups items="[[list]]" change-label="[[change_label]]"/>
</template>

Dashboard Heading

This widget summarizes the topic used to track your meetups and the parameters used to filter the RSVP stream.


In [ ]:
%%html
<template is="urth-core-bind" channel="meetups">
    <style>
        .topics-desc {
            font-size: larger;
        }
        .topics-desc span {
            font-style: italic;
        }
    </style>
    <div class="topics-desc">
       Showing upcoming meetups for the <span>[[upcoming_label]]</span> topic. Meetup candidates are generated using the following topic filters: <span>[[cand_topics_label]]</span>
    </div>
</template>

Total RSVPs Plot

This plot shows the total RSVPs for your upcoming meetups and highlights the RSVPs that are due to conversions.
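To make the stacking concrete, here is a small sketch of how each sample can be split into two layers whose sum is the total RSVP count. The helper name `split_layers` and the sample numbers are made up for illustration; the resulting lists are in the shape that matplotlib's `ax.stackplot(times, organic, converted)` accepts.

```python
from datetime import datetime, timedelta

def split_layers(samples):
    """Turn (time, total_yes, conversions) tuples into stackplot inputs.

    The bottom layer holds RSVPs not attributed to a contact, the top
    layer holds converted RSVPs, so the two always add up to the total.
    """
    times = [t for t, _, _ in samples]
    organic = [total - conv for _, total, conv in samples]
    converted = [conv for _, _, conv in samples]
    return times, organic, converted

start = datetime(2016, 1, 1)
samples = [(start + timedelta(hours=h), total, conv)
           for h, total, conv in [(0, 40, 0), (1, 45, 2), (2, 52, 5)]]
times, organic, converted = split_layers(samples)
print(organic, converted)
```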


In [ ]:
def plot_total_rsvps(counts):
    '''
    Create a stacked plot where the total is the number of RSVPs over a period of time,
    and the top layer is the "converted" RSVPs.
    '''
    # `counts` avoids shadowing the built-in `list`
    X = [i[0] for i in counts]
    Y = [[i[1] - i[2] for i in counts],  [i[2] for i in counts]]

    fig, ax = plt.subplots(1,1,figsize=(10,7))
    ax.stackplot(X, *Y, baseline='zero', colors=['#2fa6e4','#82d1f5'])
    ax.set_title('Total RSVPs for Upcoming Meetups')
    fig.tight_layout()
    plt.close()
    return fig

In [ ]:
%%html
<template is="urth-core-bind" channel="plots">
    <img src={{total_rsvp_plot}}>
</template>

Initial call to update Upcoming Meetups list

We have to put the code for configuring the frontend widgets after their definition because widgets cannot yet pull existing data off of a channel when instantiated. When issue 35 is resolved, we can move this code up nearer to the configuration section.


In [ ]:
refresh_meetups_list()
channel('meetups').set('change_label', MU_CHANGE_LABEL)
channel('meetups').set('upcoming_label', PROD_TOPIC)
channel('meetups').set('cand_topics_label', ', '.join(RELEVANT_TOPICS))

Candidates List

This widget creates a fancy table view showing candidates for your meetups. It can operate in one of two modes:

  1. Live update - table refreshes every time new candidates are identified in the stream
  2. Click to update - table shows the number of new candidates but only shows them when the user clicks the number

In [ ]:
%%html
<template is="urth-core-bind" channel="candidates">
    <dynamic-list items="[[cand_list]]" id-prop="id" heading="Meetup Candidates"></dynamic-list>
</template>

Setup Streaming

Finally, we define functions and a simple toggle switch widget for connecting a streaming Spark context to the meetup.com RSVP stream.
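The notebook reads the stream through Spark's `textFileStream`, so incoming websocket messages have to land as files in a directory. One way to keep that directory bounded is a ring buffer of numbered files. The sketch below shows the idea in isolation, using only the standard library; the function name `ring_put` and the sample payloads are hypothetical.

```python
import os
import shutil
import tempfile

def ring_put(directory, index, text, max_files):
    """Write `text` as file number `index` and delete the file that
    falls out of the ring. Returns the next index to use."""
    with open(os.path.join(directory, str(index)), 'w', encoding='utf-8') as f:
        f.write(text)
    oldest = index - max_files
    if oldest >= 0:
        os.remove(os.path.join(directory, str(oldest)))
    return index + 1

queue_dir = tempfile.mkdtemp()
try:
    next_index = 0
    for i in range(5):
        next_index = ring_put(queue_dir, next_index, '{"n": %d}' % i, max_files=3)
    remaining = sorted(os.listdir(queue_dir))
finally:
    # always clean up the temporary directory
    shutil.rmtree(queue_dir, ignore_errors=True)
print(remaining)
```

With `max_files=3`, only the three most recent files survive, so disk usage stays flat no matter how long the stream runs.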


In [ ]:
def create_streaming_context(checkpoint_dir, sample_rate):
    '''
    Creates a new SparkContext and StreamingContext. Done in a function
    to allow repeated start/stop of the streaming. Returns the streaming
    context instance.
    
    :param checkpoint_dir: Directory to use to track Spark job state
    :param sample_rate: Batch interval of the stream, in seconds
    '''
    # create a local SparkContext to start using as many CPUs as we can
    sc = SparkContext('local[*]')
    
    # wrap it in a StreamingContext that collects from the stream
    ssc = StreamingContext(sc, sample_rate)

    # Setup a checkpoint directory to keep total counts over time.
    ssc.checkpoint(os.path.join(checkpoint_dir, '.checkpoint'))
    
    return ssc

In [ ]:
class FileRingReceiver(object):
    '''
    Hack around lack of custom DStream receivers in Python: 
    Create a ring buffer of UTF-8 text files on disk.
    '''
    def __init__(self, max_batches=10):
        self.queue = tempfile.mkdtemp()
        self.batch_count = 0
        self.max_batches = max_batches
        
    def __del__(self):
        self.destroy()
        
    def put(self, text):
        # ignore sentinels
        if text is None: return
        with open(os.path.join(self.queue, str(self.batch_count)), 'w', encoding='utf-8') as f:
            f.write(text)
        if self.batch_count >= self.max_batches:
            oldest = str(self.batch_count - self.max_batches)
            os.remove(os.path.join(self.queue, str(oldest)))
        self.batch_count += 1
        
    def destroy(self):
        shutil.rmtree(self.queue, ignore_errors=True)

In [ ]:
conn_future = None
ssc = None
receiver = None

In [ ]:
def start_stream():
    '''
    Creates a websocket client that pumps events into a ring buffer queue. Creates
    a StreamingContext that reads from the queue. Creates the candidates and
    conversions DStreams, setting the widget channel publishing functions to iterate
    over RDDs in each. Starts the stream processing.
    '''
    global conn_future
    global ssc
    global receiver
    
    receiver = FileRingReceiver(max_batches=100)  
    conn_future = websocket_connect('ws://stream.meetup.com/2/rsvps', on_message_callback=receiver.put)
    ssc = create_streaming_context(receiver.queue, 5)
    process_rsvps(ssc, receiver.queue)
    ssc.start()
    
def shutdown_stream():
    '''
    Shuts down the websocket, stops the streaming context, and cleans up the file ring.
    '''
    global conn_future
    global ssc
    global receiver
    
    conn_future.result().close()
    ssc.stop()
    receiver.destroy()

In [ ]:
%%html
<template is="urth-core-bind">
    <urth-core-function id="streamFunc" ref="start_stream"></urth-core-function>
    <urth-core-function id="shutdownFunc" ref="shutdown_stream"></urth-core-function>
</template>

<style is="custom-style">
    paper-toggle-button {
        --default-primary-color: green;
    }
    
    paper-toggle-button:hover {
        cursor: pointer;
    }
        
    .toggle-btn-container {
        margin: 1em 0;
        text-align: right;
    }
    
    #stream-label {
        font-size: larger;
        margin: 0;
        padding: 0 0.5em;
    }
</style>

<div class="toggle-btn-container">
    <paper-toggle-button id="stream-btn"></paper-toggle-button>
    <label id="stream-label">Stream</label>
</div>

<script>
    $('#stream-btn').on('change', function() {
        if ($(this).attr('checked')) {
            // start streaming
            console.warn('Starting Spark Streaming');
            $('#streamFunc').get(0).invoke();
        } else {
            // stop streaming
            console.warn('Stopping Spark Streaming');
            $('#shutdownFunc').get(0).invoke();
        }
    });
</script>

Demo Mode

To run a standalone demo, the following code will (if DEMO is set to True) periodically simulate a mouse click on the contact button for a random candidate in the list. The rest of the code then treats that candidate as contacted. Over time, as contacted members also sign up for one of the upcoming events in our list, they are counted as conversions and the plot will show two different colors, the top one signifying the number of "converted" candidates.


In [ ]:
%%html
<template is="urth-core-bind" channel="demo" id="demo-tmpl">
    <script>
        var startDemoMode = function(e) {
            if (e.detail.channel !== 'demo' || e.detail.key !== 'on') {
                return;
            }
            console.log('STARTING DEMO MODE');
            setInterval(function() {
                var list = document.querySelector('dynamic-list');
                if (list) {
                    list._simulateContactClick();
                }
            }, 60000);
            demoTmpl.removeEventListener('channelSetItem', startDemoMode);
        }

        var demoTmpl = document.getElementById('demo-tmpl');
        demoTmpl.register(demoTmpl, '*');
        demoTmpl.addEventListener('channelSetItem', startDemoMode);
    </script>
</template>

In [ ]:
if DEMO:
    channel('demo').set('on', 1)