In [1]:
from __future__ import absolute_import
from __future__ import print_function
import datetime
import random
from tornado import gen
from flowz import app
from flowz.channels import *
from flowz.artifacts import *
In [2]:
def print_chans(*chans):
    # Run a Flo over the given channels, printing each item as it arrives.
    app.Flo([chan.map(print) for chan in chans]).run()
Suppose you have a function that calculates some value for a given index; we will think of the index as "days from the beginning of the year".
In [3]:
random.seed(1)
chan = IterChannel((i, random.randint(100, 200)) for i in range(10))
print_chans(chan.tee())
On any given day, you may want to know not just the value on that day, but all of the historical values as well. And it would be lovely to get all of that in one data structure, especially one stored in cloud storage, rather than having to iterate over a channel each time.
flowz provides an incremental assembly facility that makes this relatively easy to do.
In [4]:
from flowz.util import incremental_assembly, NO_VALUE
# NO_VALUE is a special value defined for incremental_assembly() that indicates the start of assembly
In [5]:
def prepend_assembler(new, old):
    """
    A simple assembler that prepends the new data to the tuple of old data.
    """
    if old is NO_VALUE:
        return (new,)
    else:
        return (new,) + old
# An empty destination channel: with no prior state, the first call to the
# assembler receives NO_VALUE as its `old` argument.
dest = IterChannel([])
out = incremental_assembly(chan.tee(), dest.tee(), prepend_assembler)
print_chans(out)
Printing out shows, for each index, the tuple assembled so far, with the newest value first. That is an admittedly simple example of incremental assembly. In practice, the indices are likely to be dates, and the data might be pandas series or dataframes that get concatenated together, as sketched below. Nonetheless, the assemblers remain easy to write, and the infrastructure reliably passes the prior assembled data on to each assembler call.
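As a rough sketch of that more realistic shape, the cell below feeds incremental_assembly() date-keyed, one-row pandas Series and concatenates them as it goes. The concat_assembler and the channel construction are illustrative assumptions for this notebook, not part of flowz itself; only incremental_assembly(), NO_VALUE, IterChannel, and print_chans come from the code above.
In [6]:
import pandas as pd

def concat_assembler(new, old):
    """
    Illustrative assembler: concatenate each day's new Series onto the
    previously assembled Series, starting fresh at NO_VALUE.
    """
    if old is NO_VALUE:
        return new
    return pd.concat([old, new])

start = datetime.date(2016, 1, 1)
dates = [start + datetime.timedelta(days=i) for i in range(10)]
# A channel of (date, one-row Series) pairs, mirroring the (index, value) pairs above.
date_chan = IterChannel(
    (d, pd.Series([random.randint(100, 200)], index=[d])) for d in dates)
dest = IterChannel([])
print_chans(incremental_assembly(date_chan, dest.tee(), concat_assembler))
Each emitted item should then pair a date with the Series accumulated through that date, which is exactly the kind of artifact one might persist to cloud storage.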