Futures

A future is an object that represents an asynchronous computation. Just as a synchronous function returns a result, an asynchronous function may return a future. Rather than blocking when called, the function can return the future immediately.

future = asynchronous_function_call()
print('Started async computation')  # runs immediately after function call
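The standard library's concurrent.futures module follows this pattern: ThreadPoolExecutor.submit() hands a call to a worker thread and returns a Future at once. Here is a minimal sketch. slow_square() and the release event are illustrative stand-ins that make the timing deterministic.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()  # hypothetical gate so the demo's timing is deterministic

def slow_square(x):
    release.wait()  # stand-in for a slow computation: block until released
    return x * x

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(slow_square, 4)  # returns immediately with a Future
done_before = future.done()               # False: the worker is still blocked
print('Started async computation, done =', done_before)
release.set()                             # let the computation finish
print(future.result())                    # blocks until the worker returns 16
executor.shutdown()
```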

The future can then be passed around the code like a normal object, until some piece of code needs to block on the result.

real_value = future.get_result()  # Returns immediately if future is already done, otherwise blocks.
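In concurrent.futures, Future.result() plays the role of get_result(). It also accepts an optional timeout, which lets us observe the blocking behavior without hanging. In this sketch, work() and release are hypothetical.

```python
import concurrent.futures
import threading

release = threading.Event()

def work():
    release.wait()  # hypothetical slow computation
    return 'finished'

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
future = executor.submit(work)

try:
    future.result(timeout=0.05)  # not done yet: blocks for at most 50 ms
    timed_out = False
except concurrent.futures.TimeoutError:
    timed_out = True
print('timed out while still running:', timed_out)

release.set()
first = future.result()   # blocks until work() returns
second = future.result()  # already done: returns immediately with the same value
print(first, second)
executor.shutdown()
```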

Often, it is possible to derive a new future from an existing future, again without having to immediately block. Instead, a callback determines how the new future derives its value from the old future.

new_future = future.map_result(lambda real_value: (2 * real_value))
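concurrent.futures.Future has no built-in map_result(), but one can be sketched on top of add_done_callback(). The map_result() below is a hypothetical helper, not a library method.

```python
from concurrent.futures import Future

def map_result(future, fn):
    """Hypothetical helper: a new Future whose value is fn(value of `future`)."""
    new_future = Future()

    def on_done(old):
        try:
            new_future.set_result(fn(old.result()))
        except Exception as exc:  # an error in `future` or in fn fails new_future too
            new_future.set_exception(exc)

    future.add_done_callback(on_done)
    return new_future

future = Future()
new_future = map_result(future, lambda real_value: 2 * real_value)
pending_before = new_future.done()  # False: nothing computed, nothing blocked
future.set_result(10)
print(pending_before, new_future.result())  # False 20
```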

Like an event loop task, a future can have callbacks that get called when the result is available.

future.register_callback(lambda real_value: print("Future completed with result", real_value))
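In concurrent.futures (and in asyncio), the registration method is add_done_callback(), and the callback receives the future itself rather than the bare value. Here is a small sketch.

```python
from concurrent.futures import Future

seen = []
future = Future()

# The callback receives the future itself and reads the value with result().
future.add_done_callback(lambda f: seen.append(("Future completed with result", f.result())))
future.set_result(10)  # runs the callback in this thread, during set_result()
print(seen)  # [('Future completed with result', 10)]

# Registering a callback on a future that is already done calls it immediately.
future.add_done_callback(lambda f: seen.append(("late callback", f.result())))
print(seen[-1])  # ('late callback', 10)
```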

Finally, the code that owns the computation sets the future's result whenever the computation is done.

future.set_result(10)
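With a concurrent.futures.Future created directly, the owner can be a separate thread. This sketch assumes Python 3.8 or later, which is when InvalidStateError was added; owner() is illustrative.

```python
import threading
from concurrent.futures import Future, InvalidStateError  # InvalidStateError needs Python 3.8+

future = Future()

def owner():
    # This thread owns the computation and completes the future when it is done.
    future.set_result(10)

worker = threading.Thread(target=owner)
worker.start()
print(future.result())  # blocks until owner() has called set_result: 10
worker.join()

try:
    future.set_result(11)  # a future can be completed only once
    completed_twice = True
except InvalidStateError:
    completed_twice = False
print(completed_twice)  # False
```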

In a threaded program, asynchronous_function_call() might be a function that sends a computation to another thread and creates a future object that is shared by both threads. The future contains a threading.Condition that is notified when the result is set, and calling future.get_result() waits on that Condition until the result is available. For examples of this kind of future, see concurrent.futures and https://pypi.python.org/pypi/aplus.
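Here is a minimal sketch of such a threaded future, built on threading.Condition. ThreadFuture and this asynchronous_function_call() are illustrative, not the concurrent.futures implementation.

```python
import threading

class ThreadFuture:
    """Illustrative thread-safe future built on threading.Condition."""

    def __init__(self):
        self._cond = threading.Condition()
        self._done = False
        self._result = None

    def set_result(self, value):
        with self._cond:
            if self._done:
                raise RuntimeError("result already set")
            self._result = value
            self._done = True
            self._cond.notify_all()  # wake every thread blocked in get_result()

    def get_result(self, timeout=None):
        with self._cond:
            # wait_for() re-checks the predicate, so spurious wakeups are harmless
            if not self._cond.wait_for(lambda: self._done, timeout):
                raise TimeoutError("result not ready")
            return self._result

def asynchronous_function_call(x):
    """Hypothetical: run x * x on another thread, return the shared future."""
    fut = ThreadFuture()
    threading.Thread(target=lambda: fut.set_result(x * x)).start()
    return fut

future = asynchronous_function_call(7)
print('Started async computation')
print(future.get_result())  # blocks until the other thread sets 49
```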

Event Loop Futures

Event loop futures are very similar to threading futures. The main difference is that the asynchronous computation is happening on the same thread, in the same event loop.

This means that get_result() can be a little smarter. Instead of putting the thread to sleep (which would be disastrous for the event loop), the task gets unscheduled from the event loop. At the same time, a new callback is registered with the future. This callback will reschedule the task with the event loop. Thus, when the future completes, the callback will be called, the task will be rescheduled, and eventually the task will be resumed, and will be able to use the result of the future.
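The following sketch shows that waiting does not block the thread. It uses current asyncio, where `await fut` plays the role of get_result(). While waiter() is suspended on the future, the loop keeps running ticker(). Both coroutines are hypothetical.

```python
import asyncio

async def waiter(fut, log):
    log.append("waiter: suspending on the future")
    value = await fut              # suspends this task only, not the thread
    log.append(f"waiter: resumed with {value}")
    return value

async def ticker(log, n):
    for i in range(n):
        log.append(f"ticker: tick {i}")
        await asyncio.sleep(0)     # yield to the event loop once per tick

async def main():
    log = []
    fut = asyncio.get_running_loop().create_future()
    waiting = asyncio.create_task(waiter(fut, log))
    await asyncio.create_task(ticker(log, 3))  # ticks run while waiter is suspended
    fut.set_result(10)             # schedules the waiter's wakeup callback
    return await waiting, log

result, log = asyncio.run(main())
print(result)  # 10
print(log)
```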

And instead of the future being completed by another thread, it will be completed by another task on the event loop.
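The following sketch shows one task completing a future that another task is waiting on. It again uses current asyncio; producer() and consumer() are illustrative.

```python
import asyncio

async def producer(fut):
    await asyncio.sleep(0)     # pretend to do some work first
    fut.set_result(2 * 21)     # another task on the same loop completes the future

async def consumer(fut):
    return await fut           # suspended until producer() sets the result

async def main():
    fut = asyncio.get_running_loop().create_future()
    consumed, _ = await asyncio.gather(consumer(fut), producer(fut))
    return consumed

result = asyncio.run(main())
print(result)  # 42
```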

asyncio.Future

So far we've been discussing event loops in general terms, and deriving them from first principles. At this point, it makes sense to start focusing on the particulars of the asyncio interface. This certainly isn't the only way to do event loop futures, but it is the way that was chosen for the standard library implementation.

When a task calls get_result(), it wants to suspend itself if necessary, but it eventually wants the function to return with a value. What do we know of that can do that? That's right, a generator! And what kind of expression can extract the return value of a generator? yield from!

Thus, asyncio.Future takes the approach that a task waits for a future to complete by doing

real_value = yield from future
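The generator behavior this relies on can be seen without any event loop. In the following sketch, inner() and outer() are illustrative names. outer() receives inner()'s return value from yield from, while inner()'s yields pass straight through to whatever is driving outer().

```python
def inner():
    received = yield "suspend here"  # hand control back to whoever drives the generator
    return received * 2              # becomes the value of `yield from inner()`

def outer(log):
    value = yield from inner()       # forwards yields and sends, then receives the return value
    log.append(value)

log = []
gen = outer(log)
first = next(gen)   # 'suspend here', yielded by inner() and passed through outer()
try:
    gen.send(21)    # resumes inner() with 21; it returns 42 to outer()
except StopIteration:
    pass            # outer() has finished
print(first, log)   # suspend here [42]
```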

To do this, asyncio.Future needs to be an iterable that yields control back toward the event loop, but in the end returns the final result. Yielding needs to somehow signal how to set up a relationship between the future and this task. It does so by yielding itself. The task driving the coroutine (asyncio.Task, running on the event loop) then registers a callback on the future, such that when the future is completed, the task will be rescheduled with the event loop. Finally, the future is written such that when it is finished, all of its callbacks are scheduled with the event loop.
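The whole handshake fits in a toy model. This is a simplified sketch, not asyncio's actual code. ToyLoop, ToyFuture, and ToyTask are hypothetical classes, and ToyTask plays the part of asyncio.Task.

```python
from collections import deque

class ToyLoop:
    """Hypothetical minimal event loop: just a FIFO queue of callbacks."""
    def __init__(self):
        self._ready = deque()

    def call_soon(self, fn, *args):
        self._ready.append((fn, args))

    def run_until_idle(self):
        while self._ready:
            fn, args = self._ready.popleft()
            fn(*args)

class ToyFuture:
    def __init__(self, loop):
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        if self._done:
            self._loop.call_soon(fn, self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result, self._done = result, True
        callbacks, self._callbacks = self._callbacks, []
        for fn in callbacks:           # schedule the callbacks rather than calling them
            self._loop.call_soon(fn, self)

    def __iter__(self):
        if not self._done:
            yield self                 # "suspend me until this future is done"
        return self._result            # becomes the value of `yield from future`

class ToyTask:
    """Drives a generator; when it yields a ToyFuture, waits for that future."""
    def __init__(self, gen, loop):
        self._gen, self._loop = gen, loop
        loop.call_soon(self._step)

    def _step(self, _finished_future=None):
        try:
            yielded = self._gen.send(None)
        except StopIteration:
            return                      # the generator finished
        if isinstance(yielded, ToyFuture):
            # Reschedule this task once the future completes.
            yielded.add_done_callback(self._step)
        else:
            self._loop.call_soon(self._step)  # bare yield: just run again soon

loop = ToyLoop()
log = []

def waiter(fut):
    log.append("waiting")
    value = yield from fut
    log.append(f"got {value}")

fut = ToyFuture(loop)
ToyTask(waiter(fut), loop)
loop.call_soon(fut.set_result, 10)
loop.run_until_idle()
print(log)  # ['waiting', 'got 10']
```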


import asyncio
from asyncio import base_futures, test_utils  # Python 3.6 internals: test_utils and base_futures.InvalidStateError are gone in later versions
import traceback

# Code sample from <https://github.com/python/cpython/tree/v3.6.1/Lib/asyncio>  
# Copyright (c) 2001-2017 Python Software Foundation.  
# All Rights Reserved.  
# License: Python license, <https://www.python.org/3.6/license.html>  
# Some modifications made so as to only highlight the interesting parts.
class Future:
    _done = False
    _result = None
    _loop = None
    
    # This field is used for a dual purpose:
    # - Its presence is a marker to declare that a class implements
    #   the Future protocol (i.e. is intended to be duck-type compatible).
    #   The value must also be not-None, to enable a subclass to declare
    #   that it is not compatible by setting this to None.
    # - It is set by __iter__() below so that Task._step() can tell
    #   the difference between `yield from Future()` (correct) vs.
    #   `yield Future()` (incorrect).
    _asyncio_future_blocking = False

    def __init__(self, *, loop):
        super().__init__()
        self._loop = loop
        self._callbacks = []
        
    def __repr__(self):
        info = [f"done={self._done!r}"]
        if self._done:
            info.append('result={}'.format(self._result))
        if self._callbacks:
            info.append(base_futures._format_callbacks(self._callbacks))
        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
    
    def _schedule_callbacks(self):
        future_repr = repr(self)
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback in callbacks:
            print('Scheduling callback', callback, 'for', future_repr)
            self._loop.call_soon(callback, self)
            
    def done(self):
        return self._done
    
    def result(self):
        if not self._done:
            raise base_futures.InvalidStateError('Result is not ready.')
        return self._result
    
    def add_done_callback(self, fn):
        if self._done:
            print('Add done callback', fn, 'but', self, 'is already done, so calling soon')
            self._loop.call_soon(fn, self)
        else:
            self._callbacks.append(fn)
            print('Add done callback', fn, 'to', self)

    def remove_done_callback(self, fn):
        filtered_callbacks = [f for f in self._callbacks if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count
    
    def set_result(self, result):
        if self._done:
            raise base_futures.InvalidStateError('{!r}'.format(self))
        self._result = result
        self._done = True
        print('Set result', result, 'for', self)
        self._schedule_callbacks()
        
    def __iter__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()


loop = asyncio.new_event_loop()
    
def good_task(*, loop):
    print('Starting good task')
    future = Future(loop=loop)
    print(future)
    loop.call_soon(future.set_result, 10)
    print('Waiting on future')
    # yield from calls Future.__iter__ itself, so `future` stays bound to the Future
    result = (yield from future)
    print("Task resumed from waiting on", future)
    print("Result:", result)
    result2 = (yield from future)  # future is already done, so __iter__ returns at once
    print("Didn't ever yield")
    print("Result:", result2)
    print("Yielding")
    yield
    print("Resumed")
    
asyncio.ensure_future(good_task(loop=loop), loop=loop)
for _ in range(100):
    test_utils.run_once(loop=loop)
    
def bad_task(*, loop):
    future = Future(loop=loop)
    try:
        result = (yield future)
    except RuntimeError:
        traceback.print_exc()

print()
asyncio.ensure_future(bad_task(loop=loop), loop=loop)
for _ in range(100):
    test_utils.run_once(loop=loop)


Starting good task
<Future done=False>
Waiting on future
Add done callback <TaskWakeupMethWrapper object at 0x10a8b93a8> to <Future done=False cb=[<TaskWakeupMethWrapper object at 0x10a8b93a8>()]>
Set result 10 for <Future done=True result=10 cb=[<TaskWakeupMethWrapper object at 0x10a8b93a8>()]>
Scheduling callback <TaskWakeupMethWrapper object at 0x10a8b93a8> for <Future done=True result=10 cb=[<TaskWakeupMethWrapper object at 0x10a8b93a8>()]>
Task resumed from waiting on <Future done=True result=10>
Result: 10
Didn't ever yield
Result: 10
Yielding
Resumed

Traceback (most recent call last):
  File "<ipython-input-1-2819b86a00b7>", line 114, in bad_task
    result = (yield future)
RuntimeError: yield was used instead of yield from in task <Task pending coro=<bad_task() running at <ipython-input-1-2819b86a00b7>:114>> with <Future done=False>

This snippet shows the basic mechanics of asyncio.Future. It keeps remove_done_callback(), which un-registers a callback when the event loop or another task determines that the callback is no longer needed. For simplicity, I left out some other functionality:

  • A future can also be completed with the set_exception() method, which causes that exception to be raised when result() is called, and therefore inside any task waiting on the future.
  • A future can be cancelled. This causes any tasks waiting on the future to also be cancelled.
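The first two items can be tried with the real asyncio.Future in current Python, where `await` takes the place of yield from. The following is a sketch; wait_on() is an illustrative helper.

```python
import asyncio

async def wait_on(fut):
    try:
        return await fut
    except ValueError as exc:          # raised here by set_exception()
        return f"task saw {exc!r}"

async def main():
    loop = asyncio.get_running_loop()

    # set_exception(): the waiting task gets the exception raised at its await.
    failing = loop.create_future()
    t1 = asyncio.create_task(wait_on(failing))
    await asyncio.sleep(0)             # let t1 start waiting on the future
    failing.set_exception(ValueError("boom"))
    message = await t1

    # cancel(): the waiting task gets CancelledError and ends up cancelled too.
    doomed = loop.create_future()
    t2 = asyncio.create_task(wait_on(doomed))
    await asyncio.sleep(0)
    doomed.cancel()
    try:
        await t2
    except asyncio.CancelledError:
        pass
    return message, t2.cancelled()

message, t2_cancelled = asyncio.run(main())
print(message)        # task saw ValueError('boom')
print(t2_cancelled)   # True
```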

Notice that asyncio.Future is not protected by any threading locks. This is one of the advantages of using an event loop instead of threads. Since everything runs on the same thread, you don't get any surprise context switches: every evaluation step is guaranteed to execute immediately after the previous one, unless the previous step is a yield or yield from. This drastically reduces the number of ways in which your program can execute, and makes it much easier to reason about.
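The following sketch shows what "unless the previous step is a yield" means in practice. A read-modify-write with no suspension point in the middle never loses updates, while one that awaits in between does. Counter, increment(), and run() are illustrative, and current asyncio is assumed.

```python
import asyncio

class Counter:
    def __init__(self):
        self.value = 0

async def increment(counter, yield_in_between):
    current = counter.value          # read
    if yield_in_between:
        await asyncio.sleep(0)       # suspension point: every other task may run here
    counter.value = current + 1      # write (possibly a stale value)

async def run(n, yield_in_between):
    counter = Counter()
    await asyncio.gather(*(increment(counter, yield_in_between) for _ in range(n)))
    return counter.value

safe = asyncio.run(run(100, yield_in_between=False))
lossy = asyncio.run(run(100, yield_in_between=True))
print(safe)   # 100: each read-modify-write runs uninterrupted
print(lossy)  # 1: every task read 0 before any task wrote
```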