In [ ]:
# These are examples from Holoviews Developer Philipp Rudiger:
# https://anaconda.org/philippjfr/working_with_streaming_data/notebook
# As of 2017-10-26:
# . this is bleeding-edge,
# . this Notebook is made to check it can work well also on Windows / WinPython.
#
# Users may notice we're getting closer to a PyQtGraph style of graphics
In [ ]:
import time
import numpy as np
import pandas as pd
import holoviews as hv
from holoviews.streams import Pipe, Buffer
import streamz
import streamz.dataframe
hv.extension('bokeh')
In [ ]:
# Pipe opening
# Create a Pipe stream whose initial data is an empty list and hand it to a
# DynamicMap that builds an hv.VectorField; the feeding cell later pushes
# data into the plot through pipe.send(...).
pipe = Pipe(data=[])
vector_dmap = hv.DynamicMap(hv.VectorField, streams=[pipe])
# Pin the x and y dimension ranges to (-1, 1). This is the last expression of
# the cell, so its result is what the notebook displays.
vector_dmap.redim.range(x=(-1, 1), y=(-1, 1))
In [ ]:
# Feeding pipe
# 21x21 coordinate grid spanning -1.0 .. 1.0 in steps of 0.1.
x, y = np.mgrid[-10:11, -10:11] * 0.1
# Two fields derived from the squared radius x**2 + y**2; presumably read by
# hv.VectorField as angle and magnitude -- confirm against the HoloViews docs.
sine_rings = np.sin(x**2 + y**2) * np.pi + np.pi
exp_falloff = 1 / np.exp((x**2 + y**2) / 8)
# Scale sine_rings from 0 up to 1 over 25 steps, pausing 0.1 s before each
# send so the update is visible as an animation.
for i in np.linspace(0, 1, 25):
    time.sleep(0.1)
    pipe.send([x, y, sine_rings * i, exp_falloff])
Buffer automatically accumulates the last N rows of the tabular data, where N is defined by the length.
Plotting backends (such as bokeh) can optimize plot updates by sending just the latest patch. This optimization works only if the data object held by the Buffer is identical to the plotted Element data.
In [ ]:
# Empty DataFrame that only fixes the column names and their order.
example = pd.DataFrame({'x': [], 'y': [], 'count': []}, columns=['x', 'y', 'count'])
# Buffer seeded with that frame; length=100 caps it at the last 100 rows.
# NOTE(review): index=False presumably keeps the DataFrame index out of the
# buffered data -- confirm in the HoloViews Buffer docs.
dfstream = Buffer(example, length=100, index=False)
# Two DynamicMaps (a Curve and a Points element) fed by the same Buffer.
curve_dmap = hv.DynamicMap(hv.Curve, streams=[dfstream])
point_dmap = hv.DynamicMap(hv.Points, streams=[dfstream])
In [ ]:
%%opts Points [color_index='count', xaxis=None, yaxis=None] (line_color='black', size=5)
%%opts Curve (line_width=1, color='black')
# Overlay the curve and the points; both are fed by the same dfstream Buffer.
curve_dmap * point_dmap
In [ ]:
def gen_brownian():
    """Yield an endless sequence of one-row DataFrames tracing a random walk.

    Each step adds an independent ``np.random.randn()`` draw to ``x`` and to
    ``y`` and increments ``count``, then yields the new state as a single-row
    DataFrame with the columns ``['x', 'y', 'count']``.

    The generator never terminates on its own; the caller decides how many
    steps to pull with ``next()``.
    """
    x, y, count = 0, 0, 0
    while True:
        x += np.random.randn()
        y += np.random.randn()
        count += 1
        yield pd.DataFrame([(x, y, count)], columns=['x', 'y', 'count'])
# Pull 200 steps from the random walk and push each one-row frame into the
# dfstream Buffer.
brownian = gen_brownian()
for i in range(200):
    dfstream.send(next(brownian))
In [ ]:
# Reset the Buffer stream; presumably this drops the accumulated rows so the
# plots start over -- confirm in the HoloViews Buffer docs.
dfstream.clear()
Let's start with a fairly simple example:
In [ ]:
# Source stream that DataFrames are emitted into, plus a fresh Pipe for the plot.
# NOTE: this rebinds the notebook-level name `pipe`; the Pipe created for the
# VectorField cell is no longer reachable under that name afterwards.
point_source = streamz.Stream()
pipe = Pipe(data=[])
# Connect streamz to the Pipe: take a sliding window of 20 emissions,
# concatenate each window into one DataFrame, and forward it to pipe.send.
windowed = point_source.sliding_window(20)
combined = windowed.map(pd.concat)
combined.sink(pipe.send)
scatter_dmap = hv.DynamicMap(hv.Scatter, streams=[pipe])
In [ ]:
%%opts Scatter [color_index='count', bgcolor='black']
# Fix the y range to (-4, 4); as the cell's last expression, the result is
# what the notebook displays.
scatter_dmap.redim.range(y=(-4, 4))
In [ ]:
# Emit 100 frames of 100 random points each; the 'count' column carries the
# frame number i for every row of that frame.
for i in range(100):
    df = pd.DataFrame(
        {'x': np.random.rand(100), 'y': np.random.randn(100), 'count': i},
        columns=['x', 'y', 'count'],
    )
    point_source.emit(df)
In [ ]:
# Random streaming DataFrame source.
# NOTE(review): freq presumably sets the spacing between generated rows and
# interval how often a batch is emitted -- confirm in the streamz docs.
simple_sdf = streamz.dataframe.Random(freq='10ms', interval='100ms')
print(simple_sdf.index)
# Last expression of the cell: shows the column dtypes of the example frame.
simple_sdf.example.dtypes
In [ ]:
%%opts Curve [width=500 show_grid=True]
# Subtract 0.5 from every value and take the running cumulative sum
# (presumably centring the random values around zero before accumulating).
sdf = (simple_sdf-0.5).cumsum()
# Plot the x column as a Curve fed through a Buffer stream.
hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x)])
The `Random` StreamingDataFrame will asynchronously emit events until it is stopped, which we can do by calling the `stop` method.
In [ ]:
# Stop the Random source so it no longer emits events.
simple_sdf.stop()
In [ ]:
%%opts Curve [width=500 show_grid=True]
# New Random source; subtract 0.5 from its values and accumulate them.
source_df = streamz.dataframe.Random(freq='5ms', interval='100ms')
sdf = (source_df-0.5).cumsum()
# Raw x column, buffered and plotted as a Curve.
raw_dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x)])
# Same column passed through a rolling mean over a '500ms' window before buffering.
smooth_dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x.rolling('500ms').mean())])
# Overlay both curves, labelled 'raw' and 'smooth'.
raw_dmap.relabel('raw') * smooth_dmap.relabel('smooth')
In [ ]:
# Stop the Random source behind the raw/smooth curves.
source_df.stop()
In [ ]:
from functools import partial

# Fresh Random source, shifted by -0.5 and cumulatively summed.
multi_source = streamz.dataframe.Random(freq='5ms', interval='100ms')
sdf = (multi_source - 0.5).cumsum()

# Layout: a Table over a Buffer of length 10 next to a BoxWhisker over a
# Buffer of length 100, both fed by the x column.
(
    hv.DynamicMap(hv.Table, streams=[Buffer(sdf.x, length=10)])
    + hv.DynamicMap(
        partial(hv.BoxWhisker, kdims=[], vdims=['x']),
        streams=[Buffer(sdf.x, length=100)],
    )
)
Since a `StreamingDataFrame` will emit data until it is stopped, we can subscribe multiple plots across different cells to the same stream:
In [ ]:
# A further plot subscribed to the sdf stream defined in the previous cell,
# which keeps emitting until its source is stopped.
hv.DynamicMap(hv.Scatter, streams=[Buffer(sdf.x)])
In [ ]:
# Stop the Random source shared by the Table, BoxWhisker and Scatter plots.
multi_source.stop()
In [ ]:
# New Random source; subtract 0.5 from its values and accumulate them.
hist_source = streamz.dataframe.Random(freq='5ms', interval='100ms')
sdf = (hist_source-0.5).cumsum()
# Wrap a Buffer of length 500 over the x column in an hv.Dataset.
dmap = hv.DynamicMap(hv.Dataset, streams=[Buffer(sdf.x, length=500)])
# Apply the histogram operation along dimension 'x'; as the cell's last
# expression, the result is what the notebook displays.
hv.operation.histogram(dmap, dimension='x')
In [ ]:
# Stop the Random source feeding the histogram.
hist_source.stop()
The same approach will also work for the datashader operation, letting us datashade the entire backlog window even if we make it very large:
In [ ]:
%%opts RGB [width=600]
from holoviews.operation.datashader import datashade
from bokeh.palettes import Blues8
large_source = streamz.dataframe.Random(freq='100us', interval='200ms')
sdf = (large_source-0.5).cumsum()
dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x, length=100000)])
datashade(dmap, streams=[hv.streams.PlotSize], normalization='linear', cmap=Blues8)
In [ ]:
# Stop the Random source feeding the datashaded curve.
large_source.stop()
In [ ]: