Twitter geo hashtag heatmap

The goal is to

  • Collect and process streaming data without blocking notebook input,
  • Use an in-memory database to store and read stream data,
  • Perform analyses using up-to-the-minute data,
  • Display analysis output in the notebook with Javascript, using the Google Maps API.

In [ ]:
import json
import time
import datetime
import dateutil.parser
import numpy as np
import multiprocessing as mp
import redis
import string
import collections
from Queue import Empty
import itertools
import twitter

There are two ways to run this notebook. The best option is to obtain a Twitter API key from Twitter's developer site and then do a "pip install twitter"; this lets you work with a realtime stream of Twitter data. API keys are free and easy to get; I highly encourage you to do so, as it makes this a lot more fun. If you need guidance in obtaining an API key, check out this Gist.

If that doesn't work for you, the second method is to utilize a static dataset included in the bundle (static_tweet_data.json.gz). This doesn't require as many downloads or any API keys, but also doesn't get at the concurrent, realtime capabilities of this workflow.

If you run through the notebook once with the static data, then begin using the live stream, it will happily append the fresh data to the static data.

Below, REALTIME_DATA is set to False. If you have a Twitter API key, set it to True and fill in the values for the access token and consumer key.


In [ ]:
REALTIME_DATA = False # Set this to True in order to use the Twitter API.

if REALTIME_DATA:
    #This is private data. Don't re-share this notebook with your keys here.
    twitter_stream = twitter.TwitterStream(auth=twitter.OAuth(
            token="NA",
            token_secret="NA",
            consumer_key="NA",
            consumer_secret="NA"))
    stream = twitter_stream.statuses.sample(block=True)
    
    testing = stream.next() # This is just to make sure the stream is emitting data.
    print "Done!"

The raw Tweets that come from the Twitter stream contain lots of data, some of which lies in nested structures. As an example, the following block will display a single raw Tweet:


In [ ]:
if REALTIME_DATA:
    raw_T = None
    while not raw_T or 'delete' in raw_T:
        raw_T = stream.next()
    print json.dumps(raw_T)
else:
    print json.dumps(json.load(open('one_tweet.json')))

For the purposes of this notebook, we only care about a few of the fields, and we want them in predictable, flat structures. The Tweet class is just a subclass of dict that will extract and parse the data we care about.


In [ ]:
class Tweet(dict):
    def __init__(self, raw_tweet):
        super(Tweet, self).__init__()
        if raw_tweet and 'delete' not in raw_tweet:
            self['timestamp'] = dateutil.parser.parse(raw_tweet[u'created_at']
                                ).replace(tzinfo=None).isoformat()
            self['text'] = raw_tweet['text']
            self['hashtags'] = [x['text'] for x in raw_tweet['entities']['hashtags']]
            self['geo'] = raw_tweet['geo']['coordinates'] if raw_tweet['geo'] else None
            self['id'] = raw_tweet['id']
            self['screen_name'] = raw_tweet['user']['screen_name']
            self['user_id'] = raw_tweet['user']['id']

The stream can include deleted Tweets, which we want to exclude. These deleted Tweets just become empty Tweet objects, hence the "while not T" idiom:


In [ ]:
if REALTIME_DATA:
    T = None
    while not T:
        T = Tweet(stream.next())
else:
    T = Tweet(json.load(open('one_tweet.json')))

print json.dumps(T)

Language detection

With a stream of Tweets in hand, we would like to perform language detection on each Tweet in order to group them together by language.

The Natural Language Toolkit (NLTK) contains tools for text analysis. We are going to employ a cheap method of language detection: counting special stopwords from each language in the text of each tweet. The language with the most stopwords in the Tweet text is then chosen as the originating language. In the case of a tie, a winner is randomly chosen from those languages with the most matches.

This method is far from perfect, but is a reasonable first attempt. Will it be good enough to be useful? Keep reading to find out!
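
Note that the stopwords corpus has to be available locally before the import below will work. If you have never downloaded it, this one-time step should fetch it (it is safe to re-run; it just verifies the corpus is present):


In [ ]:
# One-time setup: download the NLTK stopwords corpus if it isn't installed yet.
import nltk
nltk.download('stopwords')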


In [ ]:
from nltk.corpus import stopwords # stopwords to detect language
from nltk import wordpunct_tokenize # function to split up our words

def get_likely_language(input_text):
    """ 
    Return the most likely language of the given text,
    along with the number of stopword matches for that
    language and total.

    Adapted from: http://h6o6.com/2012/12/detecting-language-with-python-and-the-natural-language-toolkit-nltk/
    """
    input_text = input_text.lower()
    input_words = wordpunct_tokenize(input_text)
 
    likely_language = 'unknown'
    likely_language_matches = 0
    total_matches = 0
    stopword_sets = dict([(lang, set(stopwords.words(lang)))
                            for lang in stopwords.fileids()])

    for language in np.random.permutation(stopwords.fileids()):
        language_matches = len(set(input_words) & stopword_sets[language])
        total_matches += language_matches
        if language_matches > likely_language_matches:
            likely_language_matches = language_matches
            likely_language = language
 
    return (likely_language, likely_language_matches, total_matches)

Let's try this on a few Tweets from the stream and see what we get:


In [ ]:
if REALTIME_DATA:
    for i in range(5):
        T = Tweet(stream.next())
        if T:
            # Detect once so the stored and printed results agree (ties break randomly).
            lang_info = get_likely_language(T['text'])
            T['language'] = lang_info[0]
            print "%s, %i, %i: %s" % (lang_info + (T['text'],))
else:
    T = Tweet(json.load(open('one_tweet.json')))
    lang_info = get_likely_language(T['text'])
    T['language'] = lang_info[0]
    print "%s, %i, %i: %s" % (lang_info + (T['text'],))

Redis data store

We are using Redis to store our data for a few important reasons. First and most importantly, we can read data from the Redis database while we are writing to it in the background, without dealing with threading/multiprocessing annoyances like semaphores and locks. Second, while Redis is an in-memory database, it saves to disk periodically for persistence. This way we can pick up where we left off if there is a problem, which we could not do if the data lived only in Python objects.

First we need to start the Redis server. Either run "redis-server" from a shell terminal, or just run the following cell.


In [ ]:
!redis-cli ping &> /dev/null || (redis-server & disown)

The previous block may give a warning about a missing config file. This is fine to ignore.

Once the Redis server is running, we can connect to it. Redis can have multiple different keyspaces each identified by a nonnegative integer, here DB_NUMBER.


In [ ]:
DB_NUMBER = 0
r = redis.StrictRedis(host='localhost', port=6379, db=DB_NUMBER)
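
To confirm that the Python client can actually reach the server, you can send a PING (this should simply return True):


In [ ]:
# Quick connectivity check: PING returns True if the server is reachable.
r.ping()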

Since we will not be modifying the data objects once we load them into the database, the append-only persistence method provides excellent fault tolerance with no downside. (Append-only persistence is less appropriate for rapidly changing data.)


In [ ]:
r.config_set('appendonly', 'yes')
r.config_get('appendonly')

There are a variety of data structures available in Redis, but the most natural for our purposes are lists. If 'test' is the key/name for our list, we can push arbitrary numbers of elements to the end (right) of the list.


In [ ]:
r.rpush('test', 'a', 'b', 'c')

Lists in Redis are doubly-linked, so we can push to the beginning (left) as well. Appending to either side is done in constant time.


In [ ]:
r.lpush('test', 'd')

Access list slices with the lrange method. Indices can be specified from the left (+) or the right (-). As in Python, the leftmost index is 0 and the rightmost is -1. Unlike Python, slice indices are inclusive on both ends.


In [ ]:
print r.lrange('test', 0, 3)
print r.lrange('test', -2, -1)

Also unlike Python, specifying out-of-bounds indices is not a problem.


In [ ]:
print r.lrange('test', 0, 100)
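
One more operation we will rely on later is llen, which returns the current length of a list:


In [ ]:
# llen reports how many elements a list holds; we use it later to count records.
print r.llen('test')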

There's plenty more to learn about Redis, but that's about all we need right now.


In [ ]:
r.delete('test')

Multiprocessing data pipeline

In the case where we are using realtime streaming data, we want the data collection to occur in the background so we can perform analysis in the notebook. For this, we will use multiprocessing. Now that we have a way to guess the language of a Tweet and a place to store the data for analysis, we want to put together the following pipeline:

  1. Read Tweets from the stream and load the relevant, parsed information into Tweet objects.
  2. Get Tweet objects, determine their language, attach that data to the Tweet object.
  3. If we determined the language, save the data in a bin according to that language.

With multiprocessing in mind, we will define a small worker function for each of these steps, using queues for inter-process communication. The first of these workers takes the raw data from the Twitter stream and loads it into our custom Tweet objects. Garbage data from the stream, like deleted Tweets, is not passed along to the next worker.


In [ ]:
SENTINEL = '$QUIT'
def load_tweets(twitter_stream, out_q, time_limit):
    """
    This is a worker that takes a live Twitter stream, 
    and puts parsed Tweet objects into the out queue.
    
    ARGS:
    twitter_stream: An instance of twitter.TwitterStream.
    out_q: An instance of multiprocessing.Queue.
    time_limit: A number, the run time in minutes.
    """
    end_time = time.time() + 60*time_limit
    twiterator = itertools.chain.from_iterable(
                    itertools.repeat(
                    twitter_stream.statuses.sample()))
    try:
        for raw_tweet in twiterator:
            T = Tweet(raw_tweet)
            if T: 
                out_q.put(T)
            if time.time() > end_time:
                break
    finally:
        out_q.put(SENTINEL)

The language detector worker only passes on Tweets whose language it can detect.


In [ ]:
def get_language(in_q, out_q):
    """
    This is a worker that takes Tweet objects from the
    input queue, guesses their originating language,
    and places successfully identified Tweet objects
    on the output queue.
    
    ARGS:
    in_q: An instance of multiprocessing.Queue.
    out_q: An instance of multiprocessing.Queue.
    """
    try:
        while True:
            try:
                T = in_q.get(timeout=1)
            except Empty:
                continue
            if T == SENTINEL:
                break
            elif T:
                lang, matches, total_matches = get_likely_language(T['text'])
                if lang and matches > 1:
                    T['language'] = lang
                    out_q.put(T)
    finally:
        out_q.put(SENTINEL)
        in_q.put(SENTINEL)

The record write worker saves two types of records for each Tweet with an identifiable language:

  1. The lat/lon coordinates, if they exist
  2. The hashtags contained within, if any

In this way, we can find the most popular hashtags for each language, as well as plot the locations of tweeters in each language.


In [ ]:
def write_records(in_q, db):
    """
    This is a worker that takes Tweet objects from the
    input queue, and writes selected data to a Redis db.
    
    ARGS:
    in_q: An instance of multiprocessing.Queue.
    db: A redis.StrictRedis instance, the Redis database
        to write out to.
    """
    try:
        while True:
            try:
                T = in_q.get(timeout=1)
            except Empty:
                continue
            if T == SENTINEL:
                break
            elif T and T.get('language'):
                if T['hashtags']:
                    db.rpush('hashtag_' + T['language'], *T['hashtags'])
                if T['geo']:
                    db.rpush('geo_' + T['language'], json.dumps(T['geo']))
    finally:
        in_q.put(SENTINEL)

Now that we have defined the worker functions for the data pipeline, we need to create the Process and Queue objects. The language_queue will link the stream worker with the language detection workers, and the write_queue will link the language detectors to the database write worker. The TIME_LIMIT variable controls how long data collection will persist in the background.


In [ ]:
TIME_LIMIT = 30 #in minutes

# multiprocessing is imported as mp
if REALTIME_DATA:
    language_queue = mp.Queue()
    write_queue = mp.Queue()
    
    # Stream worker
    process_list = []
    process_list.append(mp.Process(
            target=load_tweets, 
            args=(twitter_stream, language_queue, TIME_LIMIT)))

Language detection takes longer than the other steps, so we create two language detection workers in order to ensure that data doesn't back up in the language_queue.


In [ ]:
if REALTIME_DATA:
    # Two language detection workers
    for i in range(2):
        process_list.append(mp.Process(
            target=get_language, 
            args=(language_queue, write_queue)))
    
    # DB write worker
    process_list.append(mp.Process(
            target=write_records, 
            args=(write_queue, r)))

We have created the process objects and put them in a list; now we call the start() method on each.


In [ ]:
if REALTIME_DATA:
    print "Starting processes at: %s" % datetime.datetime.now()
    print "Processes will end at: %s" % (datetime.datetime.now() + datetime.timedelta(minutes=TIME_LIMIT))
    for p in process_list:
        p.start()
else:
    print "Load non-realtime data two cells below."

If all went according to plan, the worker processes are now operating in the background, and we are still able to execute commands in the notebook. Execute the following cell to check.


In [ ]:
print "Still responsive!"

In the case where we are not using a realtime Twitter feed, we will load a scrubbed, static dataset with languages already detected.


In [ ]:
SENTINEL = '$QUIT'
if not REALTIME_DATA:
    import gzip, Queue
    static_data = json.load(gzip.open('static_tweet_data.json.gz'))
    tweet_queue = Queue.Queue()
    for tweet in static_data:
        tweet_queue.put(tweet)
    tweet_queue.put(SENTINEL)
    print "Tweets loaded into queue, writing to db."
    write_records(tweet_queue, r)
    print "Tweets written to db."

Status checks

Great! We can still execute code. Now we need a way to check on our processes. This function will print out some relevant data. Execute it whenever you want to check up on the background processes.


In [ ]:
def check_status():
    print "Current time:", datetime.datetime.now().time().replace(microsecond=0)
    print "Redis memory used:", r.info()['used_memory_peak_human']
    if REALTIME_DATA:
        print "Queue lengths:", [q.qsize() for q in language_queue, write_queue]
        print "Processes alive?", [x.is_alive() for x in process_list]
        
check_status()

So we can check how our processes are doing, but the real question is how our data is doing. The beauty of Redis is that we can read from our lists whether or not the background processes are writing.

The following block counts how many records are saved in each of our lists. It can be executed any time you want a count of the records stored.


In [ ]:
print "%10.10s: %6s,%7s" % ("language", "geo", "hashtag")
print "-" * 26
for lang in sorted(stopwords.fileids()):
    print "%10.10s: %6i, %6i" % (lang, r.llen('geo_'+lang), r.llen('hashtag_' + lang))

Each of the "hashtag_{language}" lists consists of all of the hashtags tweeted in that language. The collections library has a Counter class that is perfect for doing a wordcount on each language's 5000 most recent hashtags, and returning the TOP_N_HASHTAGS along with their counts. Obviously this analysis gets more informative when there are many tweets in the Redis db.

This cell can be re-executed at any time to update the counts.


In [ ]:
TOP_N_HASHTAGS = 3
for lang in sorted(stopwords.fileids()):
    key = 'hashtag_' + lang
    c = collections.Counter()
    hashtags = r.lrange(key, -5000, -1)
    c.update(map(string.lower, hashtags))
    print "%10s: %s"%(lang, ', '.join(["%s %i" % x for x in c.most_common(TOP_N_HASHTAGS)]))

Analysis: Geographic language distribution

We have collected a subset of Tweets with both a geographic location and a detectable language. For each language, we want to plot these locations on a map in order to see the geographic distribution of Tweets in that language. Toward that aim, we will utilize the Google Maps API along with IPython's ability to run Javascript.

The basic idea is that we will construct Python strings of Javascript/HTML code and pass them to IPython's display functions. The following cell loads the Google Maps API if it is not already loaded.


In [ ]:
from IPython import display
js_loader = """
function verifyJSLoaded(){
    var jsapiLoaded = (typeof google === 'object' && typeof google.maps === 'object');
    console.log("Google API Loaded: " + jsapiLoaded);
    return jsapiLoaded;
}

function loadScript() {
  if (!verifyJSLoaded()) {
    console.log('Loading Google API.');
    var script = document.createElement("script");
    script.type = "text/javascript";
    script.src = "https://maps.googleapis.com/maps/api/js?sensor=false&libraries=visualization&callback=console.log";
    document.body.appendChild(script);
  }
}

loadScript();
"""
display.Javascript(js_loader)

While the above piece of JS can be executed verbatim, we will make use of string formatting to construct our other pieces of JS/HTML.


In [ ]:
html_template = '<div id="%s" style="width: 500px; height: 300px"></div>'
#This is to make sure the JS gets loaded before we try to load the maps
time.sleep(1)

The following function will generate the Javascript for creating a heatmap given a Redis key to a list of coordinates.

The coordinate pairs are loaded into Python tuples and then formatted into strings like "new google.maps.LatLng(lat, lng)", which evaluate to the appropriate Javascript objects. These strings are then joined and substituted into the Javascript template, where they make up the geoData array.


In [ ]:
def gen_javascript(key, div_id=None, N=100):
    """
    Generates javascript to draw a heatmap with Google Maps API.
    ARGS:
    key: The name of the Redis key storing coordinate pairs to map.
    div_id: The id of the HTML div to place the map in. A value 
            of None will draw the map in a div with id = key.
    N: The number of coordinate pairs to plot. The N most recent
       pairs will be drawn.
    """

    if div_id is None:
        div_id = key
    # Gets the N most *recent* coordinates 
    pairs = [json.loads(x) for x in r.lrange(key,-N,-1)]
    # Creates Javascript objects which will comprise geoData.
    coords = ',\n  '.join(["new google.maps.LatLng(%s, %s)" % tuple(pair) for pair in pairs])    
    template_jscript = """
    var geoData = [
    %s
    ];
        
    var map, heatmap;
        
    function hmap_initialize() {
        var mapOptions = {
        zoom: 1,
        center: new google.maps.LatLng(30.5171, 0.1062),
        mapTypeId: google.maps.MapTypeId.SATELLITE
        };
        
        map = new google.maps.Map(document.getElementById('%s'),
              mapOptions);
        
        var pointArray = new google.maps.MVCArray(geoData);
        
        heatmap = new google.maps.visualization.HeatmapLayer({
        data: pointArray
        });
        
      heatmap.setMap(map);
    }
    
    hmap_initialize();
    """
    return template_jscript % (coords, div_id)

The next block will take a key like "geo_{language}", create the Javascript, and then create the HTML div for the map.

After executing the next block, the output will be blank. This is normal. Execute the block after next to load the map into the div created by the output of the next cell. Note that these two steps can't be run in the same block.


In [ ]:
key = 'geo_english'
jscript = gen_javascript(key, N=400)
display.HTML(html_template % key)

In [ ]:
display.Javascript(jscript)

You can specify any of the languages we have been detecting (listed above a few times), but it works best with the more common ones. Try spanish, french, and portuguese.


In [ ]:
key = 'geo_spanish'
jscript = gen_javascript(key, N=400)
display.HTML(html_template % key)

In [ ]:
display.Javascript(jscript)

You can execute these blocks to refresh the maps as often as you want; by default, the maps draw the 400 most recent data points.