HoloViews 'Streaming Data' demo

from https://github.com/ioam/holoviews/blob/master/examples/user_guide/15-Streaming_Data.ipynb

(HoloViews/Datashader/Bokeh/Jupyter Notebook)


In [ ]:
# These are examples from HoloViews developer Philipp Rudiger:
#   https://anaconda.org/philippjfr/working_with_streaming_data/notebook
# As of 2017-10-26:
#   . this is bleeding-edge,
#   . this notebook checks that it also works well on Windows / WinPython.
#
# Users may notice we're getting closer to a PyQtGraph style of graphics.

In [ ]:
import time
import numpy as np
import pandas as pd
import holoviews as hv

from holoviews.streams import Pipe, Buffer

import streamz
import streamz.dataframe

hv.extension('bokeh')

Pipe

A Pipe allows data to be pushed into a DynamicMap callback to update a visualization.
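
As a quick illustration (an added sketch, not part of the original notebook): whatever is sent through the Pipe is handed to the DynamicMap callback as its data argument, so passing an Element constructor such as hv.VectorField directly (as below) is equivalent to wrapping it in an explicit callback.

In [ ]:
# Illustrative sketch only: the Pipe's contents become the `data` argument of
# the DynamicMap callback, so an explicit lambda behaves the same as passing
# an Element constructor directly.
demo_pipe = Pipe(data=np.random.randn(10, 2))
demo_dmap = hv.DynamicMap(lambda data: hv.Scatter(data), streams=[demo_pipe])
demo_pipe.send(np.random.randn(20, 2))  # the Scatter now shows the new array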


In [ ]:
# Open a Pipe with empty initial data
pipe = Pipe(data=[])
vector_dmap = hv.DynamicMap(hv.VectorField, streams=[pipe])
vector_dmap.redim.range(x=(-1, 1), y=(-1, 1))

In [ ]:
# Feed the pipe with successive updates
x,y  = np.mgrid[-10:11,-10:11] * 0.1
sine_rings  = np.sin(x**2+y**2)*np.pi+np.pi
exp_falloff = 1/np.exp((x**2+y**2)/8)

for i in np.linspace(0, 1, 25):
    time.sleep(0.1)
    pipe.send([x,y,sine_rings*i, exp_falloff])

Buffer

A Buffer automatically accumulates the last N rows of the tabular data sent to it, where N is defined by the length parameter.

Plotting backends (such as Bokeh) can optimize plot updates by sending only the latest patch. This optimization works only if the data object held by the Buffer is identical to the plotted Element data.
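
As a small standalone sketch (added for illustration, not from the original notebook), the accumulation behaviour can be seen directly: with length=3, the Buffer only ever holds the three most recently sent rows.

In [ ]:
# Illustrative sketch of Buffer accumulation: with length=3, the Buffer's data
# only ever contains the last three rows that were sent to it.
demo_buffer = Buffer(pd.DataFrame({'x': [], 'y': []}, columns=['x', 'y']),
                     length=3, index=False)
for i in range(5):
    demo_buffer.send(pd.DataFrame([(i, i * i)], columns=['x', 'y']))
print(demo_buffer.data)  # only the rows for i = 2, 3 and 4 remain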


In [ ]:
example = pd.DataFrame({'x': [], 'y': [], 'count': []}, columns=['x', 'y', 'count'])
dfstream = Buffer(example, length=100, index=False)
curve_dmap = hv.DynamicMap(hv.Curve, streams=[dfstream])
point_dmap = hv.DynamicMap(hv.Points, streams=[dfstream])

In [ ]:
%%opts Points [color_index='count', xaxis=None, yaxis=None] (line_color='black', size=5)
%%opts Curve (line_width=1, color='black')
curve_dmap * point_dmap

In [ ]:
def gen_brownian():
    x, y, count = 0, 0, 0
    while True:
        x += np.random.randn()
        y += np.random.randn()
        count += 1
        yield pd.DataFrame([(x, y, count)], columns=['x', 'y', 'count'])

brownian = gen_brownian()
for i in range(200):
    dfstream.send(next(brownian))

In [ ]:
dfstream.clear()

Asynchronous updates using the streamz library

Let's start with a fairly simple example:

  • Declare a streamz.Stream and a Pipe object and connect them into a pipeline into which we can push data.
  • Use a sliding_window of 20, which will first wait for 20 sets of stream updates to accumulate. At that point, and for every subsequent update, it will apply pd.concat to combine the most recent 20 updates into a new DataFrame.
  • Use the sink method on the streamz.Stream to send the resulting collection of 20 updates to the Pipe.
  • Declare a DynamicMap that takes the sliding window of concatenated DataFrames and displays it using a Scatter Element.
  • Color the Scatter points by their 'count', set a range, then display (a short standalone sketch of the sliding-window step is shown right after this list):
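
Before wiring the pipeline to a plot, the sketch below (added for illustration, not in the original notebook) shows what the sliding_window/pd.concat/sink chain produces on its own.

In [ ]:
# Illustrative sketch of the streamz pipeline used below: sliding_window(3)
# collects the most recent emits into a tuple, pd.concat combines them into a
# single DataFrame, and sink passes the result to a callback (here just print).
demo_source = streamz.Stream()
demo_source.sliding_window(3).map(pd.concat).sink(lambda df: print(len(df), 'rows'))
for i in range(5):
    demo_source.emit(pd.DataFrame({'x': [i]}))
# With the streamz version used here output starts once 3 emits have accumulated;
# newer streamz releases may also emit partial windows.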

In [ ]:
point_source = streamz.Stream()
pipe = Pipe(data=[])
point_source.sliding_window(20).map(pd.concat).sink(pipe.send) # Connect streamz to the Pipe
scatter_dmap = hv.DynamicMap(hv.Scatter, streams=[pipe])

In [ ]:
%%opts Scatter [color_index='count', bgcolor='black']
scatter_dmap.redim.range(y=(-4, 4))

In [ ]:
for i in range(100):
    df = pd.DataFrame({'x': np.random.rand(100), 'y': np.random.randn(100), 'count': i},
                      columns=['x', 'y', 'count'])
    point_source.emit(df)

StreamingDataFrame

A simple example


In [ ]:
simple_sdf = streamz.dataframe.Random(freq='10ms', interval='100ms')
print(simple_sdf.index)
simple_sdf.example.dtypes

In [ ]:
%%opts Curve [width=500 show_grid=True]
sdf = (simple_sdf-0.5).cumsum()
hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x)])

The Random StreamingDataFrame will asynchronously emit events until it is stopped, which we can do by calling the stop method.


In [ ]:
simple_sdf.stop()

Making use of the StreamingDataFrame API


In [ ]:
%%opts Curve [width=500 show_grid=True]
source_df = streamz.dataframe.Random(freq='5ms', interval='100ms')
sdf = (source_df-0.5).cumsum()
raw_dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x)])
smooth_dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x.rolling('500ms').mean())])

raw_dmap.relabel('raw') * smooth_dmap.relabel('smooth')

In [ ]:
source_df.stop()

Controlling the backlog


In [ ]:
from functools import partial
multi_source = streamz.dataframe.Random(freq='5ms', interval='100ms')
sdf = (multi_source-0.5).cumsum()
hv.DynamicMap(hv.Table, streams=[Buffer(sdf.x, length=10)]) +\
hv.DynamicMap(partial(hv.BoxWhisker, kdims=[], vdims=['x']), streams=[Buffer(sdf.x, length=100)])

Updating multiple cells

Since a StreamingDataFrame will emit data until it is stopped, we can subscribe multiple plots across different cells to the same stream:


In [ ]:
hv.DynamicMap(hv.Scatter, streams=[Buffer(sdf.x)])

In [ ]:
multi_source.stop()

Applying operations


In [ ]:
hist_source = streamz.dataframe.Random(freq='5ms', interval='100ms')
sdf = (hist_source-0.5).cumsum()
dmap = hv.DynamicMap(hv.Dataset, streams=[Buffer(sdf.x, length=500)])
hv.operation.histogram(dmap, dimension='x')

In [ ]:
hist_source.stop()

Datashading

The same approach also works for the datashade operation, letting us datashade the entire backlog window even if we make it very large:


In [ ]:
%%opts RGB [width=600]
from holoviews.operation.datashader import datashade
from bokeh.palettes import Blues8
large_source = streamz.dataframe.Random(freq='100us', interval='200ms')
sdf = (large_source-0.5).cumsum()
dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x, length=100000)])
datashade(dmap, streams=[hv.streams.PlotSize], normalization='linear', cmap=Blues8)

In [ ]:
large_source.stop()
