Data Processing Pipelines


In [ ]:
import pandas as pd
import holoviews as hv

from holoviews import opts
from bokeh.sampledata import stocks
from holoviews.operation.timeseries import rolling, rolling_outlier_std
from holoviews.streams import Stream

hv.extension('bokeh')

opts.defaults(opts.Curve(width=600, framewise=True))

In the previous guides we discovered how to load and declare dynamic, live data and how to transform elements using operations. In this guide we will discover how to combine dynamic data with operations to declare lazy and declarative data processing pipelines, which can be used for interactive exploration but can also drive complex dashboards or even bokeh apps.

Declaring dynamic data

We will begin by declaring a function which loads some data. In this case we will just load some stock data from the bokeh but you could imagine querying this data using REST interface or some other API or even loading some large collection of data from disk or generating the data from some simulation or data processing job.


In [ ]:
def load_symbol(symbol, **kwargs):
    df = pd.DataFrame(getattr(stocks, symbol))
    df['date'] = df.date.astype('datetime64[ns]')
    return hv.Curve(df, ('date', 'Date'), ('adj_close', 'Adjusted Close'))

stock_symbols = ['AAPL', 'FB', 'GOOG', 'IBM', 'MSFT']
dmap = hv.DynamicMap(load_symbol, kdims='Symbol').redim.values(Symbol=stock_symbols)

We begin by displaying our DynamicMap to see what we are dealing with. Recall that a DynamicMap is only evaluated when you request the key so the load_symbol function is only executed when first displaying the DynamicMap and whenever we change the widget dropdown:


In [ ]:
dmap

Processing data

It is very common to want to process some data, for this purpose HoloViews provides so-called Operations, which are described in detail in the Transforming Elements. Operations are simply parameterized functions, which take HoloViews objects as input, transform them in some way and then return the output.

In combination with Dimensioned Containers such as HoloMap and GridSpace they are a powerful way to explore how the parameters of your transform affect the data. We will start with a simple example. HoloViews provides a rolling function which smoothes timeseries data with a rolling window. We will apply this operation with a rolling_window of 30, i.e. roughly a month of our daily timeseries data:


In [ ]:
smoothed = rolling(dmap, rolling_window=30)
smoothed

As you can see the rolling operation applies directly to our DynamicMap, smoothing each Curve before it is displayed. Applying an operation to a DynamicMap keeps the data as a DynamicMap, this means the operation is also applied lazily whenever we display or select a different symbol in the dropdown widget.

Controlling operations via Streams

In the previous section we briefly mentioned that in addition to regular widgets DynamicMap also supports streams, which allow us to define custom events our DynamicMap should subscribe to. To learn more about streams see the Responding to Events. Here we will declare a stream that controls the rolling window:


In [ ]:
rolling_stream = Stream.define('rolling', rolling_window=5)
stream = rolling_stream()

Now we can define a function that both loads the symbol and applies the rolling operation passing our rolling_window parameter to the operation:


In [ ]:
def rolled_data(symbol, rolling_window, **kwargs):
    curve = load_symbol(symbol)
    return rolling(curve, rolling_window=rolling_window)
    
rolled_dmap = hv.DynamicMap(rolled_data, kdims='Symbol',
                            streams=[stream]).redim.values(Symbol=stock_symbols)

rolled_dmap

Since we have a handle on the Stream we can now send events to it and watch the plot above update, let's start by setting the rolling_window=50.


In [ ]:
stream.event(rolling_window=50)

Instead of manually defining a function we can also do something much simpler, namely we can just apply the rolling operation to the original DynamicMap we defined and pass our rolling_stream to the operation. To make things a bit more interesting we will also apply the rolling_outlier_std function which computes outliers within the rolling_window. We supply our stream to both:


In [ ]:
stream = rolling_stream()

smoothed = rolling(dmap, streams=[stream])
outliers = rolling_outlier_std(dmap, streams=[stream])

smoothed * outliers.opts(color='red', marker='triangle')

Since the rolling_stream instance we created is bound to both operations, triggering an event on the stream will trigger both the Curve and the Scatter of outliers to be updated:


In [ ]:
stream.event(rolling_window=50)

We can chain operations like this indefinitely and attach streams to each stage. By chaining we can watch our visualization update whenever we change a stream value anywhere in the pipeline and HoloViews will be smart about which parts of the pipeline are recomputed, which allows us to build complex visualizations very quickly.

In later guides we will discover how to tie custom streams to custom widgets letting us easily control the stream values and making it trivial to define complex dashboards. paramNB is only one widget framework we could use: we could also choose paramBokeh to make use of bokeh widgets and deploy the dashboard on bokeh server, or we could manually link ipywidgets to our streams. For more information on how to deploy bokeh apps from HoloViews and build dashboards see the Deploying Bokeh Apps and Dashboards guides.