Coroutines for IO-bound tasks

In this notebook, we'll weave together our new (Tweet Parser)[https://github.com/tw-ddis/tweet_parser] and some python asyncio magic.

Let's set up the environment and demonstrate a motivating example.


In [ ]:
from IPython.display import HTML
HTML('<iframe width="560" height="315" src="https://www.youtube.com/embed/dD9NgzLhbBM" frameborder="0" allowfullscreen></iframe>')

In [ ]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

import itertools as it
from functools import partial

import seaborn as sns
import pandas as pd
import requests
from tweet_parser.tweet import Tweet

import sec # you will not have this python file; I use it to keep `secrets` like passwords hidden

We can define a few constants here that will be used throughout our example.


In [ ]:
username = "agonzales@twitter.com"
AUTH = requests.auth.HTTPBasicAuth(username, sec.GNIP_API_PW)
GNIP_BASE_URL = "https://gnip-api.twitter.com/search/30day/accounts/shendrickson/peabody.json?"

This function is a little helper for programatically generating valid queries for terms with the Gnip api.


In [ ]:
def gen_query_url(url, terms, max_results=100):
    if isinstance(terms, str):
        terms = terms.split()
    return ''.join([url,
                    "query=",
                    "%20".join(terms),
                    "&maxResults={}".format(max_results)])

Lets say you want to get a collection of tweets matching some criteria - this is an extremely common task. The process might look something like this:


In [ ]:
query = gen_query_url(GNIP_BASE_URL, ["just", "bought", "a", "house"])
print(query)

In [ ]:
import requests

def sync_tweets(query):
    return requests.get(url=query, auth=AUTH).json()['results']

In [ ]:
%%time
tweets = [Tweet(i) for i in sync_tweets(query)]

In [ ]:
print(tweets[0].text)

Easy peasy. What if you have a bunch of queries to match (this is a bit contrived, but serves a purpose). You might define all your queries as such and run a for loop to query all of them.


In [ ]:
formed_query = partial(gen_query_url, url=GNIP_BASE_URL, max_results=100)
queries = [formed_query(terms=[i]) for i in ["eclipse", "nuclear", "korea", "cats", "ai", "memes", "googlebro"]]
queries

In [ ]:
%%time
tweets = [Tweet(i) for i in it.chain.from_iterable([sync_tweets(query) for query in queries])]

Works great, but notice that there seems to be linear scaling for the time it takes to run this. Given that this is a trivial amount of computation and a task that is almost entirely taken up by system calls / IO, it's a perfect opportunity to add parallism to the mix and speed it up.

IO-bound parallism is commonly handled with a technique called asyncronous programming, in which the semantics coroutine, event loop, user-level thread, task, future, etc. are introduced.

In modern python (>3.5), the language has builtins for using coroutines, exposed via the asyncio module and the keywords async and await. Several libraries have been introduced that make use of coroutines internally, such as aiohttp, which is mostly a coroutine verison of requests.

Let's look at what the basic coroutine version of our above simple example would look like in aiohttp:


In [ ]:
import asyncio
import aiohttp
import async_timeout

    
async def fetch_tweets_coroutine(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url, auth=aiohttp.BasicAuth(AUTH.username, AUTH.password)) as response:
            return await response.json()

In [ ]:
%%time
loop = asyncio.get_event_loop()
tweets = [Tweet(i) for i in loop.run_until_complete(fetch_tweets_coroutine(query))['results']]

In [ ]:
print(tweets[0].user_id, tweets[0].text)

It's a lot more code that our simple requests example and doesn't work any more quickly, though this is expected since the time is really response time to and from Gnip.

Let's try again with our longer set of queries, redefining the methods to handle this more naturally.


In [ ]:
async def fetch_tweets_fancy(session, url):
    async with session.get(url, auth=aiohttp.BasicAuth(AUTH.username, AUTH.password)) as response:
        # print("collecting query: {}".format(url))
        _json = await response.json()
        return [Tweet(t) for t in _json["results"]]
    
    
async def collect_queries(queries):
    tasks = []
    async with aiohttp.ClientSession() as session:
        for query in queries:
            task = asyncio.ensure_future(fetch_tweets_fancy(session, query))
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
        return responses

In [ ]:
formed_query = partial(gen_query_url, url=GNIP_BASE_URL, max_results=100)
queries = [formed_query(terms=[i]) for i in ["eclipse", "nuclear", "korea", "cats", "ai", "memes"]]

In [ ]:
%%time
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(collect_queries(queries))
res = list(it.chain.from_iterable(loop.run_until_complete(future)))

In [ ]:
print(res[0].text)
print(len(res))

So, what the hell is a coroutine and why does it work the way that it does?

First, we have to talk about the differences between threads, coroutines, parallelism, and concurrency. For a short intro, see (this SO thread)[https://stackoverflow.com/questions/1934715/difference-between-a-coroutine-and-a-thread]

  • Concurrency - separation of tasks for seamless execution (IDE, operating systems, complex ops)

  • parallelism - execution of multiple tasks simultaneously to increase speed

  • thread - OS level scheduling and concurrency. blocking, context switching, deadlocks, lock contention, kernel, premption

  • ULT / Coroutine - non-blocking, program-level, event-based, "juggling"

(discussion)

further reading: