In [ ]:
import datetime
import json
import sys
import time

Introduction

We're going to improve the tweet_enricher.py script from Gnip-Analysis-Pipeline. We'll start from a simplified version and build a series of variations, each addressing a shortcoming of the previous one.

To enrich tweets, we will need:

  • a tweet source
  • a version of the enricher script (we'll use a function)
  • one or more enrichment classes

See the README for an explanation of these components.

A stream of tweets

The enrichment step of the analysis pipeline is designed to work on a potentially infinite stream of tweets. A generator will simulate this nicely.


In [ ]:
DT_FORMAT_STR = "%Y-%m-%dT%H:%M:%S.%f"

def stream_of_tweets(n=10):
    # generator function yielding sequential tweets as JSON strings
    for i in range(n):
        time.sleep(0.01)
        tweet = {
            'body': 'I am tweet #' + str(i),
            'postedTime': datetime.datetime.now().strftime(DT_FORMAT_STR)
        }
        yield json.dumps(tweet)

In [ ]:
for tweet in stream_of_tweets(2):
    print(tweet)

In [ ]:
for tweet in stream_of_tweets():
    print(tweet)

The tweet enricher


In [ ]:
def enrich_tweets_1(istream, enrichment_class_list):

    """ simplified copy of tweet_enricher.py """

    enrichment_instance_list = [enrichment_class() for enrichment_class in enrichment_class_list]

    for tweet_str in istream:
        # skip records that aren't valid JSON
        try:
            tweet = json.loads(tweet_str)
        except ValueError:
            continue

        for instance in enrichment_instance_list:
            instance.enrich(tweet)

        sys.stdout.write(json.dumps(tweet) + '\n')

Enrichment classes


In [ ]:
class TestEnrichment():
    value = 42
    def enrich(self,tweet):
        if 'enrichments' not in tweet:
            tweet['enrichments'] = {}
        tweet['enrichments']['TestEnrichment'] = self.value

In [ ]:
class TestEnrichment2():
    value = 48
    def enrich(self,tweet):
        if 'enrichments' not in tweet:
            tweet['enrichments'] = {}
        tweet['enrichments']['TestEnrichment2'] = self.value

In [ ]:
enrich_tweets_1(stream_of_tweets(5),[TestEnrichment,TestEnrichment2])

Convenience and simplification

  • remove JSON-(de)serialization
  • use only one enrichment class, derived from a base class

In [ ]:
DT_FORMAT_STR = "%Y-%m-%dT%H:%M:%S.%f"

def stream_of_tweets(n=10):
    # generator function yielding sequential tweets
    for i in range(n):
        time.sleep(0.01)
        tweet = {
            'body': 'I am tweet #' + str(i),
            'postedTime': datetime.datetime.now().strftime(DT_FORMAT_STR)
        }
        yield tweet # <<-- this is the only change from above

In [ ]:
class EnrichmentBase():
    def enrich(self,tweet):
        if 'enrichments' not in tweet:
            tweet['enrichments'] = {}
        tweet['enrichments'][type(self).__name__] = self.enrichment_value(tweet)

In [ ]:
class TestEnrichment(EnrichmentBase):
    def enrichment_value(self,tweet):
        return 42

In [ ]:
def enrich_tweets_2(istream, enrichment_class, **kwargs):
    """
    simplify `enrich_tweets_1`:
        only one enrichment class
        tweet source is a generator of dicts (no JSON (de)serialization)
    """
    
    enrichment_instance = enrichment_class()
    
    for tweet in istream:
        enrichment_instance.enrich(tweet)
        sys.stdout.write( str(tweet) + '\n')

In [ ]:
%%time
enrich_tweets_2(
    istream=stream_of_tweets(5),
    enrichment_class=TestEnrichment
   )

The problem


In [ ]:
class SlowEnrichment(EnrichmentBase):
    def enrichment_value(self,tweet):
        # get the tweet number from body
        # and sleep accordingly
        seconds = int(tweet['body'][-1]) + 1
        time.sleep(seconds)
        return str(seconds) + ' second nap'

In [ ]:
%%time
enrich_tweets_2(
    istream=stream_of_tweets(5),
    enrichment_class=SlowEnrichment
   )

commentary

  • tweets are processed sequentially
  • the tweet dictionary is mutated in place by enrich
  • each tweet is written out as soon as it is enriched

problems

  • there's no reason for subsequent enrichment operations to be blocked by a sleep call (imagine the sleep is a web request).

Threads

Despite what you may have heard about Python's Global Interpreter Lock (GIL), core library routines (read: those written in C) can release the GIL while they are waiting on the operating system. The time.sleep function is a good example of this, and the same applies to I/O-bound calls like those made by the requests package.
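A quick demonstration of this claim (a sketch added here for illustration, not part of the pipeline): if the GIL were held during time.sleep, four half-second naps on four threads would take about two seconds; because the GIL is released, they overlap.

```python
import threading
import time

start = time.time()
# four threads, each sleeping 0.5s; time.sleep releases the GIL, so the naps overlap
threads = [threading.Thread(target=time.sleep, args=(0.5,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.time() - start
print('elapsed: %.2fs' % elapsed)  # roughly 0.5s, not 2s
```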


In [ ]:
import threading
def enrich_tweets_3(istream,enrichment_class):
    """ 
    use threads to run `enrich`
    """
    enrichment_instance = enrichment_class()
    
    # we need to hang onto the threads spawned
    threads = []
    # ...and the tweets
    enriched_tweets = []
    
    for tweet in istream:
        
        # run `enrich` in a new thread
        thread = threading.Thread(
            target=enrichment_instance.enrich,
            args=(tweet,)
        )
        thread.start() # runs the function in a new thread
        threads.append(thread)
        
        enriched_tweets.append(tweet)
        
    sys.stderr.write('submitted all tweets to threads' + '\n')
    for thread in threads:
        thread.join() # blocks until thread finishes

    sys.stderr.write('all threads finished' + '\n')
    for enriched_tweet in enriched_tweets:
        sys.stdout.write( str(enriched_tweet) + '\n')

In [ ]:
%%time
enrich_tweets_3(
    istream=stream_of_tweets(5),
    enrichment_class=SlowEnrichment
   )

commentary

  • run each enrich call in a separate thread
  • memory is shared between threads
  • execution takes roughly as long as the slowest enrichment

problems

  • we had to maintain lists of threads and enriched tweets
    • no limit on the number of threads
    • all enriched tweets are stored in memory

In [ ]:
def enrich_tweets_4(istream,enrichment_class,**kwargs):
    """ 
    better use of threads
    """
    
    enrichment_instance = enrichment_class()
    
    queue = [] # queue of (thread,tweet) tuples
    max_threads = kwargs['max_threads']

    for tweet in istream:
        
        # run `enrich` in a new thread
        thread = threading.Thread(
            target=enrichment_instance.enrich,
            args=(tweet,)
        )
        thread.start()
        queue.append((thread,tweet))

        # don't accept more tweets until a thread is free
        while len(queue) >= max_threads:
            # iterate through all threads; when a thread is dead,
            # remove it from the queue and print its tweet
            # (renamed loop variables avoid shadowing the outer `tweet`)
            new_queue = []
            for queued_thread, queued_tweet in queue:
                if queued_thread.is_alive():
                    new_queue.append((queued_thread, queued_tweet))
                else:
                    sys.stdout.write(str(queued_tweet) + '\n') # print enriched tweet
            queue = new_queue
            time.sleep(0.1)
                     
    sys.stderr.write('submitted all tweets to threads' + '\n')
    
    # cleanup threads that didn't finish while iterating through tweets
    for thread,tweet in queue:
        thread.join()
        time.sleep(0.01)
        sys.stdout.write( str(tweet) + '\n')

In [ ]:
%%time
enrich_tweets_4(
    istream=stream_of_tweets(5),
    enrichment_class=SlowEnrichment,
    max_threads = 1 # play with this number
   )

commentary

  • we limited the number of alive threads
  • we store no more than max_threads tweets in memory

problems

  • awkward queue length management
  • have to manage individual threads

Futures

A Future is an object that represents a deferred computation that may or may not have completed yet. That computation might run on a separate thread, but the Future itself doesn't care where the work is executed.
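A minimal sketch of the Future interface (the slow_double function is invented here for illustration): submit returns a Future immediately, and result() blocks until the deferred computation finishes.

```python
from concurrent import futures
import time

def slow_double(x):
    time.sleep(0.1)
    return 2 * x

with futures.ThreadPoolExecutor(max_workers=1) as executor:
    # submit returns a Future right away; the work runs on a pool thread
    future = executor.submit(slow_double, 21)
    result = future.result()  # blocks until the computation completes
print(result)  # 42
```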


In [ ]:
from concurrent import futures

def enrich_tweets_5(istream,enrichment_class,**kwargs):
    """ 
    use concurrent.futures instead of bare Threads
    """
    enrichment_instance = enrichment_class()
    
    with futures.ThreadPoolExecutor(max_workers=kwargs['max_workers']) as executor:
        
        future_to_tweet = {}
        for tweet in istream:

            # run `enrich` in a new thread, via a Future
            future = executor.submit(
                enrichment_instance.enrich,
                tweet
            )
            future_to_tweet[future] = tweet
            
        sys.stderr.write('submitted all tweets as futures' + '\n')    
        
        for future in futures.as_completed(future_to_tweet):
            sys.stdout.write( str(future_to_tweet[future]) + '\n')

In [ ]:
%%time
enrich_tweets_5(
    istream=stream_of_tweets(5),
    enrichment_class=SlowEnrichment,
    max_workers = 5
   )

commentary

  • futures.as_completed yields results as execution finishes
    • this is better than sequentially join-ing threads, as above, because results are yielded as soon as they become available
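To see the difference, a small sketch (the nap function and sleep durations are invented for illustration): futures finish out of submission order, and as_completed yields them in finishing order rather than submission order.

```python
from concurrent import futures
import time

def nap(seconds):
    time.sleep(seconds)
    return seconds

with futures.ThreadPoolExecutor(max_workers=3) as executor:
    # submitted in the order 0.3, 0.1, 0.2 ...
    fs = [executor.submit(nap, s) for s in (0.3, 0.1, 0.2)]
    # ... but yielded in finishing order
    order = [f.result() for f in futures.as_completed(fs)]
print(order)  # [0.1, 0.2, 0.3]
```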

problems

  • we haven't limited the number of queued Future objects (the executor's internal work queue is unbounded)
  • we can't yield any results until we've finished looping through the tweets
  • we have to maintain a dict of all tweets and futures

Change the enrichment protocol

Currently, classes follow the enrichment protocol by defining an enrich method with the appropriate signature. Nothing is specified regarding the return type or value.

We will now change this protocol such that the enrich method returns the enriched tweet dictionary, rather than relying on the mutability of the tweet dictionary passed to enrich. This allows us:

  • to "store" tweets in the Future and retrieve the enriched versions, obviating the need to maintain a record of all observed tweets
  • to generalize the submission interface such that we don't rely on the assumption of shared memory between the threads

In [ ]:
class NewEnrichmentBase():
    def enrich(self,tweet):
        if 'enrichments' not in tweet:
            tweet['enrichments'] = {}
        tweet['enrichments'][type(self).__name__] = self.enrichment_value(tweet)
        return tweet # <<-- the only new piece

In [ ]:
class NewSlowEnrichment(NewEnrichmentBase):
    def enrichment_value(self,tweet):
        # get the tweet number from body
        # and sleep accordingly
        seconds = int(tweet['body'].split('#')[-1]) + 1
        if seconds > 9:
            seconds = 1
        time.sleep(seconds)
        return str(seconds) + ' second nap'

In [ ]:
from concurrent import futures

def enrich_tweets_6(istream,enrichment_class,**kwargs):
    """ 
    new enrichment protocol
    """
    
    enrichment_instance = enrichment_class()
    
    with futures.ThreadPoolExecutor(max_workers=kwargs['max_workers']) as executor:
        
        futures_list = [] # <<-- this is now just a list of futures
        for tweet in istream:

            # run `enrich` in a new thread, via a Future
            future = executor.submit(
                enrichment_instance.enrich,
                tweet
            )
            futures_list.append(future)

        sys.stderr.write('submitted all tweets as futures' + '\n')    
        
        for future in futures.as_completed(futures_list):
            sys.stdout.write( str(future.result()) + '\n')

In [ ]:
%%time
enrich_tweets_6(
    istream=stream_of_tweets(5),
    enrichment_class=NewSlowEnrichment,
    max_workers = 5
   )

commentary

  • as before, results are yielded as soon as they are ready
  • we keep no explicit record of all tweets

problems

  • we don't get any results until we've iterated through all tweets, so we still keep an implicit list of all tweets
  • we have no limitation on the number of concurrent Future objects

In [ ]:
from concurrent import futures

def enrich_tweets_7(istream,enrichment_class,**kwargs):
    """
    print results via a callback attached to each Future
    """

    def print_the_tweet(future):
        sys.stdout.write( str(future.result()) + '\n') 
    
    enrichment_instance = enrichment_class()
    
    with futures.ThreadPoolExecutor(max_workers=kwargs['max_workers']) as executor:
        
        for tweet in istream:

            # run `enrich` in a new thread, via a Future
            future = executor.submit(
                enrichment_instance.enrich,
                tweet
            )
            future.add_done_callback(print_the_tweet)
            
        sys.stderr.write('submitted all tweets as futures' + '\n')

In [ ]:
%%time
enrich_tweets_7(
    istream=stream_of_tweets(5),
    enrichment_class=NewSlowEnrichment,
    max_workers = 5
   )

commentary

  • no explicit list of futures
  • the callback runs on the worker thread that completed the future (or immediately on the submitting thread, if the future had already finished)
  • putting the print call in the callback lets each tweet be written out asynchronously, as soon as it is ready
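A small check of where the callback actually runs (a sketch; nap, note_thread, and record are names invented here): the callback executes on the pool thread that completed the future, not on the main thread.

```python
import threading
import time
from concurrent import futures

record = {}

def nap():
    time.sleep(0.1)
    return threading.current_thread().name

def note_thread(future):
    # record which thread invokes the callback
    record['callback_thread'] = threading.current_thread().name

with futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(nap)
    future.add_done_callback(note_thread)
    worker_name = future.result()
# exiting the `with` block waits for the worker, so the callback has run by now
print(record['callback_thread'] == worker_name)  # True
```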

problems

  • we haven't limited the number of queued operations in the executor

In [ ]:
from concurrent import futures

def enrich_tweets_8(istream,enrichment_class,**kwargs):
    """
    throttle submission by tracking unfinished futures
    """
    max_workers = kwargs['max_workers']
    
    def print_the_tweet(future):
        sys.stdout.write( str(future.result()) + '\n') 
    
    enrichment_instance = enrichment_class()
    
    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        
        futures_list = []
        for tweet in istream:

            # run `enrich` in a new thread, via a Future
            future = executor.submit(
                enrichment_instance.enrich,
                tweet
            )
            future.add_done_callback(print_the_tweet)
            futures_list.append(future)
        
            # throttle: drop finished futures and wait while
            # `max_workers` futures are still pending or running
            futures_list[:] = [f for f in futures_list if not f.done()]
            while len(futures_list) >= max_workers:
                futures_list[:] = [f for f in futures_list if not f.done()]
                time.sleep(0.5)


        sys.stderr.write('submitted all tweets as futures' + '\n')

In [ ]:
%%time
enrich_tweets_8(
    istream=stream_of_tweets(50),
    enrichment_class=NewSlowEnrichment,
    max_workers = 5
   )

commentary

  • we can now safely stream tweets into the enrich function, without queueing every tweet in the executor

problems

  • the buffering of calls to submit is still a bit of a hack, and potentially slow
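One cleaner alternative before reaching for a queue (a sketch, not from the original script; enrich_tweets_8a and max_pending are names invented here): block submit calls with a threading.BoundedSemaphore that the done-callback releases, instead of polling a list of futures. This assumes the new protocol, where enrich returns the enriched tweet.

```python
import sys
import threading
from concurrent import futures

def enrich_tweets_8a(istream, enrichment_class, max_workers, max_pending):
    """throttle submission with a semaphore instead of polling"""
    enrichment_instance = enrichment_class()
    # acquired before each submit; released by the done-callback
    pending = threading.BoundedSemaphore(max_pending)

    def print_and_release(future):
        try:
            sys.stdout.write(str(future.result()) + '\n')
        finally:
            pending.release()  # free a slot even if `enrich` raised

    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for tweet in istream:
            pending.acquire()  # blocks while `max_pending` futures are in flight
            future = executor.submit(enrichment_instance.enrich, tweet)
            future.add_done_callback(print_and_release)
```

No sleep-and-recheck loop is needed: the main thread parks inside acquire until a worker finishes.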

In [ ]:
import queue
def enrich_tweets_9(istream,enrichment_class,**kwargs):
    """ 
    use a pool of threads, each running a worker reading from a common queue
    """
    
    max_workers = kwargs['max_workers']
    queue_size = kwargs['queue_size']
    
    enrichment_instance = enrichment_class()
        
    def worker():
        """
        this function runs on new threads
        and reads from a common queue
        """
        while True:
            tweet = q.get()
            if tweet is None: # this is the signal to exit
                break
            enriched_tweet = enrichment_instance.enrich(tweet)
            sys.stdout.write(str(enriched_tweet) + '\n')
            q.task_done()

    # create the queue before starting the workers that read from it,
    # so no worker can touch `q` before it exists
    q = queue.Queue(maxsize=queue_size)

    thread_pool = [threading.Thread(target=worker) for _ in range(max_workers)]
    for thread in thread_pool:
        thread.start()
                             
    for tweet in istream:
        q.put(tweet) # blocks when the queue is full
    
    sys.stderr.write('submitted all tweets to threads' + '\n')    
  
    # block until queue is empty
    q.join()

    # signal each worker thread to exit
    for _ in range(len(thread_pool)):
        q.put(None)
    
    for thread in thread_pool:
        thread.join()

In [ ]:
%%time
enrich_tweets_9(
    istream=stream_of_tweets(10),
    enrichment_class=NewSlowEnrichment,
    max_workers = 1,
    queue_size=5
   )

commentary

  • tweets are processed and printed fully asynchronously on a fixed pool of threads
  • the bounded queue throttles the incoming tweet stream

problems

  • what about CPU-bound tasks?

In [ ]:
from random import randrange
import hashlib

class CPUBoundEnrichment(NewEnrichmentBase):
    def enrichment_value(self,tweet):
        # make a SHA-256 hash of random byte arrays
        data = bytearray(randrange(256) for i in range(2**21)) 
        algo = hashlib.new('sha256')
        algo.update(data)
        return algo.hexdigest()

In [ ]:
def enrich_tweets_10(istream,enrichment_class,**kwargs):
    """ 
    use a `ProcessPoolExecutor` to manage processes
    """
    
    max_workers=kwargs['max_workers']
    executor_name=kwargs['executor_name']
    
    def print_the_tweet(future):
        sys.stdout.write( str(future.result()) + '\n') 
    
    enrichment_instance = enrichment_class()
    
    with getattr(futures,executor_name)(max_workers=max_workers) as executor: # <- this is the only change from #8
        
        futures_list = []
        for tweet in istream:

            # run `enrich` in a new worker (thread or process), via a Future
            future = executor.submit(
                enrichment_instance.enrich,
                tweet
            )
            future.add_done_callback(print_the_tweet)
            futures_list.append(future)
        
            # have to throttle with this hack:
            # drop finished futures and wait while `max_workers` are in flight
            futures_list[:] = [f for f in futures_list if not f.done()]
            while len(futures_list) >= max_workers:
                futures_list[:] = [f for f in futures_list if not f.done()]
                time.sleep(0.5)

        sys.stderr.write('submitted all tweets as futures' + '\n')

In [ ]:
%%time
enrich_tweets_10(
    istream=stream_of_tweets(10),
    enrichment_class=CPUBoundEnrichment,
    executor_name='ProcessPoolExecutor',
    max_workers = 2,
)

exercises for the reader

  • use a pool of Process objects, a multiprocessing.Queue, and a callback function to build a single-queue example with processes
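A starting point for the exercise (a sketch under assumptions: mp_worker and enrich_tweets_mp are names invented here, the enrichment is hard-coded, and a second multiprocessing.Queue returns results in place of a callback). Processes don't share memory, so the enriched tweet must be sent back explicitly. On platforms whose default start method is spawn (e.g. Windows), guard calls with `if __name__ == '__main__':`.

```python
import multiprocessing as mp

def mp_worker(in_queue, out_queue):
    # runs in a child process; tweets arrive and leave via queues
    while True:
        tweet = in_queue.get()
        if tweet is None:  # exit sentinel, as in enrich_tweets_9
            break
        tweet.setdefault('enrichments', {})['MPSketch'] = 'value'
        out_queue.put(tweet)

def enrich_tweets_mp(istream, n_workers=2, queue_size=5):
    in_queue = mp.Queue(maxsize=queue_size)  # bounded: throttles the stream
    out_queue = mp.Queue()
    pool = [mp.Process(target=mp_worker, args=(in_queue, out_queue))
            for _ in range(n_workers)]
    for p in pool:
        p.start()

    n_tweets = 0
    for tweet in istream:
        in_queue.put(tweet)  # blocks when the queue is full
        n_tweets += 1

    for _ in pool:
        in_queue.put(None)  # one sentinel per worker

    # drain the results before joining the workers
    results = [out_queue.get() for _ in range(n_tweets)]
    for p in pool:
        p.join()
    return results
```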
