Interleaving computations

In the previous chapter, we stated that one use for a value-returning generator is to let a long-running computation both return a final value and suspend itself partway through, giving other computations a chance to run.

Interleaving computations can also be accomplished by spawning threads. But performing concurrent tasks in a single thread has a number of advantages:

  • Spawning threads usually has a higher overhead.
  • Spawning threads requires more coordination with the operating system.
  • The Python Global Interpreter Lock (GIL) prevents multiple threads from running pure Python code at the same time. Threads only run truly in parallel while they are blocked on I/O (or executing C code that releases the GIL). For multiple CPU-bound tasks, threads offer no more parallelism than an event loop does.
  • Well-written event loops (such as asyncio) can overlap I/O: when a task yields while waiting on an I/O operation, the loop runs other tasks in the meantime. So even with I/O-bound tasks, event loops can perform as well as (or better than) threads.
  • Generators inside an event loop control exactly when context switches can occur (at every yield or yield from, and nowhere else), and developers can easily see these points when reading the code.
    • This creates fewer opportunities for "thread"-unsafe operations to accidentally occur.
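The claim that context switches happen only at a yield can be checked with a small sketch. The names counter, safe_increment, unsafe_increment and total_after are hypothetical and are not part of this chapter. Two copies of a task increment a shared counter. An update is lost only when a yield sits between reading the counter and writing it back.

```python
counter = {"value": 0}  # shared state touched by both tasks

def safe_increment(times):
    for _ in range(times):
        counter["value"] += 1  # read and write happen within the same step
        yield                  # the only point where another task may run

def unsafe_increment(times):
    for _ in range(times):
        current = counter["value"]      # read...
        yield                           # ...let another task run...
        counter["value"] = current + 1  # ...then write a possibly stale value

def total_after(task_function, times=1000):
    """Run two copies of task_function in lockstep and return the counter."""
    counter["value"] = 0
    # zip() calls next() on each generator in turn and stops as soon as one
    # is exhausted: a crude, strictly alternating stand-in for an event loop.
    for _ in zip(task_function(times), task_function(times)):
        pass
    return counter["value"]

print(total_after(safe_increment))    # 2000: no update is lost
print(total_after(unsafe_increment))  # 1000: half of the updates are lost
```

The unsafe version is only unsafe because the author put the yield inside the read-modify-write. Anyone reading the code can see that.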

In a single thread, multiple tasks can be coordinated using an event loop. An event loop loops continuously (usually forever), running a single step of a single task on each pass through the loop, as long as any tasks are ready to run.
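A minimal sketch of that description, assuming each task is a plain generator and "ready to run" simply means "not finished yet". The event_loop and ticker names are hypothetical. The loop returns the yielded values in order so that the interleaving is visible.

```python
from collections import deque

def event_loop(*tasks):
    """Run generator-based tasks round-robin, one step per pass.

    Returns the values the tasks yielded, in the order they were produced.
    """
    ready = deque(tasks)  # tasks that can take another step
    trace = []
    while ready:          # a real loop would also wait for new tasks to arrive
        task = ready.popleft()
        try:
            trace.append(next(task))  # run exactly one step of one task
        except StopIteration:
            continue                  # finished tasks are simply dropped
        ready.append(task)            # unfinished tasks go to the back of the line
    return trace

def ticker(name, count):
    for i in range(count):
        yield f"{name}{i}"

print(event_loop(ticker("a", 3), ticker("b", 2)))
# ['a0', 'b0', 'a1', 'b1', 'a2']
```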

Simplest event loop - running a single task to completion and returning its result

The simplest, but least useful, type of "event loop" (if it can even be called that) would just loop over a single generator, running it until completion and then returning its result. This function turns a suspendable function into a normal, blocking function.


In [1]:
def run_until_complete(generator):
    try:
        while True:
            next(generator)
    except StopIteration as exc:
        return exc.value

def generator():
    for _ in range(1000):
        yield 'suspend'
    return 'result_of_generator'

g = generator()
print(g)
for _ in range(4):
    print(next(g))
print('etc.')
print(run_until_complete(g))


<generator object generator at 0x10aec1a98>
suspend
suspend
suspend
suspend
etc.
result_of_generator
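As described above, run_until_complete() turns a suspendable function into a normal, blocking function. That conversion can be packaged as a decorator. In this sketch, blocking and slow_sum are hypothetical names, and run_until_complete is repeated so that the snippet is self-contained.

```python
import functools

def run_until_complete(generator):
    # Drive the generator to the end and return its final value.
    try:
        while True:
            next(generator)
    except StopIteration as exc:
        return exc.value

def blocking(generator_function):
    """Hypothetical decorator: expose a generator function as an ordinary
    blocking function that returns the generator's final value."""
    @functools.wraps(generator_function)
    def wrapper(*args, **kwargs):
        return run_until_complete(generator_function(*args, **kwargs))
    return wrapper

@blocking
def slow_sum(numbers):
    total = 0
    for n in numbers:
        total += n
        yield  # a suspension point that the blocking wrapper simply steps over
    return total

print(slow_sum([1, 2, 3, 4]))  # 10
```

The caller never sees the suspension points. That is exactly why this "event loop" gives no other computation a chance to run.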

Interleaving computations with a rudimentary event loop

In this example, we define a generator-based factorial() function. It does the computation iteratively, without any recursion. Each step performs a single multiplication before yielding.

We also define a step() procedure (which simply calls next() on its input) and a rudimentary event loop, run(). run() accepts any number of positional arguments, each a tuple of (generator, callback to call with the generator's final result). It must also be given a mode through the keyword-only interleave argument: interleaved (True) or non-interleaved (False).

The event loop runs through all of the generators. When a generator is completed, it gathers the return value, and passes it to its registered callback. When all generators have completed, run() also completes.

When run in non-interleaved mode, the first generator is run uninterrupted until completion, then its callback is called. Then the same is done for the second generator, and so on.

When run in interleaved mode, each generator gets run for exactly one step, before being pushed to the back of the queue and then running a step on the next generator. When a generator is completed, its callback is called immediately, before the next generator is stepped.


In [2]:
from collections import OrderedDict

def factorial(N):
    if not isinstance(N, int):
        raise TypeError("factorial requires an int")
    if N < 0:
        raise ValueError("factorial requires a non-negative int")
    factorial = _factorial(N)
    factorial.__name__ = f"factorial({N})"
    return factorial 

def _factorial(N):
    if 0 <= N <= 1:
        return 1
    if N == 2:
        return 2
    product = N * (N - 1)
    n = N - 2
    while n > 1:
        print("partial product for", f"{N}!", "is", product)
        yield product
        product *= n
        n -= 1
    return product

def step(generator):
    next(generator)

def run(*args, interleave):
    d = OrderedDict(args)
    rounds = 0
    mode = "interleaved" if interleave else "non-interleaved"
    print("Starting", mode, "computations.")
    while d:
        rounds += 1
        generator, on_complete = d.popitem(last=False)
        try:
            step(generator)
        except StopIteration as exc:
            print("Completed", generator.__name__, "after", rounds, "rounds")
            on_complete(generator, exc.value)
        else:
            d[generator] = on_complete
            d.move_to_end(generator, last=interleave)

def print_result(factorial_generator, result):
    print(factorial_generator.__name__, '==', result)
            
run((factorial(12), print_result), (factorial(8), print_result), (factorial(4), print_result), interleave=False)          
print()
run((factorial(12), print_result), (factorial(8), print_result), (factorial(4), print_result), interleave=True)


Starting non-interleaved computations.
partial product for 12! is 132
partial product for 12! is 1320
partial product for 12! is 11880
partial product for 12! is 95040
partial product for 12! is 665280
partial product for 12! is 3991680
partial product for 12! is 19958400
partial product for 12! is 79833600
partial product for 12! is 239500800
Completed factorial(12) after 10 rounds
factorial(12) == 479001600
partial product for 8! is 56
partial product for 8! is 336
partial product for 8! is 1680
partial product for 8! is 6720
partial product for 8! is 20160
Completed factorial(8) after 16 rounds
factorial(8) == 40320
partial product for 4! is 12
Completed factorial(4) after 18 rounds
factorial(4) == 24

Starting interleaved computations.
partial product for 12! is 132
partial product for 8! is 56
partial product for 4! is 12
partial product for 12! is 1320
partial product for 8! is 336
Completed factorial(4) after 6 rounds
factorial(4) == 24
partial product for 12! is 11880
partial product for 8! is 1680
partial product for 12! is 95040
partial product for 8! is 6720
partial product for 12! is 665280
partial product for 8! is 20160
partial product for 12! is 3991680
Completed factorial(8) after 14 rounds
factorial(8) == 40320
partial product for 12! is 19958400
partial product for 12! is 79833600
partial product for 12! is 239500800
Completed factorial(12) after 18 rounds
factorial(12) == 479001600

Both modes take the same total number of rounds (18 in the example above). However, the program may appear more responsive in one mode than in the other, depending on which generators are scheduled. In non-interleaved mode, the number of rounds to complete a particular generator is T + R. Here R is the number of rounds needed to complete that generator in isolation, and T is the total number of rounds needed to complete all of the generators scheduled before it. In interleaved mode, by contrast, the number of rounds is at most (N * R) + R, where N is the number of other generators in the event loop (not including itself). Let's consider how these behave under various conditions:

  • One task: No difference.
  • One short task followed by one long task: No matter what, the long task will complete at the same time. But the short task will take slightly longer in interleaved mode.
  • One long task followed by one short task: The long task will finish slightly faster in non-interleaved mode, but the short task will finish much faster in interleaved mode.
  • Many tasks of equal length: In non-interleaved mode, one generator will complete every R rounds. In interleaved mode, a long time will pass without completing any generators, but towards the end many generators will complete in rapid succession.
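The formulas above can be checked with a small simulation. It models each generator only by R, the number of steps the generator needs (counting the final step that raises StopIteration), and schedules them the same way run() does. completion_rounds is a hypothetical helper written for this check and is not part of the chapter's code. The R values come from the non-interleaved output: factorial(12) needs 10 rounds, factorial(8) needs 16 - 10 = 6, and factorial(4) needs 18 - 16 = 2.

```python
from collections import deque

def completion_rounds(step_counts, interleave):
    """Return the round on which each task finishes.

    step_counts[i] is R for task i. Tasks are queued in list order,
    mirroring run() above.
    """
    queue = deque(enumerate(step_counts))
    finished = [None] * len(step_counts)
    rounds = 0
    while queue:
        rounds += 1
        i, remaining = queue.popleft()
        remaining -= 1                        # run one step of task i
        if remaining == 0:
            finished[i] = rounds
        elif interleave:
            queue.append((i, remaining))      # back of the queue
        else:
            queue.appendleft((i, remaining))  # keep running the same task

    return finished

R = [10, 6, 2]  # rounds needed in isolation by factorial(12), factorial(8), factorial(4)
sequential = completion_rounds(R, interleave=False)
interleaved = completion_rounds(R, interleave=True)
print(sequential)   # [10, 16, 18]
print(interleaved)  # [18, 14, 6]

N = len(R) - 1  # number of *other* generators
for i, r in enumerate(R):
    assert sequential[i] == sum(R[:i]) + r  # T + R
    assert interleaved[i] <= N * r + r      # at most (N * R) + R
```

Both lists reproduce the round counts printed by run(), and the last task to finish does so on round 18 in both modes.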

Interleaved mode isn't guaranteed to be better, but it can be expected to perform better on average. In this mode, if there are only a few generators, short tasks always complete quickly. The more generators there are in the run loop, the slower every generator runs, but each generator is penalized in proportion to its own size. Non-interleaved mode can sometimes perform better. However, it penalizes each generator based on its position in the queue and on the sizes of the generators ahead of it, so it can cause massive and unfair slowdowns.

In the example above, switching from non-interleaved to interleaved mode has the following effects:

  • factorial(12) goes from completing after 10 rounds, to completing after 18
  • factorial(8) goes from completing after 16 rounds, to completing after 14
  • factorial(4) goes from completing after 18 rounds, to completing after 6

This isn't a realistic use case for an event loop. It does, however, demonstrate the very basics, and the value that can be gained by performing concurrent non-blocking operations instead of sequential blocking ones.

Tasks and callbacks

In the example above, notice how the run() loop required a callback for every generator. Because the loop runs an arbitrary number of generators and could potentially run forever, it can't return the results that it computes. Thus, if the final values are meant to be used in some computation, then the computation must be specified in a callback, and the event loop must call the callback with the result.

The run() example above didn't use a realistic callback system:

  • It only allowed exactly one callback, instead of an arbitrary number.
  • The callback was run immediately, instead of being called later.
  • The callback had to be specified at the beginning of the run, instead of at any time.
  • The callback was assumed to be a synchronous computation, when it might also want to schedule new suspendable tasks with the event loop.

We've also been informally using the word "task" as a synonym for "generator" in a few places. Let's be clearer about that. A task is some suspendable computation that can trigger callbacks when it is done. It just so happens that many event loop implementations choose to use generators to represent a task.

Our example also wasn't realistic in how it handled tasks, because it required the set of tasks to be fixed at call time.

Let's try a new example which fixes these issues.


In [3]:
from collections import deque
import inspect


class EventLoop(object):
    def __init__(self, *callbacks):
        callbacks = deque(callbacks)
        super().__init__()
        self.callbacks = callbacks

    def run_until_complete(self):
        print("Starting", self)
        while self.callbacks:
            try:
                callback, args = self.callbacks.popleft()
            except IndexError:
                break
            print("Running callback", callback, "with args", args)
            callback(*args)
        print("No more callbacks for", self, "so shutting down the event loop")
    
    def call_soon(self, callback, *args):
        print("Scheduling callback", callback, "with args", args)
        self.callbacks.append((callback, args))
        
    def schedule_tasks(self, *tasks):
        for task in tasks:
            print("Scheduling task", task)
            self.callbacks.append((task.step, ()))
    
class Task(object):
    def __init__(self, generator, *, loop, callbacks=None):
        callbacks = callbacks or []
        super().__init__()
        self.generator = generator
        self.loop = loop
        self.callbacks = callbacks
                
    def step(self):
        print("Stepping", self)
        try:
            self._step_generator()
        except StopIteration as exc:
            print(self, "finished with value", exc.value)
            for callback in self.callbacks:
                self.loop.call_soon(callback, exc.value)
        else:
            self.loop.schedule_tasks(self)
        
    def _step_generator(self):
        next(self.generator)
        
    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.generator.__name__!r}>"

def factorial(n, *, loop, product=1, N=None):
    if N is None:
        N = n
    factorial = _factorial(n, loop=loop, product=product, N=N)
    factorial.__name__ = f"factorial(n={n!r}, product={product!r}, {N!r})"
    return factorial

def _factorial(n, *, loop, product, N):
    if 0 <= n <= 1:
        return product
    if n == 2:
        return 2 * product
    product *= n
    n -= 1
    yield product
    product *= n
    n -= 1
    return (n, dict(loop=loop, product=product, N=N))

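def factorial_callback(result):
    if isinstance(result, tuple):
        # Partial result: schedule a follow-up task on the same loop that
        # produced it, instead of relying on the global `loop` variable.
        n, kwargs = result
        task_loop = kwargs['loop']
        task_loop.schedule_tasks(Task(factorial(n, **kwargs), loop=task_loop, callbacks=[factorial_callback]))
    else:
        # Final result: just print it.
        print(result)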
    
loop = EventLoop()
loop.schedule_tasks(
    Task(factorial(4, loop=loop), loop=loop, callbacks=[factorial_callback]),
    Task(factorial(3, loop=loop), loop=loop, callbacks=[factorial_callback]),
)
loop.run_until_complete()


Scheduling task <Task: 'factorial(n=4, product=1, 4)'>
Scheduling task <Task: 'factorial(n=3, product=1, 3)'>
Starting <__main__.EventLoop object at 0x10aef7e10>
Running callback <bound method Task.step of <Task: 'factorial(n=4, product=1, 4)'>> with args ()
Stepping <Task: 'factorial(n=4, product=1, 4)'>
Scheduling task <Task: 'factorial(n=4, product=1, 4)'>
Running callback <bound method Task.step of <Task: 'factorial(n=3, product=1, 3)'>> with args ()
Stepping <Task: 'factorial(n=3, product=1, 3)'>
Scheduling task <Task: 'factorial(n=3, product=1, 3)'>
Running callback <bound method Task.step of <Task: 'factorial(n=4, product=1, 4)'>> with args ()
Stepping <Task: 'factorial(n=4, product=1, 4)'>
<Task: 'factorial(n=4, product=1, 4)'> finished with value (2, {'loop': <__main__.EventLoop object at 0x10aef7e10>, 'product': 12, 'N': 4})
Scheduling callback <function factorial_callback at 0x10aedb1e0> with args ((2, {'loop': <__main__.EventLoop object at 0x10aef7e10>, 'product': 12, 'N': 4}),)
Running callback <bound method Task.step of <Task: 'factorial(n=3, product=1, 3)'>> with args ()
Stepping <Task: 'factorial(n=3, product=1, 3)'>
<Task: 'factorial(n=3, product=1, 3)'> finished with value (1, {'loop': <__main__.EventLoop object at 0x10aef7e10>, 'product': 6, 'N': 3})
Scheduling callback <function factorial_callback at 0x10aedb1e0> with args ((1, {'loop': <__main__.EventLoop object at 0x10aef7e10>, 'product': 6, 'N': 3}),)
Running callback <function factorial_callback at 0x10aedb1e0> with args ((2, {'loop': <__main__.EventLoop object at 0x10aef7e10>, 'product': 12, 'N': 4}),)
Scheduling task <Task: 'factorial(n=2, product=12, 4)'>
Running callback <function factorial_callback at 0x10aedb1e0> with args ((1, {'loop': <__main__.EventLoop object at 0x10aef7e10>, 'product': 6, 'N': 3}),)
Scheduling task <Task: 'factorial(n=1, product=6, 3)'>
Running callback <bound method Task.step of <Task: 'factorial(n=2, product=12, 4)'>> with args ()
Stepping <Task: 'factorial(n=2, product=12, 4)'>
<Task: 'factorial(n=2, product=12, 4)'> finished with value 24
Scheduling callback <function factorial_callback at 0x10aedb1e0> with args (24,)
Running callback <bound method Task.step of <Task: 'factorial(n=1, product=6, 3)'>> with args ()
Stepping <Task: 'factorial(n=1, product=6, 3)'>
<Task: 'factorial(n=1, product=6, 3)'> finished with value 6
Scheduling callback <function factorial_callback at 0x10aedb1e0> with args (6,)
Running callback <function factorial_callback at 0x10aedb1e0> with args (24,)
24
Running callback <function factorial_callback at 0x10aedb1e0> with args (6,)
6
No more callbacks for <__main__.EventLoop object at 0x10aef7e10> so shutting down the event loop

This is a bit closer to a realistic event loop. It doesn't have most of the features of asyncio or similar event loops, but it does cover the very basic requirements.

We can see more clearly now that the event loop requires commitment. Unless you're using run_until_complete() to block on a specific task and then exit the event loop, the event loop will most likely control execution for the rest of the program. Thus, all program logic needs to occur in generators/tasks and callbacks. These tasks perform units of work, and may spawn other tasks and callbacks.
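The EventLoop above only stops once its queue is empty, and a callback attached to a Task after the task has finished is never called. The sketch below shows both missing pieces and is loosely modeled on asyncio. In asyncio, loop.run_until_complete() runs until a given future completes, and Future.add_done_callback() schedules the callback with call_soon() if the future is already done. Unlike this sketch, asyncio passes the future itself to the callback rather than its result. MiniLoop, MiniTask and countdown are hypothetical names.

```python
from collections import deque

class MiniLoop:
    """Hypothetical minimal loop: a FIFO queue of (callback, args) pairs."""

    def __init__(self):
        self.ready = deque()

    def call_soon(self, callback, *args):
        self.ready.append((callback, args))

    def run_until_complete(self, task):
        # Stop as soon as *this* task is done, even if other work is queued.
        while not task.done():
            if not self.ready:
                raise RuntimeError("loop ran out of work before the task finished")
            callback, args = self.ready.popleft()
            callback(*args)
        return task.result

class MiniTask:
    """Hypothetical task: steps a generator once per scheduled callback."""

    _PENDING = object()  # sentinel: the generator has not returned yet

    def __init__(self, generator, loop):
        self.generator = generator
        self.loop = loop
        self.result = self._PENDING
        self.callbacks = []
        loop.call_soon(self.step)

    def done(self):
        return self.result is not self._PENDING

    def add_done_callback(self, callback):
        if self.done():
            # Already finished: still defer the call instead of running it now.
            self.loop.call_soon(callback, self.result)
        else:
            self.callbacks.append(callback)

    def step(self):
        try:
            next(self.generator)
        except StopIteration as exc:
            self.result = exc.value
            for callback in self.callbacks:
                self.loop.call_soon(callback, self.result)
            self.callbacks.clear()
        else:
            self.loop.call_soon(self.step)  # not finished: queue the next step

def countdown(n):
    """Yields n times, then returns n."""
    for _ in range(n):
        yield
    return n

loop = MiniLoop()
long_task = MiniTask(countdown(100), loop)
short_task = MiniTask(countdown(2), loop)
print(loop.run_until_complete(short_task))  # 2
print(long_task.done())                     # False: still queued, not finished
seen = []
short_task.add_done_callback(seen.append)   # attached after completion
print(loop.run_until_complete(long_task))   # 100
print(seen)                                 # [2]
```

Blocking on one task leaves the other tasks queued on the loop. That is why, once a program commits to an event loop, the loop tends to own execution from then on.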

Furthermore, for the event loop to run well, each unit of work should end with a yield. Usually a task yields after initiating some non-blocking I/O operation (more on this later). It can also yield in the middle of a long computation, or while waiting for some other task to complete (which we'll see in the next chapter). Any task that hogs the event loop lowers the perceived performance of all other tasks. Thus, a call like requests.get(URL) is inappropriate in an event loop, because it does blocking I/O (moreover, it might make multiple HTTP calls if it has to deal with retries or redirects). Ideally, all I/O should use event loop-aware variants.
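The cost of hogging the loop can be made concrete with a small timing sketch that uses asyncio itself. The sketch uses async/await coroutines rather than the bare generators used in this chapter, and it assumes Python 3.7+ for asyncio.run(). Three tasks that wait with asyncio.sleep() overlap their waits. Three tasks that call the blocking time.sleep() run one after another. cooperative_wait, hogging_wait and elapsed are hypothetical helpers, and exact timings will vary by machine.

```python
import asyncio
import time

async def cooperative_wait(seconds):
    # asyncio.sleep() suspends this coroutine and hands control back to the
    # loop, so other tasks can run while the timer is pending.
    await asyncio.sleep(seconds)

async def hogging_wait(seconds):
    # time.sleep() blocks the whole thread, so the loop cannot run anything else.
    time.sleep(seconds)

async def elapsed(wait, seconds, count):
    """Run `count` copies of `wait(seconds)` concurrently; return wall time."""
    start = time.perf_counter()
    await asyncio.gather(*(wait(seconds) for _ in range(count)))
    return time.perf_counter() - start

fast = asyncio.run(elapsed(cooperative_wait, 0.2, 3))
slow = asyncio.run(elapsed(hogging_wait, 0.2, 3))
print(f"event loop-aware: {fast:.2f}s, blocking: {slow:.2f}s")
```

On a typical machine the first figure should be close to a single wait, and the second close to the sum of all three.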

License

License: Apache License, Version 2.0
Jordan Moldow, 2017

Copyright 2017 Jordan Moldow

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.