In [ ]:
import datetime
import json
import sys
import time
We're going to improve the tweet_enricher.py script from Gnip-Analysis-Pipeline. We'll make a simplified version and create variations that improve it in various ways.
To enrich tweets, we will need:
- a stream of tweets
- one or more enrichment classes
- an enrichment function that applies each enrichment to each tweet
See the README for an explanation of these components.
In [ ]:
DT_FORMAT_STR = "%Y-%m-%dT%H:%M:%S.%f"
def stream_of_tweets(n=10):
    # generator function to generate sequential tweets
    for i in range(n):
        time.sleep(0.01)
        tweet = {
            'body':'I am tweet #' + str(i),
            'postedTime':datetime.datetime.now().strftime(DT_FORMAT_STR)
        }
        yield json.dumps(tweet)
In [ ]:
for tweet in stream_of_tweets(2):
    print(tweet)
In [ ]:
for tweet in stream_of_tweets():
    print(tweet)
In [ ]:
def enrich_tweets_1(istream,enrichment_class_list):
    """ simplified copy of tweet_enricher.py """
    enrichment_instance_list = [enrichment_class() for enrichment_class in enrichment_class_list]
    for tweet_str in istream:
        try:
            tweet = json.loads(tweet_str)
        except ValueError:
            continue
        for instance in enrichment_instance_list:
            instance.enrich(tweet)
        sys.stdout.write( json.dumps(tweet) + '\n')
In [ ]:
class TestEnrichment():
    value = 42
    def enrich(self,tweet):
        if 'enrichments' not in tweet:
            tweet['enrichments'] = {}
        tweet['enrichments']['TestEnrichment'] = self.value
In [ ]:
class TestEnrichment2():
    value = 48
    def enrich(self,tweet):
        if 'enrichments' not in tweet:
            tweet['enrichments'] = {}
        tweet['enrichments']['TestEnrichment2'] = self.value
In [ ]:
enrich_tweets_1(stream_of_tweets(5),[TestEnrichment,TestEnrichment2])
In [ ]:
DT_FORMAT_STR = "%Y-%m-%dT%H:%M:%S.%f"
def stream_of_tweets(n=10):
    # generator function to generate sequential tweets
    for i in range(n):
        time.sleep(0.01)
        tweet = {
            'body':'I am tweet #' + str(i),
            'postedTime':datetime.datetime.now().strftime(DT_FORMAT_STR)
        }
        yield tweet # <<-- this is the only change from above
In [ ]:
class EnrichmentBase():
    def enrich(self,tweet):
        if 'enrichments' not in tweet:
            tweet['enrichments'] = {}
        tweet['enrichments'][type(self).__name__] = self.enrichment_value(tweet)
In [ ]:
class TestEnrichment(EnrichmentBase):
    def enrichment_value(self,tweet):
        return 42
In [ ]:
def enrich_tweets_2(istream,enrichment_class,**kwargs):
    """
    simplify `enrich_tweets_1`:
    - only one enrichment
    - input comes from the generator function directly
    - leave tweets as dict objects
    """
    enrichment_instance = enrichment_class()
    for tweet in istream:
        enrichment_instance.enrich(tweet)
        sys.stdout.write( str(tweet) + '\n')
In [ ]:
%%time
enrich_tweets_2(
    istream=stream_of_tweets(5),
    enrichment_class=TestEnrichment
)
In [ ]:
class SlowEnrichment(EnrichmentBase):
    def enrichment_value(self,tweet):
        # get the tweet number from body
        # and sleep accordingly
        seconds = int(tweet['body'][-1]) + 1
        time.sleep(seconds)
        return str(seconds) + ' second nap'
In [ ]:
%%time
enrich_tweets_2(
    istream=stream_of_tweets(5),
    enrichment_class=SlowEnrichment
)
Despite what you may have heard about Python's Global Interpreter Lock (GIL), core library routines (read: those written in C) can release the GIL while they are waiting on the operating system. The time.sleep function is a good example of this, and the same is true of I/O-bound libraries like the requests package.
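To see this concretely, here is a minimal sketch (separate from the enricher functions below): five threads that each sleep for one second finish in roughly one second of wall time, because time.sleep releases the GIL while waiting.
In [ ]:
import threading
import time

def nap():
    time.sleep(1)  # releases the GIL while waiting on the OS

threads = [threading.Thread(target=nap) for _ in range(5)]
start = time.time()
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print('elapsed: {:.2f} seconds'.format(time.time() - start))  # ~1 second, not ~5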
In [ ]:
import threading
def enrich_tweets_3(istream,enrichment_class):
    """
    use threads to run `enrich`
    """
    enrichment_instance = enrichment_class()
    # we need to hang onto the threads spawned
    threads = []
    # ...and the tweets
    enriched_tweets = []
    for tweet in istream:
        # run `enrich` in a new thread
        thread = threading.Thread(
            target=enrichment_instance.enrich,
            args=(tweet,)
        )
        thread.start() # runs the function in a new thread
        threads.append(thread)
        enriched_tweets.append(tweet)
    sys.stderr.write('submitted all tweets to threads' + '\n')
    for thread in threads:
        thread.join() # blocks until thread finishes
    sys.stderr.write('all threads finished' + '\n')
    for enriched_tweet in enriched_tweets:
        sys.stdout.write( str(enriched_tweet) + '\n')
In [ ]:
%%time
enrich_tweets_3(
    istream=stream_of_tweets(5),
    enrichment_class=SlowEnrichment
)
In [ ]:
def enrich_tweets_4(istream,enrichment_class,**kwargs):
    """
    better use of threads
    """
    enrichment_instance = enrichment_class()
    queue = [] # queue of (thread,tweet) tuples
    max_threads = kwargs['max_threads']
    for tweet in istream:
        # run `enrich` in a new thread
        thread = threading.Thread(
            target=enrichment_instance.enrich,
            args=(tweet,)
        )
        thread.start()
        queue.append((thread,tweet))
        # don't accept more tweets until a thread is free
        while len(queue) >= max_threads:
            # iterate through all threads
            # when threads are dead, remove from queue and yield tweet
            new_queue = []
            for thread,tweet in queue:
                if thread.is_alive():
                    new_queue.append((thread,tweet))
                else:
                    sys.stdout.write( str(tweet) + '\n') # print enriched tweet
            queue = new_queue
            time.sleep(0.1)
    sys.stderr.write('submitted all tweets to threads' + '\n')
    # cleanup threads that didn't finish while iterating through tweets
    for thread,tweet in queue:
        thread.join()
        time.sleep(0.01)
        sys.stdout.write( str(tweet) + '\n')
In [ ]:
%%time
enrich_tweets_4(
    istream=stream_of_tweets(5),
    enrichment_class=SlowEnrichment,
    max_threads = 1 # play with this number
)
With enrich_tweets_4, we only ever hold about max_threads tweets (and threads) in memory at a time.
A Future is an object that represents a deferred computation that may or may not have completed. That computation might be run on a separate thread, but the Future itself doesn't care where the operation is run.
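As a quick, self-contained illustration of the interface (a minimal sketch, separate from the enricher functions):
In [ ]:
from concurrent import futures
import time

def slow_square(x):
    time.sleep(1)
    return x * x

with futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_square, 3)  # returns a Future immediately
    print(future.done())    # likely False; the computation is still running
    print(future.result())  # blocks until the result is ready: prints 9
    print(future.done())    # True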
In [ ]:
from concurrent import futures
def enrich_tweets_5(istream,enrichment_class,**kwargs):
    """
    use concurrent.futures instead of bare Threads
    """
    enrichment_instance = enrichment_class()
    with futures.ThreadPoolExecutor(max_workers=kwargs['max_workers']) as executor:
        future_to_tweet = {}
        for tweet in istream:
            # run `enrich` in a new thread, via a Future
            future = executor.submit(
                enrichment_instance.enrich,
                tweet
            )
            future_to_tweet[future] = tweet
        sys.stderr.write('submitted all tweets as futures' + '\n')
        for future in futures.as_completed(future_to_tweet):
            sys.stdout.write( str(future_to_tweet[future]) + '\n')
In [ ]:
%%time
enrich_tweets_5(
    istream=stream_of_tweets(5),
    enrichment_class=SlowEnrichment,
    max_workers = 5
)
A couple of things to note:
- futures.as_completed yields results as execution finishes
- there is no need for join-ing threads, as above, because results are yielded as soon as they become available
Currently, classes follow the enrichment protocol by defining an enrich method with the appropriate signature. Nothing is specified regarding the return type or value.
We will now change this protocol such that the enrich method returns the enriched tweet dictionary, rather than relying on the mutability of the tweet dictionary passed to enrich. This allows us to retrieve the enriched tweet directly from future.result(), and (eventually) to use process-based executors, where a mutation made in a worker process is not visible to the parent process.
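To see why returning the value matters once processes enter the picture (as they will in enrich_tweets_10), here is a minimal sketch with hypothetical mutate_only/mutate_and_return helpers: arguments are pickled and sent to the worker process, so in-place mutation of the dict is invisible to the parent, and the enrichment only comes back if it is returned. (This assumes worker processes can pickle and run functions defined in the notebook, as the ProcessPoolExecutor example at the end does.)
In [ ]:
from concurrent import futures

def mutate_only(d):
    d['touched'] = True  # mutates the worker process's copy only

def mutate_and_return(d):
    d['touched'] = True
    return d

tweet = {'body': 'I am tweet #0'}
with futures.ProcessPoolExecutor(max_workers=1) as executor:
    executor.submit(mutate_only, tweet).result()
    print(tweet)  # unchanged in the parent process
    print(executor.submit(mutate_and_return, tweet).result())  # includes 'touched': True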
In [ ]:
class NewEnrichmentBase():
    def enrich(self,tweet):
        if 'enrichments' not in tweet:
            tweet['enrichments'] = {}
        tweet['enrichments'][type(self).__name__] = self.enrichment_value(tweet)
        return tweet # <<-- the only new piece
In [ ]:
class NewSlowEnrichment(NewEnrichmentBase):
    def enrichment_value(self,tweet):
        # get the tweet number from body
        # and sleep accordingly
        seconds = int(tweet['body'].split('#')[-1]) + 1
        if seconds > 9:
            seconds = 1
        time.sleep(seconds)
        return str(seconds) + ' second nap'
In [ ]:
from concurrent import futures
def enrich_tweets_6(istream,enrichment_class,**kwargs):
    """
    new enrichment protocol
    """
    enrichment_instance = enrichment_class()
    with futures.ThreadPoolExecutor(max_workers=kwargs['max_workers']) as executor:
        futures_list = [] # <<-- this is now just a list of futures
        for tweet in istream:
            # run `enrich` in a new thread, via a Future
            future = executor.submit(
                enrichment_instance.enrich,
                tweet
            )
            futures_list.append(future)
        sys.stderr.write('submitted all tweets as futures' + '\n')
        for future in futures.as_completed(futures_list):
            sys.stdout.write( str(future.result()) + '\n')
In [ ]:
%%time
enrich_tweets_6(
    istream=stream_of_tweets(5),
    enrichment_class=NewSlowEnrichment,
    max_workers = 5
)
In [ ]:
from concurrent import futures
def enrich_tweets_7(istream,enrichment_class,**kwargs):
    """
    print each enriched tweet via a callback attached to its future,
    instead of collecting futures and iterating over them
    """
    def print_the_tweet(future):
        sys.stdout.write( str(future.result()) + '\n')
    enrichment_instance = enrichment_class()
    with futures.ThreadPoolExecutor(max_workers=kwargs['max_workers']) as executor:
        for tweet in istream:
            # run `enrich` in a new thread, via a Future
            future = executor.submit(
                enrichment_instance.enrich,
                tweet
            )
            future.add_done_callback(print_the_tweet)
        sys.stderr.write('submitted all tweets as futures' + '\n')
In [ ]:
%%time
enrich_tweets_7(
    istream=stream_of_tweets(5),
    enrichment_class=NewSlowEnrichment,
    max_workers = 5
)
In [ ]:
from concurrent import futures
def enrich_tweets_8(istream,enrichment_class,**kwargs):
    """
    like `enrich_tweets_7`, but throttle submission so that
    at most ~`max_workers` futures are in flight at once
    """
    max_workers = kwargs['max_workers']
    def print_the_tweet(future):
        sys.stdout.write( str(future.result()) + '\n')
    enrichment_instance = enrichment_class()
    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures_list = []
        for tweet in istream:
            # run `enrich` in a new thread, via a Future
            future = executor.submit(
                enrichment_instance.enrich,
                tweet
            )
            future.add_done_callback(print_the_tweet)
            futures_list.append(future)
            # drop finished futures; wait here while too many are still running
            futures_list[:] = [future for future in futures_list if future.running()]
            while len(futures_list) >= max_workers:
                futures_list[:] = [future for future in futures_list if future.running()]
                time.sleep(0.5)
        sys.stderr.write('submitted all tweets as futures' + '\n')
In [ ]:
%%time
enrich_tweets_8(
    istream=stream_of_tweets(50),
    enrichment_class=NewSlowEnrichment,
    max_workers = 5
)
In [ ]:
import queue
def enrich_tweets_9(istream,enrichment_class,**kwargs):
    """
    use a pool of threads, each running a worker reading from a common queue
    """
    max_workers = kwargs['max_workers']
    queue_size = kwargs['queue_size']
    enrichment_instance = enrichment_class()
    # create the shared queue before starting the worker threads
    q = queue.Queue(maxsize=queue_size)
    def worker():
        """
        this function runs on new threads
        and reads from a common queue
        """
        time.sleep(0.5)
        while True:
            tweet = q.get()
            if tweet is None: # this is the signal to exit
                break
            enriched_tweet = enrichment_instance.enrich(tweet)
            sys.stdout.write(str(enriched_tweet) + '\n')
            q.task_done()
            time.sleep(0.1)
    thread_pool = [threading.Thread(target=worker) for _ in range(max_workers)]
    for thread in thread_pool:
        thread.start()
    for tweet in istream:
        q.put(tweet) # blocks when the queue is full
    sys.stderr.write('submitted all tweets to threads' + '\n')
    # block until queue is empty
    q.join()
    # kill the threads
    for _ in range(len(thread_pool)):
        q.put(None)
    for thread in thread_pool:
        thread.join()
In [ ]:
%%time
enrich_tweets_9(
    istream=stream_of_tweets(10),
    enrichment_class=NewSlowEnrichment,
    max_workers = 1,
    queue_size=5
)
In [ ]:
from random import randrange
import hashlib
class CPUBoundEnrichment(NewEnrichmentBase):
    def enrichment_value(self,tweet):
        # make a SHA-256 hash of a random 2 MB byte array
        data = bytearray(randrange(256) for i in range(2**21))
        algo = hashlib.new('sha256')
        algo.update(data)
        return algo.hexdigest()
In [ ]:
def enrich_tweets_10(istream,enrichment_class,**kwargs):
    """
    use a `ProcessPoolExecutor` to manage processes
    """
    max_workers=kwargs['max_workers']
    executor_name=kwargs['executor_name']
    def print_the_tweet(future):
        sys.stdout.write( str(future.result()) + '\n')
    enrichment_instance = enrichment_class()
    with getattr(futures,executor_name)(max_workers=max_workers) as executor: # <- this is the only change from #8
        futures_list = []
        for tweet in istream:
            # run `enrich` in a new worker (thread or process), via a Future
            future = executor.submit(
                enrichment_instance.enrich,
                tweet
            )
            future.add_done_callback(print_the_tweet)
            futures_list.append(future)
            # have to throttle with this hack
            futures_list[:] = [future for future in futures_list if future.running()]
            while len(futures_list) >= max_workers:
                futures_list[:] = [future for future in futures_list if future.running()]
                time.sleep(0.5)
        sys.stderr.write('submitted all tweets as futures' + '\n')
In [ ]:
%%time
enrich_tweets_10(
    istream=stream_of_tweets(10),
    enrichment_class=CPUBoundEnrichment,
    executor_name='ProcessPoolExecutor',
    max_workers = 2,
)
In [ ]: