In this notebook, we'll weave together our new [Tweet Parser](https://github.com/tw-ddis/tweet_parser) and some Python asyncio magic.
Let's set up the environment and demonstrate a motivating example.
In [ ]:
from IPython.display import HTML
HTML('<iframe width="560" height="315" src="https://www.youtube.com/embed/dD9NgzLhbBM" frameborder="0" allowfullscreen></iframe>')
In [ ]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
import itertools as it
from functools import partial
import seaborn as sns
import pandas as pd
import requests
from tweet_parser.tweet import Tweet
import sec # you will not have this python file; I use it to keep `secrets` like passwords hidden
We can define a few constants here that will be used throughout our example.
In [ ]:
username = "agonzales@twitter.com"
AUTH = requests.auth.HTTPBasicAuth(username, sec.GNIP_API_PW)
GNIP_BASE_URL = "https://gnip-api.twitter.com/search/30day/accounts/shendrickson/peabody.json?"
This function is a little helper for programmatically generating valid query URLs for search terms against the Gnip API.
In [ ]:
def gen_query_url(url, terms, max_results=100):
    if isinstance(terms, str):
        terms = terms.split()
    return ''.join([url,
                    "query=",
                    "%20".join(terms),
                    "&maxResults={}".format(max_results)])
Let's say you want to get a collection of tweets matching some criteria; this is an extremely common task. The process might look something like this:
In [ ]:
query = gen_query_url(GNIP_BASE_URL, ["just", "bought", "a", "house"])
print(query)
In [ ]:
import requests
def sync_tweets(query):
    return requests.get(url=query, auth=AUTH).json()['results']
In [ ]:
%%time
tweets = [Tweet(i) for i in sync_tweets(query)]
In [ ]:
print(tweets[0].text)
Easy peasy. What if you have a bunch of queries to run (this is a bit contrived, but it serves a purpose)? You might define all your queries up front and run a for loop to collect each of them.
In [ ]:
formed_query = partial(gen_query_url, url=GNIP_BASE_URL, max_results=100)
queries = [formed_query(terms=[i]) for i in ["eclipse", "nuclear", "korea", "cats", "ai", "memes", "googlebro"]]
queries
In [ ]:
%%time
tweets = [Tweet(i) for i in it.chain.from_iterable([sync_tweets(query) for query in queries])]
Works great, but notice that the runtime scales roughly linearly with the number of queries. Given that this involves a trivial amount of computation and is almost entirely taken up by system calls / IO, it's a perfect opportunity to add parallelism to the mix and speed it up.
IO-bound parallelism is commonly handled with a technique called asynchronous programming, which introduces concepts like coroutines, event loops, user-level threads, tasks, and futures.
In modern Python (3.5+), the language has built-in support for coroutines, exposed via the asyncio module and the async and await keywords. Several libraries have been introduced that make use of coroutines internally, such as aiohttp, which is roughly a coroutine version of requests.
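Before we get to the HTTP version, here's a minimal, self-contained sketch (not part of the original example) of the moving pieces: a coroutine defined with async def, await handing control back to the event loop, and asyncio.gather running several coroutines concurrently. The labels and sleep durations are arbitrary stand-ins for IO waits.
In [ ]:
import asyncio

async def wait_and_return(label, seconds):
    # `await` suspends this coroutine so the event loop can run the others
    await asyncio.sleep(seconds)
    return label

async def run_all():
    # schedule both coroutines and collect their results concurrently
    return await asyncio.gather(wait_and_return("a", 1), wait_and_return("b", 1))

loop = asyncio.get_event_loop()
print(loop.run_until_complete(run_all()))  # finishes in roughly 1s, not 2s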
Let's look at what the basic coroutine version of our above simple example would look like in aiohttp:
In [ ]:
import asyncio
import aiohttp
import async_timeout
async def fetch_tweets_coroutine(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url, auth=aiohttp.BasicAuth(AUTH.username, AUTH.password)) as response:
            return await response.json()
In [ ]:
%%time
loop = asyncio.get_event_loop()
tweets = [Tweet(i) for i in loop.run_until_complete(fetch_tweets_coroutine(query))['results']]
In [ ]:
print(tweets[0].user_id, tweets[0].text)
It's a lot more code than our simple requests example and it isn't any faster, though this is expected, since for a single query the time is dominated by the round trip to and from Gnip.
Let's try again with our longer set of queries, redefining the methods to handle this more naturally.
In [ ]:
async def fetch_tweets_fancy(session, url):
    async with session.get(url, auth=aiohttp.BasicAuth(AUTH.username, AUTH.password)) as response:
        # print("collecting query: {}".format(url))
        _json = await response.json()
        return [Tweet(t) for t in _json["results"]]

async def collect_queries(queries):
    tasks = []
    async with aiohttp.ClientSession() as session:
        for query in queries:
            task = asyncio.ensure_future(fetch_tweets_fancy(session, query))
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
        return responses
In [ ]:
formed_query = partial(gen_query_url, url=GNIP_BASE_URL, max_results=100)
queries = [formed_query(terms=[i]) for i in ["eclipse", "nuclear", "korea", "cats", "ai", "memes"]]
In [ ]:
%%time
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(collect_queries(queries))
res = list(it.chain.from_iterable(loop.run_until_complete(future)))
In [ ]:
print(res[0].text)
print(len(res))
So, what the hell is a coroutine and why does it work the way that it does?
First, we have to talk about the differences between threads, coroutines, parallelism, and concurrency. For a short intro, see [this SO thread](https://stackoverflow.com/questions/1934715/difference-between-a-coroutine-and-a-thread).
- Concurrency - separation of tasks for seamless execution (IDEs, operating systems, complex ops)
- Parallelism - execution of multiple tasks simultaneously to increase speed
- Thread - OS-level scheduling and concurrency; blocking, context switching, deadlocks, lock contention, kernel, preemption (a thread-based sketch follows below)
- ULT / coroutine - non-blocking, program-level, event-based, "juggling"
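For contrast with the coroutine approach used above, here's a rough thread-based sketch of the same IO-bound fan-out, reusing the sync_tweets helper and queries list defined earlier; this is only an illustration of OS-level threading, not the method used in this notebook.
In [ ]:
from concurrent.futures import ThreadPoolExecutor

# each query blocks its own worker thread on network IO;
# the OS scheduler interleaves the waits instead of a single-threaded event loop
with ThreadPoolExecutor(max_workers=len(queries)) as pool:
    thread_tweets = [Tweet(t) for t in it.chain.from_iterable(pool.map(sync_tweets, queries))]
print(len(thread_tweets))
Both versions overlap the network waits; the coroutine version just does so cooperatively on a single thread.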
(discussion)
Further reading:
- [Coroutines in Node.js and Python](http://sahandsaba.com/understanding-asyncio-node-js-python-3-4.html)
- [Combinatorics and coroutines](http://sahandsaba.com/combinatorial-generation-using-coroutines-in-python.html)
- [A long example of Python internals / building coroutines](http://www.aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html)
- [Making 1 million requests with aiohttp](https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html)
- [Coroutine patterns](https://medium.com/python-pandemonium/asyncio-coroutine-patterns-beyond-await-a6121486656f)