In [1]:
from __future__ import absolute_import
from __future__ import print_function

from tornado import gen
from flowz import app
from flowz.channels import *
from flowz.artifacts import *

In [2]:
from logging import config as logconf
def config_logging(level='DEBUG'):
    logconf.dictConfig({
            'version': 1,
            'loggers': {},
            'disable_existing_loggers': False,
            'root': {
                'level': level,
                'handlers': ['default_handler'],
                },
            'handlers': {
                'default_handler': {
                    'class': 'logging.StreamHandler',
                    'stream': 'ext://sys.stdout',
                    },
                },
            }
    )
config_logging('DEBUG')

In [3]:
def print_chans(*chans):
    # This is a bit more elaborate than before to resolve artifacts
    app.Flo([chan.map(lambda y: y.get()).each_ready().map(print) for chan in chans]).run()

Introduction to Artifacts

Background

If channels only used sources like lists and dictionaries already in memory, they would have little value. Their true value comes when getting data from external sources or heavy computations that take time. In such an environment, accessing the data asynchronously and concurrently -- and even "out of order" in some cases -- can improve performance and may lead to more elegant code.

Artifacts go a long way toward unlocking that asynchrony. Artifacts are objects that know how to retrieve, compute, or transform their data, but don't necessarily do it right away. They delay getting the data until requested to do so, and then they use the tornado/futures infrastructure to get their data asynchronously. Once ready, their data is available to others.
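To make the idea of delayed, asynchronous retrieval concrete, here is a minimal sketch. It uses the standard-library asyncio rather than tornado, and the `LazyValue` class and `slow_fetch` function are hypothetical names invented for this illustration. It is an analogy only, not how flowz implements artifacts.

```python
import asyncio


class LazyValue(object):
    """Hypothetical stand-in for an artifact: holds a getter, runs it on demand."""

    def __init__(self, getter):
        self._getter = getter   # coroutine function that produces the data
        self._task = None       # created on the first call to get()
        self.calls = 0          # how many times the getter actually ran

    async def _run(self):
        self.calls += 1
        return await self._getter()

    def get(self):
        # Start the work only when first requested; later calls share the result.
        if self._task is None:
            self._task = asyncio.ensure_future(self._run())
        return self._task


async def slow_fetch():
    await asyncio.sleep(0.01)   # pretend this is a remote call
    return 42


async def main():
    lazy = LazyValue(slow_fetch)
    calls_before = lazy.calls   # nothing has run yet
    first = await lazy.get()
    second = await lazy.get()   # reuses the finished task, no second fetch
    return calls_before, first, second, lazy.calls


result = asyncio.run(main())
print(result)
```

Creating the object does no work. The getter runs once, on the first `get()`, and every later request sees the same value.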

ExtantArtifact

An ExtantArtifact is an artifact that represents data that is known to exist, and it uses a tornado coroutine to get its data. It is particularly suitable for fetching data via existing asynchronous mechanisms, like httpclient.AsyncHTTPClient.


In [4]:
# An ExtantArtifact that will be used here and elsewhere in the guide
class GuideExtantArtifact(ExtantArtifact):
    def __init__(self, num):
        super(GuideExtantArtifact, self).__init__(self.get_me, name='GuideExtantArtifact')
        self.num = num

    @gen.coroutine
    def get_me(self):
        # O, pardon! since a crooked figure may attest in little place a million;
        # On your imaginary forces work... Piece out our imperfections with your thoughts;
        # Think when we talk of horses, that you see them printing their proud hoofs in the receiving earth;
        # For 'tis your thoughts that now must deck our kings...
        # (in other words, pretend this got some impressive data asynchronously)
        raise gen.Return((self.num, self.num * 100))

In [5]:
chan = IterChannel([GuideExtantArtifact(i) for i in range(3)])
print_chans(chan)


GuideExtantArtifact<GuideExtantArtifact> starting get
GuideExtantArtifact<GuideExtantArtifact> retrieved.
GuideExtantArtifact<GuideExtantArtifact> starting get
GuideExtantArtifact<GuideExtantArtifact> retrieved.
(0, 0)
GuideExtantArtifact<GuideExtantArtifact> starting get
GuideExtantArtifact<GuideExtantArtifact> retrieved.
(1, 100)
(2, 200)

Surprise! Artifacts have logging built into them. In the case of ExtantArtifact, it logs before calling the getter (which then yields) and after it has completed the retrieval of the data. (These log messages are a blend of DEBUG and INFO level, so the detail will vary at times in this guide.)

DerivedArtifact

A DerivedArtifact is an artifact that uses a normal synchronous function as a deriver of its data. That function will be passed any number of "sources", and flowz will make sure that all of the sources have been fully resolved before being passed as parameters. For instance, if the sources are artifacts, they will be resolved to their values.
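The "resolve the sources first, then call a plain function" pattern can be sketched with the standard library alone. This is an illustrative analogy using asyncio, not flowz's code; `resolve`, `derive`, `fetch_number`, and `add` are hypothetical names.

```python
import asyncio
import inspect


async def resolve(source):
    # Hypothetical helper: awaitables are awaited, plain values pass through.
    if inspect.isawaitable(source):
        return await source
    return source


async def derive(deriver, *sources):
    # Resolve every source first, then call the synchronous deriver once.
    values = await asyncio.gather(*(resolve(s) for s in sources))
    return deriver(*values)


async def fetch_number():
    await asyncio.sleep(0.01)   # stands in for an asynchronous source
    return 7


def add(a, b):
    # An ordinary synchronous function; it never sees an unresolved source.
    return a + b


derived = asyncio.run(derive(add, fetch_number(), 3))
print(derived)
```

The deriver itself stays free of any asynchronous code, which is the main appeal of the pattern.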


In [6]:
def lame_deriver(num):
    return (num, -10 * num)

chan = IterChannel(DerivedArtifact(lame_deriver, i) for i in range(3))
print_chans(chan)


DerivedArtifact starting get
DerivedArtifact waiting on sources.
DerivedArtifact running deriver.
DerivedArtifact ready.
DerivedArtifact starting get
DerivedArtifact waiting on sources.
DerivedArtifact running deriver.
DerivedArtifact ready.
(0, 0)
DerivedArtifact starting get
DerivedArtifact waiting on sources.
DerivedArtifact running deriver.
DerivedArtifact ready.
(1, -10)
(2, -20)

Here again we see logging, but a bit more. Before the deriver is called, flowz first makes sure that each one of the sources is resolved. Then the deriver is called -- synchronously -- and the results are ready.

Note above that there are two "ready" messages before the first value is actually printed. That, again, is an indicator of the asynchronous processing of the channels.

In practice, DerivedArtifacts are used to gather and transform data that began with ExtantArtifacts.


In [7]:
def not_as_lame_deriver(val):
    num, extant_val = val
    # Floor division keeps the result an integer on both Python 2 and Python 3
    return (num, extant_val // -10)

chan = IterChannel(GuideExtantArtifact(i) for i in range(3)).map(lambda a: DerivedArtifact(not_as_lame_deriver, a))
print_chans(chan)


DerivedArtifact starting get
DerivedArtifact waiting on sources.
GuideExtantArtifact<GuideExtantArtifact> starting get
GuideExtantArtifact<GuideExtantArtifact> retrieved.
DerivedArtifact running deriver.
DerivedArtifact ready.
DerivedArtifact starting get
DerivedArtifact waiting on sources.
GuideExtantArtifact<GuideExtantArtifact> starting get
GuideExtantArtifact<GuideExtantArtifact> retrieved.
DerivedArtifact running deriver.
DerivedArtifact ready.
(0, 0)
DerivedArtifact starting get
DerivedArtifact waiting on sources.
GuideExtantArtifact<GuideExtantArtifact> starting get
GuideExtantArtifact<GuideExtantArtifact> retrieved.
DerivedArtifact running deriver.
DerivedArtifact ready.
(1, -10)
(2, -20)

Now you can see that the firing of each DerivedArtifact caused its source to be resolved, which meant that its wrapped GuideExtantArtifact retrieved its value. Only after that was the result (a tuple) passed into the deriver.

ThreadedDerivedArtifact

A ThreadedDerivedArtifact is just like a DerivedArtifact, but it is passed a concurrent.futures.ThreadPoolExecutor on which it will run. There are two main situations where that helps:

  1. Some things are IO-bound and thus quite amenable to the async IO pattern around which tornado is built. But they aren't implemented in terms of async IO. In such cases, you can get good results by pushing the blocking IO onto a thread pool executor, which this enables. The individual threads can block on the synchronous IO, but the main IOLoop continues on its merry way all the while. So if you're dealing with synchronous IO-bound clients, put 'em in here. (NOTE: boto and boto3 are prime examples of this.)

  2. Some routines are pretty hoggy in terms of computation, and they'll starve the IOLoop unless you take steps to offload them. In such cases, getting them onto a thread pool executor (and a shallow pool, at that) can be helpful.
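The first case above, blocking IO pushed onto a thread pool while the event loop keeps running, can be sketched with the standard library. This example uses asyncio's `run_in_executor` as an analogy for what a ThreadedDerivedArtifact is described as doing. It is not flowz's implementation, and `blocking_deriver` and `heartbeat` are hypothetical names.

```python
import asyncio
import time
from concurrent import futures


def blocking_deriver(num):
    time.sleep(0.05)   # stands in for synchronous IO, such as a blocking client call
    return (num, -10 * num)


async def main():
    loop = asyncio.get_running_loop()
    ticks = 0

    async def heartbeat():
        # Keeps counting while the blocking calls run on worker threads.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    beat = asyncio.ensure_future(heartbeat())
    with futures.ThreadPoolExecutor(2) as executor:
        jobs = [loop.run_in_executor(executor, blocking_deriver, i) for i in range(3)]
        results = await asyncio.gather(*jobs)
    beat.cancel()
    return results, ticks


results, ticks = asyncio.run(main())
print(results, ticks > 0)
```

The heartbeat keeps ticking while the workers sleep, which shows that the loop is not starved. Calling `blocking_deriver` directly inside a coroutine would stall the heartbeat for the duration of each call.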


In [8]:
from concurrent import futures
executor = futures.ThreadPoolExecutor(1)
chan = IterChannel(ThreadedDerivedArtifact(executor, lame_deriver, i) for i in range(3))
print_chans(chan)


ThreadedDerivedArtifact created (None).
ThreadedDerivedArtifact created (None).
ThreadedDerivedArtifact created (None).
ThreadedDerivedArtifact starting get
ThreadedDerivedArtifact waiting on sources.
ThreadedDerivedArtifact running deriver on executor.
ThreadedDerivedArtifact starting get
ThreadedDerivedArtifact waiting on sources.
ThreadedDerivedArtifact running deriver on executor.
ThreadedDerivedArtifact ready.
ThreadedDerivedArtifact ready.
(0, 0)
ThreadedDerivedArtifact starting get
ThreadedDerivedArtifact waiting on sources.
(1, -10)
ThreadedDerivedArtifact running deriver on executor.
ThreadedDerivedArtifact ready.
(2, -20)

In [9]:
# Okay, no more logging!
config_logging('WARN')

TransformedArtifact

A TransformedArtifact wraps another artifact and transforms its value. It's not that different from a DerivedArtifact. One advantage that it has -- inherited from its superclass WrappedArtifact, which is rarely used directly -- is that indexing and attribute access are passed on through to the underlying artifact.
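As a rough illustration of what "passed on through" means, here is a generic Python delegation sketch. The `Wrapper` and `Record` classes are hypothetical and are not flowz classes; how WrappedArtifact actually forwards lookups is an assumption not confirmed here.

```python
class Wrapper(object):
    """Hypothetical wrapper that forwards lookups to the object it wraps."""

    def __init__(self, inner, transformer):
        self._inner = inner
        self._transformer = transformer

    def get(self):
        # Transform the wrapped object's value on the way out.
        return self._transformer(self._inner.get())

    def __getattr__(self, name):
        # Called only when normal lookup fails, so Wrapper's own attributes win.
        return getattr(self._inner, name)

    def __getitem__(self, key):
        # Indexing is forwarded unchanged to the wrapped object.
        return self._inner[key]


class Record(object):
    """Hypothetical wrapped object with an attribute and item access."""

    def __init__(self):
        self.label = 'raw'
        self._data = {'x': 1}

    def get(self):
        return 10

    def __getitem__(self, key):
        return self._data[key]


wrapped = Wrapper(Record(), transformer=lambda v: v * 2)
print(wrapped.get(), wrapped.label, wrapped['x'])
```

Only `get()` sees the transformed value. Attribute and index lookups reach the wrapped object untouched.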


In [10]:
chan = IterChannel(TransformedArtifact(GuideExtantArtifact(i), transformer=not_as_lame_deriver) for i in range(3))
print_chans(chan)


(0, 0)
(1, -10)
(2, -20)