Finite Sequences


In [1]:
import json

In [2]:
data = ['{"name": "Alice", "value": 1}',
        '{"name": "Bob", "value": 2}',
        '{"name": "Alice", "value": 3}',
        '{"name": "Alice", "value": 4}',
        '{"name": "Charlie", "value": 5}',
        '{"name": "Bob", "value": 6}',
        '{"name": "Alice", "value": 7}']

In [3]:
seq = list(map(json.loads, data))
seq


Out[3]:
[{'name': 'Alice', 'value': 1},
 {'name': 'Bob', 'value': 2},
 {'name': 'Alice', 'value': 3},
 {'name': 'Alice', 'value': 4},
 {'name': 'Charlie', 'value': 5},
 {'name': 'Bob', 'value': 6},
 {'name': 'Alice', 'value': 7}]

In [4]:
import toolz
seq = list(toolz.pluck('value', seq))
seq


Out[4]:
[1, 2, 3, 4, 5, 6, 7]

In [5]:
sum(seq)


Out[5]:
28
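For comparison, the same finite computation can be written with only the standard library. With a single key, toolz.pluck('value', seq) behaves like mapping operator.itemgetter('value') over the records. This sketch does not cover pluck's other options, such as a list of keys or a default value.

```python
import json
from operator import itemgetter

data = ['{"name": "Alice", "value": 1}',
        '{"name": "Bob", "value": 2}',
        '{"name": "Alice", "value": 3}',
        '{"name": "Alice", "value": 4}',
        '{"name": "Charlie", "value": 5}',
        '{"name": "Bob", "value": 6}',
        '{"name": "Alice", "value": 7}']

records = map(json.loads, data)             # parse each JSON line into a dict
values = map(itemgetter('value'), records)  # pull out the 'value' field, like toolz.pluck
total = sum(values)
print(total)  # 28
```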

Infinite Sequences


In [6]:
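def infinite_data():
    for x in data:
        yield x

    # Here we stop, but we could keep going forever...
    # A plain `return` ends a generator; raising StopIteration inside one
    # becomes a RuntimeError since Python 3.7 (PEP 479).
    return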
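To make the "keep going forever" comment concrete, here is a hypothetical generator, endless_data, that cycles through the same lines indefinitely. The name and the cycling behaviour are illustrative assumptions. The consumer decides when to stop, here with itertools.islice taking a finite prefix.

```python
import itertools

data = ['{"name": "Alice", "value": 1}',
        '{"name": "Bob", "value": 2}',
        '{"name": "Alice", "value": 3}',
        '{"name": "Alice", "value": 4}',
        '{"name": "Charlie", "value": 5}',
        '{"name": "Bob", "value": 6}',
        '{"name": "Alice", "value": 7}']

def endless_data():
    """Hypothetical: yield the sample lines forever by cycling through them."""
    # itertools.cycle never runs out, so this generator never finishes on its own.
    yield from itertools.cycle(data)

# islice stops pulling from the generator after 10 items.
first_ten = list(itertools.islice(endless_data(), 10))
print(len(first_ten))  # 10
```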

In [7]:
from operator import add
seq = infinite_data()
seq = map(json.loads, seq)
seq = toolz.pluck('value', seq)
seq = toolz.accumulate(add, seq)

In [8]:
for item in seq:
    print(item)


1
3
6
10
15
21
28
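The standard library can produce the same running totals with itertools.accumulate. Note the argument order differs: toolz.accumulate takes the function first, while itertools.accumulate takes the iterable first (and defaults to addition, so passing add here just makes it explicit).

```python
import itertools
import json
from operator import add

data = ['{"name": "Alice", "value": 1}',
        '{"name": "Bob", "value": 2}',
        '{"name": "Alice", "value": 3}',
        '{"name": "Alice", "value": 4}',
        '{"name": "Charlie", "value": 5}',
        '{"name": "Bob", "value": 6}',
        '{"name": "Alice", "value": 7}']

# Lazily parse each line and extract its value.
values = (json.loads(line)['value'] for line in data)

# Running sum: iterable first, binary function second.
running = list(itertools.accumulate(values, add))
print(running)  # [1, 3, 6, 10, 15, 21, 28]
```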

Branching Sequences

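Sometimes we want to do several things with the same infinite sequence. This is where the Python iterator abstraction starts to feel awkward: an iterator can be consumed only once, so we have to split it with itertools.tee and advance each branch by hand.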


In [9]:
import itertools
from collections import deque

seq = infinite_data()
seq = map(json.loads, seq)

seq1, seq2 = itertools.tee(seq, 2)

seq1 = toolz.pluck('value', seq1)  # what we did before
seq1 = toolz.accumulate(add, seq1)

last_three = deque(maxlen=3)
seq2 = map(last_three.append, seq2)

In [10]:
while True:
    try:
        item = next(seq1)
        print(item)
    
        next(seq2)
        print(last_three)
        
    except StopIteration:
        break


1
deque([{'name': 'Alice', 'value': 1}], maxlen=3)
3
deque([{'name': 'Alice', 'value': 1}, {'name': 'Bob', 'value': 2}], maxlen=3)
6
deque([{'name': 'Alice', 'value': 1}, {'name': 'Bob', 'value': 2}, {'name': 'Alice', 'value': 3}], maxlen=3)
10
deque([{'name': 'Bob', 'value': 2}, {'name': 'Alice', 'value': 3}, {'name': 'Alice', 'value': 4}], maxlen=3)
15
deque([{'name': 'Alice', 'value': 3}, {'name': 'Alice', 'value': 4}, {'name': 'Charlie', 'value': 5}], maxlen=3)
21
deque([{'name': 'Alice', 'value': 4}, {'name': 'Charlie', 'value': 5}, {'name': 'Bob', 'value': 6}], maxlen=3)
28
deque([{'name': 'Charlie', 'value': 5}, {'name': 'Bob', 'value': 6}, {'name': 'Alice', 'value': 7}], maxlen=3)
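One somewhat tidier way to drive both branches in lockstep is to zip them: zip calls next on each branch once per step and stops as soon as the first one is exhausted. This is a sketch of an alternative to the while True / try loop above, not a different approach, and tee still has to buffer any items one branch has consumed but the other has not. The snapshots list is only there to record what happens at each step.

```python
import itertools
import json
from collections import deque
from operator import add

data = ['{"name": "Alice", "value": 1}',
        '{"name": "Bob", "value": 2}',
        '{"name": "Alice", "value": 3}',
        '{"name": "Alice", "value": 4}',
        '{"name": "Charlie", "value": 5}',
        '{"name": "Bob", "value": 6}',
        '{"name": "Alice", "value": 7}']

seq = map(json.loads, data)
seq1, seq2 = itertools.tee(seq, 2)

# Branch 1: running total of the values.
totals = itertools.accumulate((d['value'] for d in seq1), add)

# Branch 2: a lazy map whose only job is the side effect of appending.
last_three = deque(maxlen=3)
appends = map(last_three.append, seq2)

snapshots = []
for total, _ in zip(totals, appends):
    # Each iteration advances both branches exactly once.
    snapshots.append((total, [d['value'] for d in last_three]))

for total, window in snapshots:
    print(total, window)
```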

We also want to:

  • Handle multiple incoming streams with joins
  • Perform time-window operations like "group by 50 ms" or "slow down input stream, I'm swamped"
  • ...
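As an illustration of the "group by 50 ms" idea, here is a sketch that groups an already-collected, time-ordered batch of (timestamp_ms, value) pairs into fixed 50 ms windows. The timestamps are invented for illustration, and tumbling_windows is a hypothetical helper. A live stream would also have to decide when a window is finished (for example with a timer), which this pull-based batch version sidesteps entirely.

```python
from itertools import groupby

WINDOW_MS = 50

# Hypothetical (timestamp_ms, value) events, assumed to be sorted by time.
events = [(0, 1), (12, 2), (49, 3), (50, 4), (87, 5), (130, 6), (149, 7)]

def tumbling_windows(events, width):
    """Group time-ordered (timestamp, value) pairs into non-overlapping windows."""
    # groupby only merges adjacent items, which is why the input must be time-ordered.
    for bucket, group in groupby(events, key=lambda e: e[0] // width):
        yield bucket * width, [value for _, value in group]

windows = list(tumbling_windows(events, WINDOW_MS))
print(windows)  # [(0, [1, 2, 3]), (50, [4, 5]), (100, [6, 7])]
```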

Streamz

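Streamz handles the same applications with a different way of controlling data: instead of pulling items out of iterators, we push each item into a Stream with emit, and it flows through every downstream operation and sink.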
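To show the push-based idea in isolation, here is a deliberately tiny, hypothetical MiniStream class. It is not the streamz API or its implementation, only a sketch of the control flow: emit pushes a value into a node, and each node transforms it and pushes the result to every node connected downstream. One behavioural note: this sketch's scan emits the first element too, matching toolz.accumulate, whereas the streamz output further down starts at 3, so its scan evidently treats the first element differently.

```python
import json
from operator import add

class MiniStream:
    """Hypothetical push-based node (NOT the streamz API), for illustration only."""

    def __init__(self):
        self.downstreams = []

    def emit(self, x):
        # Push x to every node connected downstream.
        for node in self.downstreams:
            node.update(x)

    def update(self, x):
        # A plain node just forwards whatever it receives.
        self.emit(x)

    def _connect(self, node):
        self.downstreams.append(node)
        return node

    def map(self, func):
        return self._connect(_Map(func))

    def scan(self, func):
        return self._connect(_Scan(func))

    def sink(self, func):
        return self._connect(_Sink(func))


class _Map(MiniStream):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def update(self, x):
        self.emit(self.func(x))


class _Scan(MiniStream):
    def __init__(self, func):
        super().__init__()
        self.func = func
        self.state = None
        self.started = False

    def update(self, x):
        # Keep the running result as state; the first element seeds it.
        self.state = self.func(self.state, x) if self.started else x
        self.started = True
        self.emit(self.state)


class _Sink(MiniStream):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def update(self, x):
        self.func(x)  # terminal action; nothing is pushed further


results = []
source = MiniStream()
totals = source.map(json.loads).map(lambda d: d['value']).scan(add)
totals.sink(print)           # two actions per value, as in the cells below
totals.sink(results.append)

for line in ['{"name": "Alice", "value": 1}', '{"name": "Bob", "value": 2}',
             '{"name": "Alice", "value": 3}', '{"name": "Alice", "value": 4}',
             '{"name": "Charlie", "value": 5}', '{"name": "Bob", "value": 6}',
             '{"name": "Alice", "value": 7}']:
    source.emit(line)

# State persists between pushes, so a later value continues the running total.
source.emit('{"name": "Charlie", "value": 100}')
```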


In [11]:
from streamz import Stream

In [12]:
L = []

In [13]:
# Simple linear stream
source = Stream()
stream = (source.map(json.loads)
                .map(lambda x: x['value'])
                .scan(add))

# Two actions whenever a value comes through
stream.sink(print)
stream.sink(L.append)


Out[13]:
<streamz.core.Sink at 0x7f25000b9b00>

In [14]:
for line in data:
    source.emit(line)


3
6
10
15
21
28

In [15]:
L


Out[15]:
[3, 6, 10, 15, 21, 28]

In [16]:
source.emit('{"name": "Charlie", "value": 100}');


128

In [17]:
L


Out[17]:
[3, 6, 10, 15, 21, 28, 128]

It is easy to attach new components to an existing stream, for example a sliding window over the last two running totals:


In [18]:
stream.sliding_window(2).sink(print)


Out[18]:
<streamz.core.Sink at 0x7f25000b9cf8>

In [19]:
for line in data:
    source.emit(line)


129
131
(129, 131)
134
(131, 134)
138
(134, 138)
143
(138, 143)
149
(143, 149)
156
(149, 156)
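The windows above pair each running total with the one before it, and no window appears for the first total. A minimal generator version of the same idea, assuming (as the output suggests) that a window is emitted only once it is full, can be written with a bounded deque. toolz.sliding_window(n, seq) follows the same full-windows-only convention for ordinary iterables.

```python
from collections import deque
from itertools import accumulate

def sliding_window(n, seq):
    """Yield tuples of the last n items, starting once n items have been seen."""
    window = deque(maxlen=n)  # older items fall off the left automatically
    for item in seq:
        window.append(item)
        if len(window) == n:
            yield tuple(window)

# Running totals for the second pass: start from the earlier 128, then add 1..7.
totals = list(accumulate([1, 2, 3, 4, 5, 6, 7], initial=128))[1:]
windows = list(sliding_window(2, totals))
print(totals)   # [129, 131, 134, 138, 143, 149, 156]
print(windows)
```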