In [1]:
import aiohttp
import asyncio
import json
import logging
from IPython.html import widgets
from IPython.display import display as ipydisplay
from utils import colorify_log_handler
In [2]:
colorify_log_handler(
logging.getLogger().handlers[0], # IPython by default inject one
log_lineno = False,
time_fmt = '%H:%M:%S'
)
logger = logging.getLogger('bench_rest_api')
logger.setLevel(logging.DEBUG)
logging.getLogger('asyncio').setLevel(logging.DEBUG)
In [3]:
logger.info('This is info')
logger.debug('我會說中文喔')
logger.error('……人家不是喜歡才跟你講話的喔')
logger.warning('笨蛋')
In [4]:
!curl -s -XGET "http://localhost:5566/" | python -m json.tool
In [5]:
!curl -s -XGET "http://localhost:5566/quote/uniform" | python -m json.tool
In [6]:
%%bash
ab -c 10 -n 10 "http://localhost:5566/quote?slow=true"
In [7]:
@asyncio.coroutine
def quote_simple(url='http://localhost:5566/quote/uniform', slow=False):
r = yield from aiohttp.request(
'GET', url, params={'slow': True} if slow else {}
)
if r.status != 200:
logger.error('Unsuccessful response [Status: %s (%d)]'
% (r.reason, r.status))
r.close(force=True)
return None
quote_json = yield from r.json()
return quote_json['quote']
In [8]:
loop = asyncio.get_event_loop()
To run a simple asyncio corountine.
In [9]:
coro = quote_simple()
quote = loop.run_until_complete(coro)
quote
Out[9]:
Internally asyncio wraps it with asyncio.Task.
So the following works equivalently.
In [10]:
task = asyncio.Task(quote_simple())
quote = loop.run_until_complete(task)
quote
Out[10]:
However, coro is corountine, and task is Task (subclass of Future).
One can use asyncio.ensure_future to make sure having a Future obj returned.
In [11]:
type(coro), type(task)
Out[11]:
Passing wrong URL gives error
In [12]:
quote = loop.run_until_complete(
quote_simple(url='http://localhost:5566/quote/uniform?part=100')
)
In [13]:
@asyncio.coroutine
def quote_many_naive(num_quotes=1):
coroutines = [
quote_simple(slow=True) for i in range(num_quotes)
]
quotes = yield from (asyncio.gather(*coroutines))
return quotes
In [14]:
%%time
quotes = loop.run_until_complete(quote_many_naive(2000))
This is not helping since we open 2000 connections at a time. It is slower than expected.
Ref on official site.
In [15]:
@asyncio.coroutine
def quote(conn, url='http://localhost:5566/quote/uniform', slow=False):
r = yield from aiohttp.request(
'GET', url, params={'slow': True} if slow else {},
connector=conn
)
if r.status != 200:
logger.error('Unsuccessful response [Status: %s (%d)]'
% (r.reason, r.status))
r.close(force=True)
return None
quote_json = yield from r.json()
r.close(force=True)
return quote_json['quote']
@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20):
conn = aiohttp.TCPConnector(keepalive_timeout=1, force_close=True, limit=conn_limit)
coroutines = [
quote(conn) for i in range(num_quotes)
]
quotes = yield from (asyncio.gather(*coroutines))
return quotes
In [16]:
%%time
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100))
I don't know why, but using its internal connection limit is slow. But we can implement one ourselves.
Use asyncio.Semaphore acting as a lock.
In [17]:
def quote_with_lock(semaphore, url='http://localhost:5566/quote/uniform'):
with (yield from semaphore):
r = yield from aiohttp.request('GET', url)
if r.status != 200:
logger.error('Unsuccessful response [Status: %s (%d)]'
% (r.reason, r.status))
r.close(force=True)
return None
quote_json = yield from r.json()
r.close(force=True)
return quote_json['quote']
@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20):
semaphore = asyncio.Semaphore(conn_limit)
coroutines = [
quote_with_lock(semaphore) for i in range(num_quotes)
]
quotes = yield from (asyncio.gather(*coroutines))
return quotes
In [18]:
%%time
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100))
If you don't care the original of coroutines
In [19]:
@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20, progress=None, step=10):
if progress is None:
progress = widgets.IntProgress()
progress.max = num_quotes // step
ipydisplay(progress)
semaphore = asyncio.Semaphore(conn_limit)
coroutines = [
quote_with_lock(semaphore) for i in range(num_quotes)
]
# quotes = yield from (asyncio.gather(*coroutines))
quotes = []
for ith, coro in enumerate(asyncio.as_completed(coroutines), 1):
if ith % step == 0:
progress.value += 1
q = yield from coro
quotes.append(q)
return quotes
In [20]:
%%time
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=1))
For fast response, progress bar introduces considerable latency. Try modify the step higher.
In [21]:
%%time
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=20))
In [22]:
@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20, progress=None, step=10):
if progress is None:
progress = widgets.IntProgress()
progress.max = num_quotes // step
ipydisplay(progress)
# create the lock
semaphore = asyncio.Semaphore(conn_limit)
finished_task_count = 0
def progress_adder(fut):
nonlocal finished_task_count
finished_task_count += 1
if finished_task_count % step == 0:
progress.value += 1
# wrap coroutines as Tasks
futures = []
for i in range(num_quotes):
task = asyncio.Task(quote_with_lock(semaphore))
task.add_done_callback(progress_adder)
futures.append(task)
quotes = yield from (asyncio.gather(*futures))
return quotes
In [23]:
%%time
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=1))
In [24]:
%%time
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=20))
In [25]:
@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20, progress=None, step=10):
if progress is None:
progress = widgets.IntProgress()
progress.max = num_quotes // step
ipydisplay(progress)
semaphore = asyncio.Semaphore(conn_limit)
# wrap coroutines with future
# For Python 3.4.4+, asyncio.ensure_future(...)
# will wrap coro as Task and keep input the same
# if it is already Future.
futures = [
asyncio.ensure_future(quote_with_lock(semaphore))
for i in range(num_quotes)
]
for ith, coro in enumerate(asyncio.as_completed(futures), 1):
if ith % step == 0:
progress.value += 1
yield from coro
quotes = [fut.result() for fut in futures]
return quotes
In [26]:
%%time
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=20))