asyncio IO Loop

Create an event loop, which automatically becomes the default event loop for the current context.


In [1]:
import asyncio
loop = asyncio.get_event_loop()

Run a simple callback as soon as possible:


In [2]:
def hello_world():
    print('Hello World!')
    loop.stop()

loop.call_soon(hello_world)
loop.run_forever()


Hello World!
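Callbacks registered with call_soon run in the order in which they were registered, and positional arguments can be passed along with the callback. A minimal sketch of this, using a fresh loop under the hypothetical name demo_loop so that the notebook's own loop is left alone:

```python
import asyncio

demo_loop = asyncio.new_event_loop()  # separate loop; the name is an assumption of this sketch
order = []

# call_soon forwards extra positional arguments to the callback
demo_loop.call_soon(order.append, 'first')
demo_loop.call_soon(order.append, 'second')
demo_loop.call_soon(demo_loop.stop)  # stop after the callbacks above have run

demo_loop.run_forever()
demo_loop.close()
print(order)
```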

Coroutine Examples

Coroutines can be scheduled directly on the event loop.


In [3]:
async def aprint(text):
    await asyncio.sleep(1)
    print(text)
    return 42

loop.run_until_complete(aprint('Hello world!'))


Hello world!
Out[3]:
42

You can use as many awaits as you like in a coroutine:


In [4]:
async def aprint_twice(text):
    await asyncio.sleep(1)
    print(text)
    await asyncio.sleep(1)
    print(text + ' (once more)')
    return 42

loop.run_until_complete(aprint_twice('Hello world!'))


Hello world!
Hello world! (once more)
Out[4]:
42

Multiple coroutines can be combined and executed concurrently:


In [5]:
loop.run_until_complete(asyncio.gather(aprint('Task 1'), aprint('Task 2')))


Task 1
Task 2
Out[5]:
[42, 42]
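One way to check that the coroutines passed to gather really overlap is to time the call. The sketch below is illustrative only: nap, main and demo_loop are hypothetical names, and the delays are shortened so it finishes quickly. Two 0.2 second sleeps run together should take roughly 0.2 seconds in total rather than 0.4.

```python
import asyncio
import time

async def nap(delay):
    # stand-in for aprint: wait, then return a value
    await asyncio.sleep(delay)
    return delay

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(nap(0.2), nap(0.2))
    return results, time.perf_counter() - start

demo_loop = asyncio.new_event_loop()
results, elapsed = demo_loop.run_until_complete(main())
demo_loop.close()
print(results, round(elapsed, 2))
```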

Exceptions work just like you would expect:


In [6]:
async def raiser():
    await asyncio.sleep(1)
    raise ValueError()
    
async def catcher():
    try:
        await raiser()
    except ValueError:
        print('caught something')
        
loop.run_until_complete(catcher())


caught something
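When several coroutines are gathered, an exception in one of them is normally re-raised from the gather call. As a hedged sketch, passing return_exceptions=True instead collects the exception object as an ordinary result; the names ok, failing and demo_loop are made up for this example.

```python
import asyncio

async def ok():
    await asyncio.sleep(0.01)
    return 42

async def failing():
    await asyncio.sleep(0.01)
    raise ValueError('boom')

async def main():
    # exceptions are returned in the result list instead of being raised
    return await asyncio.gather(ok(), failing(), return_exceptions=True)

demo_loop = asyncio.new_event_loop()
results = demo_loop.run_until_complete(main())
demo_loop.close()
print(results)
```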

Automatic Checks

Not awaiting a coroutine does not raise an error, but it triggers a RuntimeWarning once the coroutine object is garbage-collected.


In [7]:
a = aprint('Did I forget something?')
del a


/Users/niko/.virtualenvs/async-examples/lib/python3.5/site-packages/ipykernel/__main__.py:2: RuntimeWarning: coroutine 'aprint' was never awaited
  from ipykernel import kernelapp as app
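The warning can also be observed programmatically. The following sketch assumes the standard warnings machinery is used to report it (as it is in CPython) and records the warning instead of printing it; the variable names caught and messages are chosen for this example.

```python
import asyncio
import gc
import warnings

async def aprint(text):
    await asyncio.sleep(0.01)
    print(text)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')  # make sure the warning is not filtered out
    coro = aprint('Did I forget something?')  # creates the coroutine object, runs nothing
    del coro      # dropping the last reference finalizes the coroutine
    gc.collect()  # for interpreters without immediate reference counting

messages = [str(w.message) for w in caught]
print(messages)
```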

Awaiting something that is not awaitable raises a TypeError. Here aprint is the coroutine function itself, not the coroutine object returned by calling it.


In [8]:
async def fail():
    await aprint
    
loop.run_until_complete(fail())


---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-8-1eb58c86a9a1> in <module>()
      2     await aprint
      3 
----> 4 loop.run_until_complete(fail())

/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py in run_until_complete(self, future)
    340             raise RuntimeError('Event loop stopped before Future completed.')
    341 
--> 342         return future.result()
    343 
    344     def stop(self):

/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py in result(self)
    272             self._tb_logger = None
    273         if self._exception is not None:
--> 274             raise self._exception
    275         return self._result
    276 

/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py in _step(***failed resolving arguments***)
    237                 result = coro.throw(exc)
    238             else:
--> 239                 result = coro.send(value)
    240         except StopIteration as exc:
    241             self.set_result(exc.value)

<ipython-input-8-1eb58c86a9a1> in fail()
      1 async def fail():
----> 2     await aprint
      3 
      4 loop.run_until_complete(fail())

TypeError: object function can't be used in 'await' expression
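If it is unclear whether an object can be awaited, inspect.isawaitable gives a quick answer. A small sketch (the result variable names are assumptions of this example) contrasting the coroutine function with the coroutine object it returns:

```python
import asyncio
import inspect

async def aprint(text):
    await asyncio.sleep(0.01)
    print(text)

# The function itself is not awaitable ...
function_is_awaitable = inspect.isawaitable(aprint)

# ... but the coroutine object created by calling it is.
coro = aprint('hello')
coroutine_is_awaitable = inspect.isawaitable(coro)
coro.close()  # discard it explicitly so no "never awaited" warning is emitted

print(function_is_awaitable, coroutine_is_awaitable)
```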

Async Context Manager


In [9]:
class AsyncContextManager:
    async def __aenter__(self):
        await aprint('entering context')

    async def __aexit__(self, exc_type, exc, tb):
        await aprint('exiting context')

async def use_async_context():
    async with AsyncContextManager():
        print('Hello World!')
        
loop.run_until_complete(use_async_context())


entering context
Hello World!
exiting context
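On Python 3.7 and later, the same behaviour can be written without a class by using contextlib.asynccontextmanager. This is a sketch of an alternative, not part of the original notebook; demo_context, use_demo_context, events and demo_loop are hypothetical names.

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def demo_context():
    await asyncio.sleep(0.01)
    events.append('entering context')    # plays the role of __aenter__
    try:
        yield
    finally:
        await asyncio.sleep(0.01)
        events.append('exiting context')  # plays the role of __aexit__

async def use_demo_context():
    async with demo_context():
        events.append('Hello World!')

demo_loop = asyncio.new_event_loop()
demo_loop.run_until_complete(use_demo_context())
demo_loop.close()
print(events)
```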

One practical example is a lock: acquiring it may have to wait, even though releasing it does not require an asynchronous exit.


In [10]:
lock = asyncio.Lock()

async def use_lock():
    async with lock:
        await asyncio.sleep(1)
        print('much lock, such concurrency')
        
loop.run_until_complete(asyncio.gather(use_lock(), use_lock()))


much lock, such concurrency
much lock, such concurrency
Out[10]:
[None, None]
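To make the effect of the lock visible, the sketch below records when each task enters and leaves the locked section. The names worker, lock_events and demo_loop are assumptions of this example. Because only one task can hold the lock at a time, the two critical sections do not interleave even though both tasks are started together.

```python
import asyncio

lock_events = []

async def worker(name, lock):
    async with lock:
        lock_events.append(name + ' acquired')
        await asyncio.sleep(0.01)  # the other task has to wait here
        lock_events.append(name + ' released')

async def main():
    lock = asyncio.Lock()  # created while the loop is running
    await asyncio.gather(worker('A', lock), worker('B', lock))

demo_loop = asyncio.new_event_loop()
demo_loop.run_until_complete(main())
demo_loop.close()
print(lock_events)
```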

Async for-loop

Prepare a simple MongoDB collection to show this feature.


In [11]:
from motor.motor_asyncio import AsyncIOMotorClient

collection = AsyncIOMotorClient().aiotest.test
loop.run_until_complete(collection.insert({'value': i} for i in range(10)))


Out[11]:
[ObjectId('5650a01a03389c1686e84c6b'),
 ObjectId('5650a01a03389c1686e84c6c'),
 ObjectId('5650a01a03389c1686e84c6d'),
 ObjectId('5650a01a03389c1686e84c6e'),
 ObjectId('5650a01a03389c1686e84c6f'),
 ObjectId('5650a01a03389c1686e84c70'),
 ObjectId('5650a01a03389c1686e84c71'),
 ObjectId('5650a01a03389c1686e84c72'),
 ObjectId('5650a01a03389c1686e84c73'),
 ObjectId('5650a01a03389c1686e84c74')]

The async for-loop saves us the boilerplate code needed to await each next value. Note that it runs sequentially (i.e., the elements are fetched one after another).


In [12]:
async def f():
    async for doc in collection.find():
        print(doc)
        
loop.run_until_complete(f())


{'_id': ObjectId('5650882603389c04487a0f8e'), 'value': 0}
{'_id': ObjectId('5650882603389c04487a0f8f'), 'value': 1}
{'_id': ObjectId('5650882603389c04487a0f90'), 'value': 2}
{'_id': ObjectId('5650882603389c04487a0f91'), 'value': 3}
{'_id': ObjectId('5650882603389c04487a0f92'), 'value': 4}
{'_id': ObjectId('5650882603389c04487a0f93'), 'value': 5}
{'_id': ObjectId('5650882603389c04487a0f94'), 'value': 6}
{'_id': ObjectId('5650882603389c04487a0f95'), 'value': 7}
{'_id': ObjectId('5650882603389c04487a0f96'), 'value': 8}
{'_id': ObjectId('5650882603389c04487a0f97'), 'value': 9}
{'_id': ObjectId('5650a01a03389c1686e84c6b'), 'value': 0}
{'_id': ObjectId('5650a01a03389c1686e84c6c'), 'value': 1}
{'_id': ObjectId('5650a01a03389c1686e84c6d'), 'value': 2}
{'_id': ObjectId('5650a01a03389c1686e84c6e'), 'value': 3}
{'_id': ObjectId('5650a01a03389c1686e84c6f'), 'value': 4}
{'_id': ObjectId('5650a01a03389c1686e84c70'), 'value': 5}
{'_id': ObjectId('5650a01a03389c1686e84c71'), 'value': 6}
{'_id': ObjectId('5650a01a03389c1686e84c72'), 'value': 7}
{'_id': ObjectId('5650a01a03389c1686e84c73'), 'value': 8}
{'_id': ObjectId('5650a01a03389c1686e84c74'), 'value': 9}

In [13]:
loop.run_until_complete(collection.drop())
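The async for-loop can also be tried without a database. Any object that implements __aiter__ and an awaitable __anext__ will do. The Countdown class below is a hypothetical stand-in for the Motor cursor, sketched only to show the protocol:

```python
import asyncio

class Countdown:
    """Hypothetical async iterator yielding n-1, ..., 0 with a short pause before each."""

    def __init__(self, n):
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n <= 0:
            raise StopAsyncIteration  # ends the async for-loop
        await asyncio.sleep(0.01)     # each value is awaited in turn
        self.n -= 1
        return self.n

async def collect():
    seen = []
    async for value in Countdown(3):
        seen.append(value)
    return seen

demo_loop = asyncio.new_event_loop()
seen = demo_loop.run_until_complete(collect())
demo_loop.close()
print(seen)
```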

Appendix

Futures

Futures are awaitable as well.


In [14]:
import collections.abc
isinstance(asyncio.Future(), collections.abc.Awaitable)


Out[14]:
True
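In practice this means a coroutine can wait on a future that is resolved elsewhere, for example by a scheduled callback. A minimal sketch, with wait_for_future, running_loop and demo_loop as made-up names:

```python
import asyncio

async def wait_for_future():
    running_loop = asyncio.get_event_loop()  # the loop that runs this coroutine
    fut = running_loop.create_future()
    # resolve the future a little later from a plain callback
    running_loop.call_later(0.01, fut.set_result, 'resolved')
    return await fut  # suspends until set_result is called

demo_loop = asyncio.new_event_loop()
outcome = demo_loop.run_until_complete(wait_for_future())
demo_loop.close()
print(outcome)
```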

Confusion with Generators

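Generator exceptions do not confuse coroutines: a StopIteration that escapes inside a coroutine is converted into a RuntimeError instead of silently ending it.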


In [15]:
async def unconfused():
    g = iter(range(1))
    next(g)
    next(g)
    await asyncio.sleep(1)
    print('done!')

loop.run_until_complete(unconfused())


---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-15-d57b92647d06> in unconfused()
      3     next(g)
----> 4     next(g)
      5     await asyncio.sleep(1)

StopIteration: 

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
<ipython-input-15-d57b92647d06> in <module>()
      6     print('done!')
      7 
----> 8 loop.run_until_complete(unconfused())

/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py in run_until_complete(self, future)
    340             raise RuntimeError('Event loop stopped before Future completed.')
    341 
--> 342         return future.result()
    343 
    344     def stop(self):

/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py in result(self)
    272             self._tb_logger = None
    273         if self._exception is not None:
--> 274             raise self._exception
    275         return self._result
    276 

/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py in _step(***failed resolving arguments***)
    237                 result = coro.throw(exc)
    238             else:
--> 239                 result = coro.send(value)
    240         except StopIteration as exc:
    241             self.set_result(exc.value)

RuntimeError: coroutine raised StopIteration
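The original StopIteration is not lost; it is attached to the RuntimeError as its cause. A short sketch that catches the error and inspects it (leaky, message, cause_name and demo_loop are names assumed for this example):

```python
import asyncio

async def leaky():
    g = iter(range(1))
    next(g)
    next(g)  # raises StopIteration inside the coroutine body

demo_loop = asyncio.new_event_loop()
try:
    demo_loop.run_until_complete(leaky())
except RuntimeError as exc:
    message = str(exc)
    cause_name = type(exc.__cause__).__name__
finally:
    demo_loop.close()

print(message, '/ caused by', cause_name)
```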

Generator-based coroutines, on the other hand, could be confused if the optional decorator is not used.


In [16]:
# @asyncio.coroutine
def confused():
    g = iter(range(1))
    next(g)
    next(g)
    yield from asyncio.sleep(1)
    print('done!')

loop.run_until_complete(confused())
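Note that the cell above depends on the Python version. Generator-based coroutines and asyncio.coroutine were removed in Python 3.11, and since Python 3.7 the PEP 479 behaviour applies to every generator, not only to coroutines. The sketch below assumes Python 3.7 or later and drives a plain generator by hand to show what happens to the stray StopIteration there; confused_gen and outcome are hypothetical names.

```python
def confused_gen():
    g = iter(range(1))
    next(g)
    next(g)  # raises StopIteration inside the generator body
    yield

try:
    next(confused_gen())
except RuntimeError as exc:
    # Python 3.7+: the StopIteration is converted, so the bug is visible
    outcome = 'RuntimeError: ' + str(exc)
except StopIteration:
    # older interpreters: the generator simply appears to be exhausted
    outcome = 'StopIteration'

print(outcome)
```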