In [1]:
import aiohttp
import asyncio
import json
import logging

from IPython.html import widgets
from IPython.display import display as ipydisplay

from utils import colorify_log_handler


:0: FutureWarning: IPython widgets are experimental and may change in the future.

In [2]:
colorify_log_handler(
    logging.getLogger().handlers[0],  # IPython injects one handler by default
    log_lineno = False,
    time_fmt = '%H:%M:%S'
)

logger = logging.getLogger('bench_rest_api')
logger.setLevel(logging.DEBUG)

logging.getLogger('asyncio').setLevel(logging.DEBUG)

In [3]:
logger.info('This is info')
logger.debug('我會說中文喔')
logger.error('……人家不是喜歡才跟你講話的喔')
logger.warning('笨蛋')


00:18:39.806 I [bench_rest_api <module>] This is info
00:18:39.806 D [bench_rest_api <module>] 我會說中文喔
00:18:39.807 E [bench_rest_api <module>] ……人家不是喜歡才跟你講話的喔
00:18:39.807 W [bench_rest_api <module>] 笨蛋

In [4]:
!curl -s -XGET "http://localhost:5566/" | python -m json.tool


{
    "env_details": {
        "num_process": 1,
        "quotes_pikle_pth": "parsed_1984.pkl"
    },
    "version": "2015.7"
}

In [5]:
!curl -s -XGET "http://localhost:5566/quote/uniform" | python -m json.tool


{
    "quote": "'Nonsense. You are under the impression that hatred is more exhausting than love. Why should it be? And if it were, what difference would that make? Suppose that we choose to wear ourselves out faster. Suppose that we quicken the tempo of human life till men are senile at thirty. Still what difference would it make? Can you not understand that the death of the individual is not death? The party is immortal.'",
    "source": {
        "chapter": 3,
        "part": 3
    }
}

In [6]:
%%bash

ab -c 10 -n 10 "http://localhost:5566/quote?slow=true"


This is ApacheBench, Version 2.3 <$Revision: 1604373 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient).....done


Server Software:        TornadoServer/4.2
Server Hostname:        localhost
Server Port:            5566

Document Path:          /quote?slow=true
Document Length:        275 bytes

Concurrency Level:      10
Time taken for tests:   0.507 seconds
Complete requests:      10
Failed requests:        9
   (Connect: 0, Receive: 0, Length: 9, Exceptions: 0)
Total transferred:      5303 bytes
HTML transferred:       3293 bytes
Requests per second:    19.71 [#/sec] (mean)
Time per request:       507.339 [ms] (mean)
Time per request:       50.734 [ms] (mean, across all concurrent requests)
Transfer rate:          10.21 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.0      0       0
Processing:   502  504   2.0    505     507
Waiting:      502  504   2.0    505     507
Total:        502  505   2.0    505     507

Percentage of the requests served within a certain time (ms)
  50%    505
  66%    506
  75%    506
  80%    507
  90%    507
  95%    507
  98%    507
  99%    507
 100%    507 (longest request)

Basic


In [7]:
@asyncio.coroutine
def quote_simple(url='http://localhost:5566/quote/uniform', slow=False):
    r = yield from aiohttp.request(
        'GET', url, params={'slow': True} if slow else {}
    )
    if r.status != 200:
        logger.error('Unsuccessful response [Status: %s (%d)]' 
                     % (r.reason, r.status))
        r.close(force=True)
        return None
    quote_json = yield from r.json()
    return quote_json['quote']

In [8]:
loop = asyncio.get_event_loop()


00:18:40.617 D [asyncio __init__] Using selector: EpollSelector

To run a simple asyncio coroutine:


In [9]:
coro = quote_simple()
quote = loop.run_until_complete(coro)
quote


Out[9]:
"'When I was arrested, Oceania was at war with Eastasia.'"

Internally, asyncio wraps the coroutine in an asyncio.Task, so the following works equivalently.


In [10]:
task = asyncio.Task(quote_simple())
quote = loop.run_until_complete(task)
quote


Out[10]:
'In past ages, a war, almost by definition, was something that sooner or later came to an end, usually in unmistakable victory or defeat. In the past, also, war was one of the main instruments by which human societies were kept in touch with physical reality. All rulers in all ages have tried to impose a false view of the world upon their followers, but they could not afford to encourage any illusion that tended to impair military efficiency. So long as defeat meant the loss of independence, or some other result generally held to be undesirable, the precautions against defeat had to be serious. Physical facts could not be ignored. In philosophy, or religion, or ethics, or politics, two and two might make five, but when one was designing a gun or an aeroplane they had to make four. Inefficient nations were always conquered sooner or later, and the struggle for efficiency was inimical to illusions. Moreover, to be efficient it was necessary to be able to learn from the past, which meant having a fairly accurate idea of what had happened in the past. Newspapers and history books were, of course, always coloured and biased, but falsification of the kind that is practised today would have been impossible. War was a sure safeguard of sanity, and so far as the ruling classes were concerned it was probably the most important of all safeguards. While wars could be won or lost, no ruling class could be completely irresponsible.'

However, coro is a coroutine object, while task is a Task (a subclass of Future).

One can use asyncio.ensure_future to make sure a Future object is returned in either case.
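
A minimal sketch of that behaviour (assuming Python 3.4.4+, where asyncio.ensure_future is available; older releases spell it asyncio.async): a coroutine gets wrapped in a new Task, while an existing Task or Future is returned unchanged.

fut_from_coro = asyncio.ensure_future(quote_simple())  # a coroutine is wrapped in a new Task
fut_from_task = asyncio.ensure_future(fut_from_coro)   # a Future/Task is returned as-is
assert fut_from_task is fut_from_coro
loop.run_until_complete(fut_from_coro)  # run it so the Task does not stay pending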


In [11]:
type(coro), type(task)


Out[11]:
(generator, asyncio.tasks.Task)

Passing a wrong URL gives an error:


In [12]:
quote = loop.run_until_complete(
    quote_simple(url='http://localhost:5566/quote/uniform?part=100')
)


00:18:40.649 E [bench_rest_api quote_simple] Unsuccessful response [Status: Bad Request (400)]

Multiple Concurrent Requests


In [13]:
@asyncio.coroutine
def quote_many_naive(num_quotes=1):
    coroutines = [
        quote_simple(slow=True) for i in range(num_quotes)
    ]
    quotes = yield from (asyncio.gather(*coroutines))
    return quotes

In [14]:
%%time
quotes = loop.run_until_complete(quote_many_naive(2000))


CPU times: user 2.05 s, sys: 240 ms, total: 2.29 s
Wall time: 4.56 s

This does not help much: we open 2000 connections at a time, so it is slower than expected.

Limiting connection pool size

See the reference on the official aiohttp site.


In [15]:
@asyncio.coroutine
def quote(conn, url='http://localhost:5566/quote/uniform', slow=False):
    r = yield from aiohttp.request(
        'GET', url, params={'slow': True} if slow else {},
        connector=conn
    )
    if r.status != 200:
        logger.error('Unsuccessful response [Status: %s (%d)]' 
                     % (r.reason, r.status))
        r.close(force=True)
        return None
    quote_json = yield from r.json()
    r.close(force=True)
    return quote_json['quote']

@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20):
    conn = aiohttp.TCPConnector(keepalive_timeout=1, force_close=True, limit=conn_limit)
    coroutines = [
        quote(conn) for i in range(num_quotes)
    ]
    quotes = yield from (asyncio.gather(*coroutines))
    return quotes

In [16]:
%%time
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100))


CPU times: user 1.88 s, sys: 196 ms, total: 2.08 s
Wall time: 3.79 s

I don't know why, but using aiohttp's built-in connection limit is still slow. We can implement a limit ourselves instead.

Custom connection limit using semaphore

Use an asyncio.Semaphore as a lock: each coroutine acquires it with (yield from semaphore) before issuing its request, so at most conn_limit requests are in flight at once.


In [17]:
@asyncio.coroutine
def quote_with_lock(semaphore, url='http://localhost:5566/quote/uniform'):
    with (yield from semaphore):
        r = yield from aiohttp.request('GET', url)
        if r.status != 200:
            logger.error('Unsuccessful response [Status: %s (%d)]' 
                         % (r.reason, r.status))
            r.close(force=True)
            return None
        quote_json = yield from r.json()
    r.close(force=True)
    return quote_json['quote']

@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20):
    semaphore = asyncio.Semaphore(conn_limit)
    coroutines = [
        quote_with_lock(semaphore) for i in range(num_quotes)
    ]
    quotes = yield from (asyncio.gather(*coroutines))
    return quotes

In [18]:
%%time
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100))


CPU times: user 1.96 s, sys: 212 ms, total: 2.17 s
Wall time: 1.97 s

Add Progressbar

If you don't care about the original order of the coroutines:


In [19]:
@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20, progress=None, step=10):
    if progress is None:
        progress = widgets.IntProgress()
        progress.max = num_quotes // step
        ipydisplay(progress)
    semaphore = asyncio.Semaphore(conn_limit)
    coroutines = [
        quote_with_lock(semaphore) for i in range(num_quotes)
    ]
    # quotes = yield from (asyncio.gather(*coroutines))
    quotes = []
    for ith, coro in enumerate(asyncio.as_completed(coroutines), 1):
        if ith % step == 0:
            progress.value += 1
        q = yield from coro
        quotes.append(q)
    return quotes

In [20]:
%%time
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=1))


CPU times: user 2.61 s, sys: 324 ms, total: 2.93 s
Wall time: 2.67 s

For fast responses, updating the progress bar on every completion adds considerable overhead. Try a higher step so the widget updates less often (step=20 means only 100 updates for 2000 requests).


In [21]:
%%time
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=20))


CPU times: user 1.94 s, sys: 204 ms, total: 2.14 s
Wall time: 1.96 s

Original order matters

asyncio.as_completed yields results in completion order, so it loses the original order. Instead, wrap the coroutines as Tasks, attach a done callback to update the progress bar, and gather the Tasks so the results keep their original order.


In [22]:
@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20, progress=None, step=10):
    if progress is None:
        progress = widgets.IntProgress()
        progress.max = num_quotes // step
        ipydisplay(progress)

    # create the lock
    semaphore = asyncio.Semaphore(conn_limit)

    finished_task_count = 0
    def progress_adder(fut):
        nonlocal finished_task_count
        finished_task_count += 1
        if finished_task_count % step == 0:
            progress.value += 1
    
    # wrap coroutines as Tasks
    futures = []
    for i in range(num_quotes):
        task = asyncio.Task(quote_with_lock(semaphore))
        task.add_done_callback(progress_adder)
        futures.append(task)
    
    quotes = yield from (asyncio.gather(*futures))
    return quotes

In [23]:
%%time
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=1))


CPU times: user 2.64 s, sys: 364 ms, total: 3 s
Wall time: 2.75 s

In [24]:
%%time
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=20))


CPU times: user 1.88 s, sys: 264 ms, total: 2.15 s
Wall time: 1.98 s

Alternative way


In [25]:
@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20, progress=None, step=10):
    if progress is None:
        progress = widgets.IntProgress()
        progress.max = num_quotes // step
        ipydisplay(progress)
    
    semaphore = asyncio.Semaphore(conn_limit)
    
    # wrap coroutines with future
    # For Python 3.4.4+, asyncio.ensure_future(...)
    # will wrap coro as Task and keep input the same 
    # if it is already Future.
    futures = [
        asyncio.ensure_future(quote_with_lock(semaphore))
        for i in range(num_quotes)
    ]

    for ith, coro in enumerate(asyncio.as_completed(futures), 1):
        if ith % step == 0:
            progress.value += 1
        yield from coro
        
    quotes = [fut.result() for fut in futures]
    return quotes

In [26]:
%%time
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=20))


CPU times: user 1.91 s, sys: 276 ms, total: 2.19 s
Wall time: 2.02 s