In [1]:
from __future__ import absolute_import
from __future__ import print_function
from tornado import gen
from flowz import app
from flowz.channels import *
from flowz.artifacts import *
In [2]:
from logging import config as logconf
def config_logging(level='DEBUG'):
    logconf.dictConfig({
        'version': 1,
        'loggers': {},
        'disable_existing_loggers': 0,
        'root': {
            'level': level,
            'handlers': ['default_handler'],
        },
        'handlers': {
            'default_handler': {
                'class': 'logging.StreamHandler',
                'stream': 'ext://sys.stdout',
            },
        },
    })
config_logging('WARN')
In [3]:
def print_chans(*chans, **kwargs):
    # This is a bit more elaborate than before, in order to resolve artifacts
    mode = kwargs.get('mode', 'get')
    func = kwargs.get('func', print)
    app.Flo([chan.map(lambda y: getattr(y, mode)()).each_ready().map(func) for chan in chans]).run()
Another, very powerful form of joining is made possible by the util module. Here is an example reusing the chan_div_2 and chan_div_3 channels from the previous chapter:
In [4]:
from flowz.util import merge_keyed_channels
In [5]:
chan_div_2 = IterChannel(KeyedArtifact(i, i) for i in range(1, 13) if i % 2 == 0)
chan_div_3 = IterChannel(KeyedArtifact(i, i*10) for i in range(1, 13) if i % 3 == 0)
merged = merge_keyed_channels(chan_div_2, chan_div_3)
print_chans(merged)
What did that do? If you think carefully about the values produced by the two channels, you will deduce that, for each key, it selects the value from the rightmost channel providing that key. In other words, values from both channels make it into the final channel, but when both provide a value for a particular key, the rightmost channel wins. (Note that the values 6 and 12 from chan_div_2 do not appear; they are superseded by 60 and 120 from chan_div_3.)
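The rightmost-wins selection rule can be sketched with plain dicts (a simplified analogy of the behavior described above, not the actual merge_keyed_channels implementation):

```python
# Simplified analogy: merging keyed values where the rightmost source wins.
# This mimics the selection rule, not the channel machinery itself.
div_2 = {i: i for i in range(1, 13) if i % 2 == 0}        # keys 2, 4, 6, 8, 10, 12
div_3 = {i: i * 10 for i in range(1, 13) if i % 3 == 0}   # keys 3, 6, 9, 12

merged = {}
for source in (div_2, div_3):   # later (rightmost) sources overwrite earlier ones
    merged.update(source)

print(sorted(merged.items()))
# keys 6 and 12 take their values (60, 120) from div_3, the rightmost source
```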
Why is that helpful? That pattern works very well for implementing the Possible/Extant/All pattern that is one of the main raisons d'être for the flowz framework in the first place. Here's how it works...
Suppose you have an expensive derivation function. For giggles and grins, let's say it takes 10 minutes to compute.
In [6]:
def expensive_deriver(num):
    # 10 minutes pass...
    return num * 100
chan = IterChannel(KeyedArtifact(i, DerivedArtifact(expensive_deriver, i)) for i in range(10))
print_chans(chan.tee())
That would have taken 100 minutes. If your process died 85 minutes in, you would be bummed to have to do it all again. So, you would like to have a way to write out the results and, in the case of a crash, pick up where you left off. That's where the pattern comes in.
The channel just defined represents all of the possible values, so let's call it that.
In [7]:
possible = chan
Suppose that we have already written out our data to durable storage. We can represent that with an array of previously written values (as though we had run for 85 minutes):
In [8]:
storage = {num: expensive_deriver(num) for num in range(8)}
print(storage)
Now, we need an ExtantArtifact that gets this data out of storage:
In [9]:
class ExampleExtantArtifact(ExtantArtifact):
    def __init__(self, num):
        super(ExampleExtantArtifact, self).__init__(self.get_me, name='ExampleExtantArtifact')
        self.num = num

    @gen.coroutine
    def get_me(self):
        raise gen.Return(storage[self.num])
Most durable storage mechanisms allow you to determine the keys of your stored items in sorted order, so let's do that and create an extant channel in that order:
In [10]:
keys = sorted(storage.keys())
print(keys)
In [11]:
extant = IterChannel(KeyedArtifact(i, ExampleExtantArtifact(i)) for i in sorted(storage.keys()))
print_chans(extant.tee())
Great! Now we can combine these two channels into our all channel, preferring the items in the extant channel:
In [12]:
all_ = merge_keyed_channels(possible.tee(), extant.tee())
print_chans(all_.tee())
OK. Something happened there, but it's not clear exactly what, since it looks like the unadulterated output of the possible channel. Let's turn logging back on, build everything again (since all the artifacts have already been derived once, teeing won't be illustrative) and see what happens:
In [13]:
config_logging('INFO')
possible = IterChannel(KeyedArtifact(i, DerivedArtifact(expensive_deriver, i, name='expensive')) for i in range(10))
extant = IterChannel(KeyedArtifact(i, ExampleExtantArtifact(i)) for i in keys)
all_ = merge_keyed_channels(possible, extant)
print_chans(all_.tee())
config_logging('WARN')
Boom! Notice that the expensive_deriver() calls ("DerivedArtifact&lt;expensive&gt; running deriver.") occur only twice, at the end. Our code did not have to consciously figure out how much had already been done and carefully call the deriver only for the remaining items. The lazy evaluation did it all.
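The effect is the same as an explicit skip-what-exists loop; here is a hand-rolled sketch for comparison (flowz achieves this lazily through the channel merge rather than a loop like this):

```python
# Hand-rolled sketch of "pick up where you left off" (for comparison only;
# flowz achieves the same result lazily via merge_keyed_channels).
calls = []

def expensive_deriver(num):
    calls.append(num)   # record each (notionally 10-minute) derivation
    return num * 100

storage = {num: num * 100 for num in range(8)}   # results written before the crash

for i in range(10):
    if i not in storage:                         # only derive what is missing
        storage[i] = expensive_deriver(i)

print(calls)   # only the last two items are actually derived
```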
There is yet one more performance improvement to make here, though. If we have already written out 8 of our expensively derived data sets, not only do we not need to derive and write them out again, but we don't even need to read them back in! flowz and the ExtantArtifact class allow us to optimize this by ensuring each of the items in the channel rather than getting them.
In [14]:
config_logging('INFO')
possible = IterChannel(KeyedArtifact(i, DerivedArtifact(expensive_deriver, i, name='expensive')) for i in range(10))
extant = IterChannel(KeyedArtifact(i, ExampleExtantArtifact(i)) for i in sorted(storage.keys()))
all_ = merge_keyed_channels(possible, extant)
print_chans(all_.tee(), mode='ensure', func=lambda a: a)
config_logging('WARN')
ensure() on an ExtantArtifact is essentially a no-op that just returns True, whereas on a DerivedArtifact it calls the get() method. So we have done the minimal amount of work needed to get up to date.
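The distinction can be sketched with two toy classes (hypothetical names that mirror the semantics just described, not the real flowz classes):

```python
# Toy illustration of ensure() vs get() semantics (not the real flowz classes).
class ToyExtant:
    """Already stored: ensure() is a no-op; get() would read from storage."""
    def __init__(self, value):
        self.value = value
        self.reads = 0

    def ensure(self):
        return True           # nothing to do; the data already exists

    def get(self):
        self.reads += 1       # an actual (possibly costly) read
        return self.value


class ToyDerived:
    """Not stored yet: ensure() must actually run the deriver."""
    def __init__(self, deriver, arg):
        self.deriver, self.arg = deriver, arg
        self.calls = 0

    def get(self):
        self.calls += 1
        return self.deriver(self.arg)

    def ensure(self):
        self.get()            # deriving is the only way to "ensure" it
        return True


extant = ToyExtant(42)
derived = ToyDerived(lambda n: n * 100, 9)
extant.ensure()
derived.ensure()
print(extant.reads, derived.calls)   # no reads of extant data, one derivation
```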
All that remains is that we haven't written out this new data. Let's do that now.
In [15]:
# A function to write the data, to be passed to a transform() call
def data_writing_transform(key, value):
    storage[key] = value
    return value
In [16]:
# recreate the storage and turn on logging
storage = {num: expensive_deriver(num) for num in range(8)}
config_logging('INFO')
In [17]:
# Run as though we failed after 85 minutes and are picking up again
possible = IterChannel(KeyedArtifact(i, DerivedArtifact(expensive_deriver, i, name='expensive')).transform(data_writing_transform, i) for i in range(10))
extant = IterChannel(KeyedArtifact(i, ExampleExtantArtifact(i)) for i in sorted(storage.keys()))
all_ = merge_keyed_channels(possible, extant)
print_chans(all_.tee(), mode='ensure', func=lambda a: a)
That's odd. It shows four deriver calls. Notice, however, that only two of them have "&lt;expensive&gt;" in the log. It turns out that transform() uses a DerivedArtifact under the covers, too.
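The doubling can be pictured as one derivation wrapping another (a hypothetical sketch of the idea, not flowz internals):

```python
# Hypothetical sketch: a transform wraps an existing derivation in another
# derivation, so resolving one logical value logs two "running deriver" events.
log = []

def make_derived(name, fn):
    def get():
        log.append('DerivedArtifact<%s> running deriver.' % name)
        return fn()
    return get

inner = make_derived('expensive', lambda: 9 * 100)
outer = make_derived('transform', inner)   # the transform wraps the inner derivation

outer()      # triggers both the transform and the expensive derivation
print(log)   # two deriver log lines, only one of them "<expensive>"
```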
In [18]:
print(storage)
Yes! Our storage has been updated. Now, if we run yet again, nothing should be done.
In [19]:
possible = IterChannel(KeyedArtifact(i, DerivedArtifact(expensive_deriver, i, name='expensive')).transform(data_writing_transform, i) for i in range(10))
extant = IterChannel(KeyedArtifact(i, ExampleExtantArtifact(i)) for i in sorted(storage.keys()))
all_ = merge_keyed_channels(possible, extant)
print_chans(all_.tee(), mode='ensure', func=lambda a: a)
QED
In [20]:
# recreate the storage to not mess up other parts of the notebook when run out of order, and turn off logging
storage = {num: expensive_deriver(num) for num in range(8)}
config_logging('WARN')