In the previous chapter, we stated that one use for a value-returning generator is a long-running computation that can both return a final value and suspend itself in the middle of its work, in order to give other computations a chance to run.
Interleaving computations can also be accomplished by spawning threads. But performing concurrent tasks in a single thread has a number of advantages:
- Event loops (such as asyncio) are able to achieve parallelism when tasks yield due to an I/O operation. So even with I/O-bound tasks, event loops can perform just as well as (or better than) threads.
- Tasks can only be suspended at well-defined points (at a yield or yield from, but nowhere else), and developers can easily see this when reading the code.

In a single thread, multiple tasks can be coordinated using an event loop. An event loop constantly loops (usually forever), and runs a single step of a single task on each pass through the loop, as long as there are any tasks that are ready to run.
The simplest, but least useful, type of "event loop" (if it can even be called that) would just loop over a single generator, running it until completion and then returning its result. This function turns a suspendable function into a normal, blocking function.
In [1]:
def run_until_complete(generator):
    # Step the generator until it finishes, then return its final value.
    try:
        while True:
            next(generator)
    except StopIteration as exc:
        return exc.value

def generator():
    for _ in range(1000):
        yield 'suspend'
    return 'result_of_generator'

g = generator()
print(g)
for _ in range(4):
    print(next(g))
print('etc.')
print(run_until_complete(g))
In the next example, we define a generator-based factorial() function. It does the computation iteratively, without any recursion. Each loop iteration performs one multiplication before yielding.
We also define a step() procedure (which simply calls __next__() on its input), as well as a rudimentary event loop, run(). The event loop accepts any number of arguments, each a tuple of (generator, callback to call with the final result of that generator). It must also be configured with a mode, either interleaved or non-interleaved.
The event loop runs through all of the generators. When a generator is completed, it gathers the return value, and passes it to its registered callback. When all generators have completed, run()
also completes.
When run in non-interleaved mode, the first generator is run uninterrupted until completion, then its callback is called. Then the same is done for the second generator, and so on.
When run in interleaved mode, each generator gets run for exactly one step, before being pushed to the back of the queue and then running a step on the next generator. When a generator is completed, its callback is called immediately, before the next generator is stepped.
In [2]:
from collections import OrderedDict

def factorial(N):
    if not isinstance(N, int):
        raise TypeError("factorial requires an int")
    if N < 0:
        raise ValueError("factorial requires a non-negative int")
    factorial = _factorial(N)
    factorial.__name__ = f"factorial({N})"
    return factorial

def _factorial(N):
    # A generator that computes N! iteratively, yielding the partial product
    # between multiplications.
    if 0 <= N <= 1:
        return 1
    if N == 2:
        return 2
    product = N * (N - 1)
    n = N - 2
    while n > 1:
        print("partial product for", f"{N}!", "is", product)
        yield product
        product *= n
        n -= 1
    return product

def step(generator):
    next(generator)

def run(*args, interleave):
    d = OrderedDict(args)
    rounds = 0
    mode = "interleaved" if interleave else "non-interleaved"
    print("Starting", mode, "computations.")
    while d:
        rounds += 1
        generator, on_complete = d.popitem(last=False)
        try:
            step(generator)
        except StopIteration as exc:
            # The generator finished; pass its return value to its callback.
            print("Completed", generator.__name__, "after", rounds, "rounds")
            on_complete(generator, exc.value)
        else:
            # Not finished yet: requeue it. In interleaved mode it goes to the
            # back of the queue; in non-interleaved mode it stays at the front.
            d[generator] = on_complete
            d.move_to_end(generator, last=interleave)

def print_result(factorial_generator, result):
    print(factorial_generator.__name__, '==', result)

run(
    (factorial(12), print_result),
    (factorial(8), print_result),
    (factorial(4), print_result),
    interleave=False,
)
print()
run(
    (factorial(12), print_result),
    (factorial(8), print_result),
    (factorial(4), print_result),
    interleave=True,
)
Both interleaved and non-interleaved mode have the same total runtime. However, the program may appear more responsive in one mode than the other, depending on the scheduled generators.

In non-interleaved mode, the number of rounds needed to complete a particular generator is T + R, where R is the number of rounds needed to complete that generator in isolation, and T is the total number of rounds needed to complete all of the generators that were scheduled before it. In interleaved mode, the number of rounds is at most (N * R) + R, where N is the number of other generators in the event loop (not counting itself). Let's consider how these behave under various conditions:
Interleaved mode isn't guaranteed to be better, but it can be expected to perform better on average. In this mode, if there are only a few generators, then short tasks will always complete quickly. The more generators there are in the run loop, the slower every generator will run, but each generator is penalized in proportion to its own size. Non-interleaved mode can sometimes perform better, but since it penalizes generators based on their position in the queue and on the sizes of the generators scheduled ahead of them, it can cause massive and unfair slowdowns.
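As a sanity check, here is a small sketch applying these formulas to the three generators scheduled above, assuming their step counts R are 10, 6, and 2 (which is what the non-interleaved output implies):

# Step counts (R values) for the three generators scheduled above.
R_values = [("factorial(12)", 10), ("factorial(8)", 6), ("factorial(4)", 2)]

# Non-interleaved: a generator completes after T + R rounds, where T is the
# total number of rounds used by the generators scheduled before it.
T = 0
for name, R in R_values:
    print(name, "completes after", T + R, "rounds (non-interleaved)")
    T += R

# Interleaved: a generator completes after at most (N * R) + R rounds, where
# N is the number of other generators in the loop.
N = len(R_values) - 1
for name, R in R_values:
    print(name, "completes after at most", (N * R) + R, "rounds (interleaved)")

The non-interleaved figures match the completion rounds exactly, while the interleaved figures are only upper bounds, since the formula assumes every other generator stays in the loop for the generator's entire run.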
In the example above, switching from non-interleaved to interleaved mode has the following effects:
- factorial(12) goes from completing after 10 rounds, to completing after 18
- factorial(8) goes from completing after 16 rounds, to completing after 14
- factorial(4) goes from completing after 18 rounds, to completing after 6

This isn't a realistic example of a use case for an event loop, but it does serve to demonstrate the very basics, and to demonstrate the value that can be gained by doing concurrent non-blocking operations instead of sequential blocking operations.
In the example above, notice how the run()
loop required a callback for every generator. Because the loop runs an arbitrary number of generators and could potentially run forever, it can't return the results that it computes. Thus, if the final values are meant to be used in some computation, then the computation must be specified in a callback, and the event loop must call the callback with the result.
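For example, using run() and factorial() from the previous cell, a hypothetical program that wants the sum of two factorials can't get it from run()'s return value; it has to accumulate the results in a callback (the accumulate() helper and the results list below are made up for this sketch):

results = []

def accumulate(factorial_generator, result):
    # Collect each final value as its generator completes; once both results
    # are in, do the follow-up computation here, inside the callback.
    results.append(result)
    if len(results) == 2:
        print("sum of factorials ==", sum(results))

run((factorial(4), accumulate), (factorial(5), accumulate), interleave=True)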
Our first example didn't quite use a realistic callback system: run() invoked each callback directly, as soon as its generator finished, rather than scheduling the callback as its own piece of work on the event loop.
We've also been informally using the word "task" synonymously with "generator" in a few places. Let's be clearer about that. A task is some suspendable computation that can trigger callbacks when it is done. It just so happens that many event loop implementations choose to use generators to represent tasks.
Our example also wasn't realistic in how it handled tasks, because it required the set of tasks to be fixed at call time.
Let's try a new example which fixes these issues.
In [3]:
from collections import deque
import inspect

class EventLoop(object):
    def __init__(self, *callbacks):
        callbacks = deque(callbacks)
        super().__init__()
        self.callbacks = callbacks

    def run_until_complete(self):
        print("Starting", self)
        while self.callbacks:
            try:
                callback, args = self.callbacks.popleft()
            except IndexError:
                break
            print("Running callback", callback, "with args", args)
            callback(*args)
        print("No more callbacks for", self, "so shutting down the event loop")

    def call_soon(self, callback, *args):
        print("Scheduling callback", callback, "with args", args)
        self.callbacks.append((callback, args))

    def schedule_tasks(self, *tasks):
        # A task is scheduled by queueing its step() method as a callback.
        for task in tasks:
            print("Scheduling task", task)
            self.callbacks.append((task.step, ()))

class Task(object):
    def __init__(self, generator, *, loop, callbacks=None):
        callbacks = callbacks or []
        super().__init__()
        self.generator = generator
        self.loop = loop
        self.callbacks = callbacks

    def step(self):
        print("Stepping", self)
        try:
            self._step_generator()
        except StopIteration as exc:
            # The generator finished: schedule its callbacks with the final value.
            print(self, "finished with value", exc.value)
            for callback in self.callbacks:
                self.loop.call_soon(callback, exc.value)
        else:
            # Not finished yet: reschedule this task for another step.
            self.loop.schedule_tasks(self)

    def _step_generator(self):
        next(self.generator)

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.generator.__name__!r}>"

def factorial(n, *, loop, product=1, N=None):
    if N is None:
        N = n
    factorial = _factorial(n, loop=loop, product=product, N=N)
    factorial.__name__ = f"factorial(n={n!r}, product={product!r}, N={N!r})"
    return factorial

def _factorial(n, *, loop, product, N):
    # Performs at most two multiplications, then returns the remaining work
    # as an (n, kwargs) tuple so the callback can schedule a new task for it.
    if 0 <= n <= 1:
        return product
    if n == 2:
        return 2 * product
    product *= n
    n -= 1
    yield product
    product *= n
    n -= 1
    return (n, dict(loop=loop, product=product, N=N))

def factorial_callback(result):
    if isinstance(result, tuple):
        # There is more work to do: spawn a new task to continue the computation.
        n, kwargs = result
        loop.schedule_tasks(
            Task(factorial(n, **kwargs), loop=kwargs['loop'], callbacks=[factorial_callback])
        )
    else:
        print(result)

loop = EventLoop()
loop.schedule_tasks(
    Task(factorial(4, loop=loop), loop=loop, callbacks=[factorial_callback]),
    Task(factorial(3, loop=loop), loop=loop, callbacks=[factorial_callback]),
)
loop.run_until_complete()
This is a bit closer to a realistic event loop. It doesn't have most of the features of asyncio
or similar event loops, but it does cover the very basic requirements.
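For comparison, here is a minimal sketch of the same basic requirement in asyncio: its event loop exposes a call_soon() method that, like ours, queues a callback and its arguments to be run on a later pass through the loop. asyncio has no "run until the queue is empty" mode, so this sketch stops the loop explicitly:

import asyncio

loop = asyncio.new_event_loop()

# Queue two callbacks, each with an argument, just like our toy call_soon().
loop.call_soon(print, "first callback")
loop.call_soon(print, "second callback")

# Queue a final callback that stops the loop once the earlier ones have run.
loop.call_soon(loop.stop)
loop.run_forever()
loop.close()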
We can see more clearly now that the event loop requires commitment. Unless you're using run_until_complete() to block on a specific task and then exit the event loop, the event loop is most likely going to control execution for the rest of the program. Thus, all program logic needs to occur in generators/tasks and callbacks. These tasks will perform units of work, and may spawn other tasks and callbacks.
Furthermore, in order for the event loop to run well, you want each unit of work to end with a yield. Usually a task would yield after initiating some non-blocking I/O operation (more on this later), but it can also yield in the middle of a long computation, or while waiting for some other task to complete (which we'll see in the next chapter). Any task which hogs the event loop will lower the perceived performance of all other tasks. Thus, a call like requests.get(URL) is inappropriate in an event loop, because it does blocking I/O (moreover, it might make multiple HTTP calls if it has to deal with retries or redirects). Ideally, all I/O should use event-loop-aware variants.
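To make that concrete, here is a hypothetical sketch using the EventLoop and Task classes defined above; cooperative() and hog() are made-up generators, where the first yields after each unit of work and the second does all of its work in a single step, starving every other task until it finishes:

def cooperative(name, units):
    # Yields after each unit of work, so other tasks get a turn in between.
    for i in range(units):
        print(name, "finished unit", i)
        yield
    return name + " done"

def hog(name, units):
    # Does all of its work in one step; nothing else runs until it finishes.
    for i in range(units):
        print(name, "finished unit", i)
    yield
    return name + " done"

loop = EventLoop()
loop.schedule_tasks(
    Task(cooperative("cooperative task", 3), loop=loop, callbacks=[print]),
    Task(hog("hog task", 3), loop=loop, callbacks=[print]),
    Task(cooperative("another cooperative task", 3), loop=loop, callbacks=[print]),
)
loop.run_until_complete()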
License: Apache License, Version 2.0
Jordan Moldow, 2017
Copyright 2017 Jordan Moldow Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.