Asynchronous and non-Blocking I/O

Asynchronous

asynchronous function 은 종료되기 전에 리턴하고, 실제 작업은 백그라운드에서 처리한다. 몇 가지 asynchronous interface 스타일이 있는데:

  • Callback argument
  • Return a placeholder (Future, Promise, Deferred)
  • Deliver to a queue
  • Callback registry (e.g. POSIX signals)

어떤 interface 를 사용하든 상관없이, asynchronous function 의 정의는 caller 와 상호작용하므로, synchronous function 을 사용할때와는 다를 수 밖에 없다.

examples

  • synchronous http client

In [3]:
from tornado.httpclient import HTTPClient

def synchronous_fetch(url):
    http_client = HTTPClient()
    response = http_client.fetch(url)
    return response.body
  • asynchronous http client callback 버전

In [4]:
from tornado.httpclient import AsyncHTTPClient

def asynchronous_fetch(url, callback):
    http_client = AsyncHTTPClient()
    def handle_response(response):
        callback(response.body)
    http_client.fetch(url, callback=handle_response)
  • callback 대신 Future 를 이용한 asynchronous http client 버전

In [5]:
from tornado.concurrent import Future

def async_fetch_future(url):
    http_client = AsyncHTTPClient()
    my_future = Future()
    fetch_future = http_client.fetch(url)
    fetch_future.add_done_callback(lambda f: my_future.set_result(f.result()))
    return my_future

Future 버전이 복잡해보기긴 하지만 권장되는 방식이며 두 가지 큰 장점이 있다. 에러처리가 일관적인다. Future.result 메소드는 간단하게 exception 을 raise 할 수 있고, Futurecoroutine 에서 사용할 수 있도록 자신을 전달 할 수 있다.


In [6]:
from tornado import gen

@gen.coroutine
def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    raise gen.Return(response.body)

raise gen.Return() 코드는 python 2.x 에서는 generator 가 값을 리턴할 수 없기때문에 사용한 코드로, tornado 는 이 문제를 해결하기 위해서 Return 이라는 특별한 예외를 발생시키고, coroutine 에서는 이 예외를 잡아서 리턴값으로 간주한다. python 3.x 에서는 return response.body 를 사용해도 된다.

Coroutines


In [13]:
from tornado import gen

@gen.coroutine
def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    # In Python versions prior to 3.3, returning a value from
    # a generator is not allowed and you must use
    #   raise gen.Return(response.body)
    # instead.
    #return response.body
    raise gen.Return(response.body)

How it works

yield 를 포함한 함수는 generator 이고, 모든 generator 는 asynchronous 하다 ; 얘들은 호출되면 함수를 끝까지 실행하는게 아니라 generator 객체를 리턴한다.

tornado.gen.coroutine 데코레이터는

  • yield 를 통해서 generator 와 통신을 하고,
  • Future 를 리턴함으로써 coroutine 의 caller 와 통신한다.

In [15]:
# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

데코레이터는 generator 로부터 Future 를 받고, (비동기로) Future 가 완료되길 기다린다. 그리고는 Future 를 unwrap 하고, result 를 yield expression 의 결과값으로서 generator 에게 다시 보낸다. asynchronous function 에 의해서 리턴된 Future 를 yield expression 에 즉시 패스하는 경우를 제외하고는, 대부분의 asynchronous 코드는 Future class 를 직접 건드릴 일이 없다.

The decorator receives a Future from the generator, waits (without blocking) for that Future to complete, then “unwraps” the Future and sends the result back into the generator as the result of the yield expression. Most asynchronous code never touches the Future class directly except to immediately pass the Future returned by an asynchronous function to a yield expression.

Coroutine patterns

Interaction with callbacks

Future 대신 callback 을 사용하는 asynchronous code 를 처리하려면, Task 를 통해서 호출을 감싼다. 이는 callback argument 를 추가하고, yield 할 수 있는 Future 를 리턴한다.

To interact with asynchronous code that uses callbacks instead of Future, wrap the call in a Task. This will add the callback argument for you and return a Future which you can yield:


In [5]:
from tornado import gen

@gen.coroutine
def call_task():
    # Note that there are no parens on some_function.
    # This will be translated by Task into
    #   some_function(other_args, callback=callback)
    yield gen.Task(some_function, other_args)

Calling blocking function

coroutine 에서 blocking function 을 호출하는 가장 간단한 방법은 ThreadPoolExecutor 를 사용하는 것이다. 이는 coroutine 과 호환되는 Future 를 리턴한다.

The simplest way to call a blocking function from a coroutine is to use a ThreadPoolExecutor, which returns Futures that are compatible with coroutines:


In [15]:
from concurrent.futures import ThreadPoolExecutor

thread_pool = ThreadPoolExecutor(4)

@gen.coroutine
def call_blocking():
    yield thread_pool.submit(blocking_func, args)

Parallelism

coroutine decorator 는 Future 들로 구성된 list 와 dict 를 인지하고, 모든 Future 들을 병렬로 기다린다.

The coroutine decorator recognizes lists and dicts whose values are Futures, and waits for all of those Futures in parallel:


In [9]:
@gen.coroutine
def parallel_fetch(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1), http_client.fetch(url2)]

@gen.coroutine
def parallel_fetch_many(urls):
    responses = yield [http_client.fetch(url) for url in urls]
    # responses is a list of HTTPResponses in the same order

@gen.coroutine
def parallel_fetch_dict(urls):
    responses = yield {url: http_client.fetch(url) for url in urls}
    # responses is a dict {url: HTTPResponse}

Interleaving (낑겨넣기!)

때로는 Future 를 즉시 yield 하는것 보다 저장해두는 것이 좋을때도 있는데, 그렇게 하면 기다리기 전에 다른 오퍼레이션을 시작할 수 있기때문이다.

Sometimes it is useful to save a Future instead of yielding it immediately, so you can start another operation before waiting:


In [11]:
@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()

Looping

Looping 은 coroutine 에게 있어서 까다로운 문제다. 왜냐하면 모든 for 또는 while 루프의 모든 iteration 에서 yield 를 할 수 있는 방법과, yield 의 result 를 캡쳐할 수 있는 방법이 없기때문이다. 대신에 Motor 의 예제에서 처럼 result 에 접근하는 것과 loop 조건을 분할 할 필요가 있다.

Looping is tricky with coroutines since there is no way in Python to yield on every iteration of a for or while loop and capture the result of the yield. Instead, you’ll need to separate the loop condition from accessing the results, as in this example from Motor:


In [13]:
import motor
db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    cursor = db.collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()


---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-13-d0b0abd6daf9> in <module>()
----> 1 import motor
      2 db = motor.MotorClient().test
      3 
      4 @gen.coroutine
      5 def loop_example(collection):

ImportError: No module named motor

In [ ]: