In [1]:
import json
In [2]:
# Sample input: each element is one JSON object encoded as a string,
# with a "name" and an integer "value".
data = [
    '{"name": "Alice", "value": 1}',
    '{"name": "Bob", "value": 2}',
    '{"name": "Alice", "value": 3}',
    '{"name": "Alice", "value": 4}',
    '{"name": "Charlie", "value": 5}',
    '{"name": "Bob", "value": 6}',
    '{"name": "Alice", "value": 7}',
]
In [3]:
# Parse every JSON string into a dict, materializing the whole batch at once.
seq = [json.loads(line) for line in data]
seq
Out[3]:
In [4]:
import toolz
# Extract the "value" field from every parsed record.
# Derived from the raw `data` instead of rebinding `seq` from itself: the
# self-referential form fails on a second run (pluck on a list of ints
# raises TypeError), while this cell gives the same result every time.
seq = list(toolz.pluck('value', map(json.loads, data)))
seq
Out[4]:
In [5]:
# Batch result: total of all values at once.
sum(value for value in seq)
Out[5]:
In [6]:
def infinite_data():
    """Yield the raw JSON strings in the module-level ``data`` list one at a time.

    Stands in for an unbounded source: a real feed could keep yielding
    forever, but this one finishes once ``data`` is exhausted.
    """
    yield from data
    # Here we stop, but we could keep going forever...
    # Falling off the end is how a generator finishes. An explicit
    # ``raise StopIteration`` here is converted into RuntimeError on
    # Python 3.7+ (PEP 479), which breaks any loop consuming this generator.
In [7]:
from operator import add
# One chained iterator over the generator: parse each line, take its
# "value", and keep a running sum with `add`.
seq = toolz.accumulate(
    add,
    toolz.pluck('value', map(json.loads, infinite_data())),
)
In [8]:
# Drain the pipeline, printing each running total as it is produced.
for total in seq:
    print(total)
In [9]:
import itertools
import logging
from collections import deque
# Parse the raw lines, then split the resulting iterator into two
# independent branches that see the same records.
# NOTE(review): this reads `data` directly; use `infinite_data()` as the
# source instead to drive it from the generator.
seq = map(json.loads, data)
seq1, seq2 = itertools.tee(seq, 2)

# Branch 1: running total of "value" -- what we did before.
seq1 = toolz.pluck('value', seq1)
seq1 = toolz.accumulate(add, seq1)

# Branch 2: keep only the three most recent records. `map` is used purely for
# its side effect: each step appends one record to the deque and yields None.
last_three = deque(maxlen=3)
seq2 = map(last_three.append, seq2)
In [10]:
# Advance both tee branches in lock-step: print the running total from seq1,
# then pull one item from seq2 so its side effect updates last_three.
# The loop ends when seq1 is exhausted. Both branches of one tee yield the
# same items, so next(seq2) always has a matching item; a mismatch would now
# surface as an error instead of being swallowed by a broad except.
for item in seq1:
    print(item)
    next(seq2)
    print(last_three)
In [11]:
from streamz import Stream
In [12]:
# Accumulator for the stream's `L.append` sink.
L = list()
In [13]:
# A linear stream: parse JSON -> take "value" -> running sum via `add`.
source = Stream()
stream = source.map(json.loads).map(lambda record: record['value']).scan(add)

# Two consumers hang off the same node: one prints, one records into L.
stream.sink(print)
stream.sink(L.append)
Out[13]:
In [14]:
# Push every raw line into the source; each one flows through the pipeline
# to both sinks.
for raw_line in data:
    source.emit(raw_line)
In [15]:
# Show the running totals the `L.append` sink has collected so far.
L
Out[15]:
In [16]:
# The stream stays open: one more record continues the same running sum.
# The trailing `;` suppresses the notebook's display of emit's return value.
source.emit('{"name": "Charlie", "value": 100}');
In [17]:
# The same list now also holds the total produced by the extra emit.
L
Out[17]:
In [18]:
# Branch a new consumer off the existing stream that prints a sliding window
# of size 2 over the running totals (presumably the two most recent values —
# confirm against the streamz docs). The earlier sinks remain attached.
stream.sliding_window(2).sink(print)
Out[18]:
In [19]:
# Replay the sample lines; every sink attached so far (including the new
# sliding-window branch) receives the results.
for raw_line in data:
    source.emit(raw_line)