Time Series with dask.dataframe

Dask.dataframe inherits time series functionality from Pandas.

This notebook shows off a few time series functions on artificial but moderately large data.

The interface should look familiar to existing Pandas users.

Setup

We import dask.dataframe and set up a progress bar to measure performance.


In [1]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

progress_bar = ProgressBar()
progress_bar.register()   # turn on progressbar globally

In [2]:
%matplotlib inline

Random data

We construct fifteen years of random one-second data with dd.demo.make_timeseries.


In [3]:
import dask.dataframe as dd

df = dd.demo.make_timeseries(start='2000', end='2015', dtypes={'A': float, 'B': int},
                             freq='1s', partition_freq='3M', seed=1234)
df.head()


[########################################] | 100% Completed |  1.2s
Out[3]:
                            A     B
2000-01-31 00:00:00  0.514361  1039
2000-01-31 00:00:01 -0.773895  1015
2000-01-31 00:00:02 -0.987844   957
2000-01-31 00:00:03  0.744278  1009
2000-01-31 00:00:04 -0.379458   998
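
The partition_freq='3M' argument above splits the fifteen years into partitions of roughly three months each. As a quick sketch (output not shown), the standard npartitions and divisions attributes of a dask.dataframe reveal this structure.


In [ ]:
df.npartitions      # number of ~3-month partitions
df.divisions[:3]    # first few partition boundaries (timestamps)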

Base Data on Disk

Writing the data to disk once stops us from recomputing the same random numbers constantly, and reading from disk ends up being a bit faster than regenerating them.


In [4]:
c = df.to_castra()   # write the dataframe to a Castra store on disk
df = c.to_dask()     # reload it as a dask.dataframe backed by the on-disk data


[########################################] | 100% Completed |  1min 31.3s
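
If Castra isn't available in your environment, a similar persist-and-reload round trip can be sketched with Parquet instead; the file name below is just an example and a Parquet engine such as fastparquet or pyarrow is assumed to be installed.


In [ ]:
# Illustrative alternative: persist to Parquet and read it back lazily
df.to_parquet('timeseries.parquet')
df = dd.read_parquet('timeseries.parquet')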

Cumulative Sum, datetime resampling, and plotting

We compute the cumulative sum of the A column. This will look like typical Brownian motion / a ticker plot. However, at one-second resolution over 15 years the series has far too many points to hand directly to matplotlib, so we resample to weekly values. Finally we compute the result as a (small) Pandas DataFrame and use Pandas and matplotlib to plot the results.


In [5]:
df.A.cumsum().resample('1w', how='mean').compute().plot()


[########################################] | 100% Completed | 25.0s
Out[5]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fd1f02cb290>
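
The how= keyword in resample comes from the older Pandas API; on more recent Pandas and dask versions the same weekly mean is written by chaining the aggregation. A rough equivalent:


In [ ]:
# Same computation with the method-chaining resample API
df.A.cumsum().resample('1w').mean().compute().plot()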

What was our computational bandwidth?


In [6]:
duration = progress_bar.last_duration
nbytes = df.A.nbytes.compute()


[########################################] | 100% Completed |  6.1s

In [7]:
nbytes / 1e6 / duration  # MB/s


Out[7]:
149.06195387444777
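
As a rough sanity check on that figure, column A holds roughly fifteen years of one-second float64 samples, on the order of 470 million values or a bit under 4 GB; spread over the roughly 25 second compute above, that is consistent with the ~150 MB/s reported here.


In [ ]:
# Approximate size of column A in MB: seconds in ~15 years times 8 bytes per float64
15 * 365.25 * 24 * 60 * 60 * 8 / 1e6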

Profile to measure costs

We use the dask Profiler to measure the durations of all of our tasks running in parallel. We'll use this to gain intuition on what our current bottlenecks are.

Note for readers on GitHub: this plot won't show up in GitHub's notebook viewer. You can see the results by doing one of the following:

  1. Run this notebook yourself
  2. View the saved plot

In [8]:
from bokeh.plotting import output_notebook
output_notebook()

from dask.diagnostics import Profiler
prof = Profiler()

with prof:
    df.A.cumsum().resample('1w', how='mean').compute()
    
prof.visualize()


BokehJS successfully loaded.
[########################################] | 100% Completed | 23.1s
Out[8]:
<bokeh.plotting.Figure at 0x7fd1e283f7d0>

It looks like we're still dominated by data access. The load_partition calls take up the majority of the time.
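
One way to double-check that intuition (a sketch, assuming a dask version that includes ResourceProfiler) is to record CPU and memory use alongside the task timings; low CPU utilization during the long load_partition spans would point at I/O rather than computation as the bottleneck.


In [ ]:
from dask.diagnostics import ResourceProfiler

# Sample CPU and memory usage every quarter second while the computation runs
with ResourceProfiler(dt=0.25) as rprof:
    df.A.cumsum().resample('1w', how='mean').compute()

rprof.visualize()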