In [1]:
from __future__ import absolute_import
from __future__ import print_function
from tornado import gen
from flowz import app
from flowz.channels import IterChannel
from flowz.channels import management as mgmt
from flowz.artifacts import ExtantArtifact, DerivedArtifact, KeyedArtifact
from flowz.util import merge_keyed_channels
In [2]:
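def print_chans(*chans, **kwargs):
    # This is a bit more elaborate than before, to resolve artifacts: each
    # item's ensure() (the default) or get() method is called, selected by
    # mode, and each_ready() waits on the results before func is applied
    mode = kwargs.get('mode', 'ensure')
    func = kwargs.get('func', print)
    app.Flo([chan.map(lambda y: getattr(y, mode)()).each_ready().map(func)
             for chan in chans]).run()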
When constructing a large graph of interdependent channels, it can be difficult to keep track of whether you have made the right number of tee() calls. Because forgetting to tee can have such grave consequences, one tends to over-tee, which leads to memory leaks.
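flowz channels have their own tee(), but Python's standard itertools.tee shares the relevant hazard, so it can serve as a rough illustration (an analogy only, not flowz's implementation): a tee branch that is never consumed keeps buffering everything its source produces. The Record class and produce() generator below are hypothetical, and the weakref counts assume objects are reclaimed promptly once gc.collect() runs, as in CPython.

```python
import gc
import itertools
import weakref


class Record(object):
    """Hypothetical stand-in for a data item flowing through a pipeline."""

    def __init__(self, n):
        self.n = n


refs = []  # weak references, so tracking does not keep records alive


def produce(count):
    for n in range(count):
        rec = Record(n)
        refs.append(weakref.ref(rec))
        yield rec


def count_alive(weak_refs):
    """Count tracked records that have not been garbage-collected yet."""
    gc.collect()
    return sum(1 for r in weak_refs if r() is not None)


# Tee the source "just in case", but only ever consume one branch.
used, spare = itertools.tee(produce(1000))
total = sum(rec.n for rec in used)
del used  # the consumer is finished with its branch

# The spare branch never advanced, so the shared tee buffer still
# holds every record the source produced.
alive_with_spare = count_alive(refs)

# Dropping the unneeded branch releases the buffer and the records.
del spare
alive_without_spare = count_alive(refs)

print(total, alive_with_spare, alive_without_spare)
```

In a large channel graph the equivalent of spare is easy to lose track of, which is exactly the over-teeing problem described above.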
Also, some patterns (like the Possible/Extant/All pattern) are useful when setting up graphs, but in the clutter of linear code creating channel after channel, it can become unclear when such patterns are in use or whether they have been implemented properly.
Finally, in the spirit of the rest of flowz, it would be nice if channel construction were done lazily, strictly on an as-needed basis.
The ChannelManager aims to resolve all of those issues. As usual, its use is best demonstrated before it is described.
We'll start with some code borrowed from the Possible/Extant/All pattern example in chapter 4:
In [3]:
def expensive_deriver(num):
    # 10 minutes pass...
    return num * 100

# Our fake durable storage holding the first 8 derived elements
storage = {num: expensive_deriver(num) for num in range(8)}

# The ExtantArtifact accessing that data
class ExampleExtantArtifact(ExtantArtifact):
    def __init__(self, num):
        super(ExampleExtantArtifact, self).__init__(self.get_me, name='ExampleExtantArtifact')
        self.num = num

    @gen.coroutine
    def get_me(self):
        raise gen.Return(storage[self.num])
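Before handing this to a ChannelManager, it may help to recall what the Possible/Extant/All pattern computes. The plain-Python sketch below uses zero-argument functions in place of artifacts and a hypothetical merge_keyed() helper in place of merge_keyed_channels. It assumes, as the argument order of the all channel suggests, that for a shared key the input listed last (extant) wins, so only the missing items ever get derived. The toy_ names are used so as not to clobber the notebook's own variables.

```python
import heapq

calls = []  # which numbers actually had to be derived


def toy_deriver(num):
    calls.append(num)
    return num * 100


# Pretend the first 8 results were derived earlier and stored durably.
toy_storage = {num: num * 100 for num in range(8)}


def _tag(stream, rank):
    """Attach the stream's position so later streams sort after earlier ones."""
    for key, value in stream:
        yield key, rank, value


def merge_keyed(*streams):
    """Merge key-sorted (key, value) streams with unique keys per stream;
    for a repeated key the stream listed last wins.
    Hypothetical stand-in for merge_keyed_channels."""
    merged = []
    tagged = (_tag(s, r) for r, s in enumerate(streams))
    for key, _rank, value in heapq.merge(*tagged):
        if merged and merged[-1][0] == key:
            merged[-1] = (key, value)  # a later stream overrides an earlier one
        else:
            merged.append((key, value))
    return merged


# Possible: everything we could derive. Extant: what is already stored.
possible = [(i, lambda i=i: toy_deriver(i)) for i in range(10)]
extant = [(i, lambda i=i: toy_storage[i]) for i in sorted(toy_storage)]

# All: resolve each winning entry; only keys missing from storage get derived.
resolved = [(key, thunk()) for key, thunk in merge_keyed(possible, extant)]
print(resolved, calls)
```

Here calls ends up as [8, 9]: the expensive work runs only for the two items that are not already in storage.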
Now the ChannelManager code:
In [4]:
class GuideChannelManager(object):
    def __init__(self):
        # sets the 'channel_manager' property, which must exist on the class
        self.channel_manager = mgmt.ChannelManager()

    @mgmt.channelproperty
    def extant(self):
        print('Creating extant channel')
        return IterChannel(KeyedArtifact(i, ExampleExtantArtifact(i)) for i in sorted(storage.keys()))

    @mgmt.channelproperty
    def possible(self):
        print('Creating possible channel')
        return IterChannel(KeyedArtifact(i, DerivedArtifact(expensive_deriver, i)) for i in range(10))

    @mgmt.channelproperty
    def all(self):
        print('Creating all channel')
        return merge_keyed_channels(self.possible, self.extant)
Note that this class is not actually a subclass of ChannelManager, but it makes use of it in two ways:
1. It sets a channel_manager property on itself that is a private instance of a ChannelManager.
2. It uses the @mgmt.channelproperty decorator above three methods.
Now using this class is straightforward...
In [5]:
print_chans(GuideChannelManager().all, mode='get')
Nice. It worked. Here are the steps that happened:
1. The GuideChannelManager object was instantiated, which did little other than instantiate a ChannelManager.
2. The all property was requested from the GuideChannelManager.
3. The ChannelManager invoked the all() method, which referenced the possible and extant properties.
4. The ChannelManager, in turn, invoked the possible() and extant() methods.
5. Each of those methods created an IterChannel and returned it.
6. The ChannelManager stored those channels, so any subsequent requests for the possible or extant properties would receive a tee() of their corresponding channels.
7. The two channels were returned to the all() method, where they were used as input in the creation of a new channel, which was returned.
8. The ChannelManager, on behalf of all(), stored this channel and would tee() it on any subsequent requests for the all property.
9. The channel for the all property was passed to print_chans() and drained.

Very notably, no tees were performed on any of these channels. In this configuration, each channel was needed only once, so that is exactly what this pattern provided. Had this been hand-coded, the final channel would likely have been built from tees of the first two channels out of an abundance of caution, possibly leaking the original channels.
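The bookkeeping described in these steps can be mimicked in a few lines of plain Python. This is a hypothetical sketch: ToyChannel, ToyChannelManager, toy_channelproperty and ToyGuide are illustrative stand-ins, not flowz's API, and a "tee" here merely counts calls and shares a list. It assumes only the behavior described above: the first request for a name builds the channel and hands out the original, and later requests receive a tee.

```python
import functools


class ToyChannel(object):
    """Hypothetical stand-in for a flowz channel: it just wraps a list."""

    def __init__(self, items):
        self.items = list(items)
        self.tees = 0  # how many times tee() was called on this channel

    def tee(self):
        self.tees += 1
        return ToyChannel(self.items)


class ToyChannelManager(object):
    """Hypothetical sketch of ChannelManager-style bookkeeping."""

    def __init__(self):
        self._channels = {}

    def get(self, name, builder):
        if name in self._channels:
            # Repeat request: hand out a tee of the stored channel.
            return self._channels[name].tee()
        # First request: build the channel now and hand out the original.
        chan = builder()
        self._channels[name] = chan
        return chan


def toy_channelproperty(method):
    """Hypothetical analogue of @mgmt.channelproperty."""

    @property
    @functools.wraps(method)
    def wrapper(self):
        return self.channel_manager.get(method.__name__, lambda: method(self))

    return wrapper


built = []  # records the order in which channels get built


class ToyGuide(object):
    def __init__(self):
        self.channel_manager = ToyChannelManager()

    @toy_channelproperty
    def extant(self):
        built.append('extant')
        return ToyChannel(range(8))

    @toy_channelproperty
    def possible(self):
        built.append('possible')
        return ToyChannel(range(10))

    @toy_channelproperty
    def all(self):
        built.append('all')
        # A union of keys stands in for merge_keyed_channels().
        return ToyChannel(sorted(set(self.possible.items) | set(self.extant.items)))


guide = ToyGuide()
first = guide.all   # builds all, then possible and extant; no tees needed
second = guide.all  # already built, so this is a tee of the stored channel
print(built, guide.channel_manager._channels['all'].tees)
```

Requesting guide.all a second time triggers no rebuild; the stored channel is simply tee'd, while possible and extant, each requested once, are never tee'd at all.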
In [6]:
# recreate the storage to not mess up other parts of the notebook when run out of order
storage = {num: expensive_deriver(num) for num in range(8)}
flowz is likely to be used principally in scripts run as periodic (e.g., nightly) processes that synthesize and analyze data coming in from external sources. In such scripts, it can be handy to assign well-known names to some of the stages and to choose the target via a script parameter. Here is one possible pattern for doing that.
In [7]:
targets = dict()

def configure_channels():
    mgr = GuideChannelManager()
    targets['possible'] = lambda: mgr.possible
    targets['extant'] = lambda: mgr.extant
    targets['all'] = lambda: mgr.all
    targets['default'] = targets['all']

configure_channels()

def run_target(name='default'):
    print_chans(targets[name](), mode='get')
With that in place, a script's main() could parse the command-line arguments and end up calling a target like this:
In [8]:
run_target('extant')
Or...
In [9]:
# Calling again to act like a fresh running of the script
configure_channels()
In [10]:
run_target()
In [11]:
# recreate the storage to not mess up other parts of the notebook when run out of order
storage = {num: expensive_deriver(num) for num in range(8)}
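As for the main() mentioned above, one way a script might map a command-line argument onto these targets is with the standard argparse module. This is a hedged sketch: make_targets() and its string "channels" are hypothetical placeholders for configure_channels() and print_chans(). Because each target is a zero-argument function, only the chosen target is ever built, which is the same laziness the lambdas in configure_channels() preserve (assigning mgr.all directly would build the channels immediately).

```python
import argparse

built_targets = []  # which targets were actually built


def make_targets():
    """Hypothetical stand-in for configure_channels(): each entry is a
    zero-argument function, so nothing is built until a target is chosen."""
    def build(name):
        built_targets.append(name)
        return '%s channel' % name

    targets = {
        'possible': lambda: build('possible'),
        'extant': lambda: build('extant'),
        'all': lambda: build('all'),
    }
    targets['default'] = targets['all']
    return targets


def main(argv=None):
    targets = make_targets()
    parser = argparse.ArgumentParser(description='Run one named target.')
    parser.add_argument('target', nargs='?', default='default',
                        choices=sorted(targets),
                        help='which target to run (default: %(default)s)')
    args = parser.parse_args(argv)
    # A real script would drain the resulting channel here, e.g. with print_chans().
    return targets[args.target]()
```

A script would typically call main() from an if __name__ == '__main__': block, so that parse_args() reads sys.argv; passing an explicit list, as in main(['extant']), is handy for trying it out.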