A future is an object that represents an asynchronous computation. Just as a synchronous function returns a result, an asynchronous function may return a future. Rather than blocking when called, the function can return the future immediately.
future = asynchronous_function_call()
print('Started async computation') # runs immediately after function call
The future can then be passed around the code like a normal object, until some piece of code blocks on its result.
real_value = future.get_result() # Returns immediately if future is already done, otherwise blocks.
Often, it is possible to derive a new future from an existing future, again without having to immediately block. Instead, a callback determines how the new future derives its value from the old future.
new_future = future.map_result(lambda real_value: (2 * real_value))
Like an event loop task, a future can have callbacks that get called when the result is available.
future.register_callback(lambda real_value: print("Future completed with result", real_value))
Finally, the code that performs the computation can set the future's result whenever the computation is done.
future.set_result(10)
In a threaded program, asynchronous_function_call() might be a function that sends a computation to another thread and creates a future object shared by both threads. The future contains a threading.Condition that gets notified when the result is set, and calling future.get_result() waits on that Condition. For examples of this kind of future, see concurrent.futures and https://pypi.python.org/pypi/aplus.
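A minimal sketch of such a threaded future might look like this (my own illustration of the idea using threading.Condition; the ThreadedFuture class and asynchronous_function_call here are invented for the example, not the concurrent.futures implementation):

```python
import threading

class ThreadedFuture:
    """Illustrative future shared between threads (not concurrent.futures)."""
    def __init__(self):
        self._condition = threading.Condition()
        self._done = False
        self._result = None

    def set_result(self, result):
        with self._condition:
            self._result = result
            self._done = True
            self._condition.notify_all()  # wake any thread blocked in get_result()

    def get_result(self):
        with self._condition:
            # Returns immediately if already done, otherwise blocks.
            while not self._done:
                self._condition.wait()
            return self._result

def asynchronous_function_call():
    future = ThreadedFuture()
    def work():
        future.set_result(10)  # the computation, done on another thread
    threading.Thread(target=work).start()
    return future  # returned immediately, possibly before work() finishes

future = asynchronous_function_call()
print('Started async computation')
print(future.get_result())  # prints 10
```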
Event loop futures are very similar to threading futures. The main difference is that the asynchronous computation is happening on the same thread, in the same event loop.
This means that get_result() can be a little smarter. Instead of putting the thread to sleep (which would be disastrous for the event loop), the task is unscheduled from the event loop, and a new callback is registered with the future: one that reschedules the task. Thus, when the future completes, the callback is called, the task is rescheduled, and eventually the task resumes and can use the result of the future. And instead of the future being completed by another thread, it is completed by another task on the event loop.
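To make the unschedule/reschedule dance concrete, here is a toy sketch. ToyFuture, the deque-based run queue, and the helper functions are all invented for illustration; real asyncio also schedules callbacks through call_soon rather than calling them synchronously:

```python
from collections import deque

class ToyFuture:
    """Invented for illustration; not asyncio's Future."""
    def __init__(self):
        self.done = False
        self.result = None
        self.callbacks = []  # called when the result is set

    def set_result(self, value):
        self.result = value
        self.done = True
        for callback in self.callbacks:
            callback(self)

ready = deque()  # the event loop's queue of runnable tasks

def step(task):
    """Run a task (a generator) until it yields a future or finishes."""
    try:
        future = next(task)
    except StopIteration:
        return  # the task finished
    # The task is now unscheduled; register a callback that reschedules
    # it when the future completes.
    future.callbacks.append(lambda _: ready.append(task))

def run():
    while ready:
        step(ready.popleft())

def waiter(future):
    print('Waiting on future')
    yield future  # suspend until the future completes
    print('Got result:', future.result)

f = ToyFuture()
ready.append(waiter(f))
run()             # the task yields the future and is unscheduled
f.set_result(10)  # completing the future reschedules the task
run()             # the task resumes and sees the result
```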
So far we've been discussing event loops in general terms, deriving them from first principles. At this point, it makes sense to start focusing on the particulars of the asyncio interface. This certainly isn't the only way to do event loop futures, but it is the way that was chosen for the standard library implementation.

When a task calls get_result(), it wants to suspend itself if necessary, but it eventually wants the call to return with a value. What do we know of that can do that? That's right, a generator! And what kind of expression can extract the return value of a generator? yield from!
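As a quick standalone refresher on that behavior of yield from (unrelated to asyncio; the generators here are invented for the example):

```python
def computation():
    # Stands in for a suspended async computation: yield once, then
    # produce a value.
    yield
    return 10  # delivered as the value of `yield from computation()`

def task():
    result = yield from computation()  # suspends, then extracts the return value
    return result * 2

t = task()
next(t)       # advance to the inner yield; the task is now suspended
try:
    next(t)   # resume: computation() returns 10, task() returns 20
except StopIteration as stop:
    value = stop.value
print(value)  # prints 20
```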
Thus, asyncio.Future went with the approach that a task can wait for a future to be complete by doing
real_value = yield from future
To do this, asyncio.Future needs to be an iterable, which yields to the event loop, but in the end returns the final result. Yielding needs to somehow signal to the event loop how to set up a relationship between the future and this task. It does so by yielding itself. The event loop then registers a callback on the future, such that when the future is completed, the task will be rescheduled with the event loop. Finally, the future is written such that when it is finished, all of its callbacks will be scheduled with the event loop.
import asyncio
from asyncio import base_futures, test_utils
import traceback
# Code sample from <https://github.com/python/cpython/tree/v3.6.1/Lib/asyncio>
# Copyright (c) 2001-2017 Python Software Foundation.
# All Rights Reserved.
# License: Python license, <https://www.python.org/3.6/license.html>
# Some modifications made so as to only highlight the interesting parts.
class Future:
    _done = False
    _result = None
    _loop = None

    # This field is used for a dual purpose:
    # - Its presence is a marker to declare that a class implements
    #   the Future protocol (i.e. is intended to be duck-type compatible).
    #   The value must also be not-None, to enable a subclass to declare
    #   that it is not compatible by setting this to None.
    # - It is set by __iter__() below so that Task._step() can tell
    #   the difference between `yield from Future()` (correct) vs.
    #   `yield Future()` (incorrect).
    _asyncio_future_blocking = False

    def __init__(self, *, loop):
        super().__init__()
        self._loop = loop
        self._callbacks = []

    def __repr__(self):
        info = [f"done={self._done!r}"]
        if self._done:
            info.append('result={}'.format(self._result))
        if self._callbacks:
            info.append(base_futures._format_callbacks(self._callbacks))
        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    def _schedule_callbacks(self):
        future_repr = repr(self)
        callbacks = self._callbacks[:]
        if not callbacks:
            return
        self._callbacks[:] = []
        for callback in callbacks:
            print('Scheduling callback', callback, 'for', future_repr)
            self._loop.call_soon(callback, self)

    def done(self):
        return self._done

    def result(self):
        if not self._done:
            raise base_futures.InvalidStateError('Result is not ready.')
        return self._result

    def add_done_callback(self, fn):
        if self._done:
            print('Add done callback', fn, 'but', self, 'is already done, so calling soon')
            self._loop.call_soon(fn, self)
        else:
            self._callbacks.append(fn)
            print('Add done callback', fn, 'to', self)

    def remove_done_callback(self, fn):
        filtered_callbacks = [f for f in self._callbacks if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count

    def set_result(self, result):
        if self._done:
            raise base_futures.InvalidStateError('{!r}'.format(self))
        self._result = result
        self._done = True
        print('Set result', result, 'for', self)
        self._schedule_callbacks()

    def __iter__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()

loop = asyncio.new_event_loop()

def good_task(*, loop):
    print('Starting good task')
    future = Future(loop=loop)
    print(future)
    loop.call_soon(future.set_result, 10)
    print('Waiting on future')
    result = (yield from future)
    print("Task resumed from waiting on", future)
    print("Result:", result)
    result2 = (yield from future)  # future is already done: returns immediately
    print("Didn't ever yield")
    print("Result:", result2)
    print("Yielding")
    yield
    print("Resumed")

asyncio.ensure_future(good_task(loop=loop), loop=loop)
for _ in range(100):
    test_utils.run_once(loop=loop)

def bad_task(*, loop):
    future = Future(loop=loop)
    try:
        result = (yield future)
    except RuntimeError:
        traceback.print_exc()
        print()

asyncio.ensure_future(bad_task(loop=loop), loop=loop)
for _ in range(100):
    test_utils.run_once(loop=loop)
This snippet shows the basic mechanics of asyncio.Future. For simplicity, I left out some of its other functionality, such as the set_exception() method, which causes the given exception to be raised inside the task when result() is called.

Notice that asyncio.Future is not protected by any threading locks. This is one of the advantages of using an event loop instead of threads. Since everything runs on the same thread, you don't get any surprise context switches. Every evaluation step is guaranteed to execute immediately after the previous evaluation step, unless the previous step is a yield or yield from. This drastically reduces the number of ways in which your program can execute, and makes it much easier to reason about.
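For example, with tasks (unlike threads) a read-modify-write on shared state needs no lock, as long as no yield point falls between the read and the write. A small sketch of this, using the modern async/await spelling of the same idea:

```python
import asyncio

counter = 0

async def bump(n):
    global counter
    for _ in range(n):
        # No lock needed: `counter += 1` cannot be interleaved with another
        # task, because tasks only switch at an await (or yield) point.
        counter += 1
        await asyncio.sleep(0)  # voluntarily yield to the event loop

async def main():
    # Two tasks mutate the same variable concurrently, yet the final
    # count is always exact -- there are no surprise context switches.
    await asyncio.gather(bump(1000), bump(1000))

asyncio.run(main())
print(counter)  # prints 2000
```

The equivalent threaded program would need a threading.Lock around the increment to guarantee the same result.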