In [1]:
ls ../test-data/


2014-head.txt           EQY_US_ALL_BBO_20150731.zip@
2015-head.txt           small_test_data_public.h5
EQY_US_ALL_BBO_201111@  small_test_data_public.zip
EQY_US_ALL_BBO_201402@

In [1]:
%matplotlib inline
import matplotlib.pyplot as plt

In [1]:
import numpy as np
import pandas as pd
import tables as tb

In [2]:
import h5py
import dask.dataframe as dd
import dask.bag as db

In [3]:
import blaze

In [4]:
fname = '../test-data/EQY_US_ALL_BBO_201402/EQY_US_ALL_BBO_20140206.h5'
max_sym = '/SPY/no_suffix'

In [5]:
fname = '../test-data/small_test_data_public.h5'
max_sym = '/IXQAJE/no_suffix'

In [6]:
# by default, this will be read-only
taq_tb = tb.open_file(fname)

In [8]:
%%time
# Row count for every PyTables Table in the file, keyed by its HDF5 path
# (one table per symbol, e.g. '/IXQAJE/no_suffix').  walk_nodes filters
# to Table nodes only, so groups are skipped.
rec_counts = {curr._v_pathname: len(curr) 
              for curr in taq_tb.walk_nodes('/', 'Table')}


CPU times: user 749 ms, sys: 13 ms, total: 762 ms
Wall time: 763 ms

In [9]:
# What's our biggest table? (in bytes)
max(rec_counts.values()) * 91 / 2 ** 20 # I think it's 91 bytes...


Out[9]:
0.0794076919555664

Anyway, under a gigabyte. So, nothing to worry about even if we have 24 cores.


In [10]:
# Which symbol owns the biggest table?  Single pass over the counts,
# tracking the running maximum (leaves max_sym as None if rec_counts is empty).
max_sym, max_rows = None, 0
for path, n_rows in rec_counts.items():
    if n_rows > max_rows:
        max_sym, max_rows = path, n_rows

In [11]:
max_sym, max_rows


Out[11]:
('/IXQAJE/no_suffix', 915)

Interesting... in the full TAQ data this is the S&P 500 ETF (SPY); in the small public test file it's the scrambled symbol IXQAJE.


In [14]:
# Most symbols also have way less rows - note this is log xvals
plt.hist(list(rec_counts.values()), bins=50, log=True)
plt.show()


Doing some compute

We'll use a "big" table to get some sense of timings


In [49]:
spy = taq_tb.get_node(max_sym)

In [16]:
# PyTables is record oriented...
%timeit np.mean(list(x['Bid_Price'] for x in spy.iterrows()))


1 loop, best of 3: 7.62 s per loop

In [50]:
# But this is faster...
%timeit np.mean(spy[:]['Bid_Price'])


The slowest run took 383.78 times longer than the fastest. This could mean that an intermediate result is being cached.
10000 loops, best of 3: 47.6 µs per loop

In [51]:
np.mean(spy[:]['Bid_Price'])


Out[51]:
18.500255081967214

Using numexpr?

numexpr is currently not set up to do reductions via HDF5. I've opened an issue here: https://github.com/PyTables/PyTables/issues/548


In [14]:
spy_bp = spy.cols.Bid_Price

In [15]:
# this works...
np.mean(spy_bp)


Out[15]:
176.82760773125673

In [51]:
# But it can't use numexpr
expr = tb.Expr('sum(spy_bp)')

In [61]:
# You can use numexpr to get the values of the column... but that's silly
# (sum doesn't work right, and the axis argument is non-functional)
%timeit result = expr.eval().mean()


1 loop, best of 3: 3.62 s per loop

In [60]:
tb.Expr('spy_bp').eval().mean()


Out[60]:
176.82760773125673

h5py


In [52]:
taq_tb.close()

In [6]:
%%time
spy_h5py = h5py.File(fname)[max_sym]


CPU times: user 1.64 ms, sys: 1.92 ms, total: 3.55 ms
Wall time: 3.36 ms

In [123]:
np.mean(spy_h5py['Bid_Price'])


Out[123]:
18.500255081967214

h5py may be a touch faster than pytables for this kind of usage. But why does pandas use pytables?


In [55]:
%%timeit
np.mean(spy_h5py['Bid_Price'])


The slowest run took 5.02 times longer than the fastest. This could mean that an intermediate result is being cached.
1000 loops, best of 3: 253 µs per loop

Dask

It seems that there should be no need to, e.g., use h5py - but dask's read_hdf doesn't seem to be working nicely...


In [28]:
# Release the PyTables handle first - two libraries holding the same HDF5
# file open can conflict.  Then reopen with h5py, explicitly read-only
# (h5py's implicit default mode is deprecated).
taq_tb.close()

spy_h5py = h5py.File(fname, 'r')[max_sym]


In [56]:
store = pd.HDFStore(fname)

In [31]:
store = pd.HDFStore('../test-data/')

In [62]:
# this is a fine way to iterate over our datasets (in addition to what's available in PyTables and h5py)
it = store.items()

In [101]:
key, tab = next(it)

In [102]:
tab


Out[102]:
/KSJXFM/no_suffix (Table(33,), fletcher32, shuffle, blosc:lz4hc(9)) ''
  description := {
  "Time": Float64Col(shape=(), dflt=0.0, pos=0),
  "hour": Int8Col(shape=(), dflt=0, pos=1),
  "minute": Int8Col(shape=(), dflt=0, pos=2),
  "msec": UInt16Col(shape=(), dflt=0, pos=3),
  "Exchange": StringCol(itemsize=1, shape=(), dflt=b'', pos=4),
  "Bid_Price": Float64Col(shape=(), dflt=0.0, pos=5),
  "Bid_Size": Int32Col(shape=(), dflt=0, pos=6),
  "Ask_Price": Float64Col(shape=(), dflt=0.0, pos=7),
  "Ask_Size": Int32Col(shape=(), dflt=0, pos=8),
  "Quote_Condition": StringCol(itemsize=1, shape=(), dflt=b'', pos=9),
  "Market_Maker": StringCol(itemsize=4, shape=(), dflt=b'', pos=10),
  "Bid_Exchange": StringCol(itemsize=1, shape=(), dflt=b'', pos=11),
  "Ask_Exchange": StringCol(itemsize=1, shape=(), dflt=b'', pos=12),
  "Sequence_Number": Int64Col(shape=(), dflt=0, pos=13),
  "National_BBO_Ind": StringCol(itemsize=1, shape=(), dflt=b'', pos=14),
  "NASDAQ_BBO_Ind": StringCol(itemsize=1, shape=(), dflt=b'', pos=15),
  "Quote_Cancel_Correction": StringCol(itemsize=1, shape=(), dflt=b'', pos=16),
  "Source_of_Quote": StringCol(itemsize=1, shape=(), dflt=b'', pos=17),
  "Retail_Interest_Indicator_RPI": StringCol(itemsize=1, shape=(), dflt=b'', pos=18),
  "Short_Sale_Restriction_Indicator": StringCol(itemsize=1, shape=(), dflt=b'', pos=19),
  "LULD_BBO_Indicator_CQS": StringCol(itemsize=1, shape=(), dflt=b'', pos=20),
  "LULD_BBO_Indicator_UTP": StringCol(itemsize=1, shape=(), dflt=b'', pos=21),
  "FINRA_ADF_MPID_Indicator": StringCol(itemsize=1, shape=(), dflt=b'', pos=22),
  "SIP_generated_Message_Identifier": StringCol(itemsize=1, shape=(), dflt=b'', pos=23),
  "National_BBO_LULD_Indicator": StringCol(itemsize=1, shape=(), dflt=b'', pos=24)}
  byteorder := 'little'
  chunkshape := (1040,)

In [106]:
# The columns argument doesn't seem to work...
store.select(max_sym, columns=['Bid_Price']).head()


Out[106]:
Time hour minute msec Exchange Bid_Price Bid_Size Ask_Price Ask_Size Quote_Condition ... NASDAQ_BBO_Ind Quote_Cancel_Correction Source_of_Quote Retail_Interest_Indicator_RPI Short_Sale_Restriction_Indicator LULD_BBO_Indicator_CQS LULD_BBO_Indicator_UTP FINRA_ADF_MPID_Indicator SIP_generated_Message_Identifier National_BBO_LULD_Indicator
0 1.391697e+09 9 28 20433 P 18.8799 37 17.2773 92 R ... 2 C
1 1.391697e+09 9 30 221 Z 18.1199 0 19.0143 40 R ... 2 C
2 1.391697e+09 9 30 4263 Y 18.9737 35 18.0378 14 R ... 2 A C B
3 1.391697e+09 9 31 8022 P 18.6253 0 17.0418 38 R ... 2 C
4 1.391697e+09 9 32 16402 N 19.0082 0 16.9483 20 R ... 2 C

5 rows × 25 columns


In [109]:
# columns also doesn't work here...
pd.read_hdf(fname, max_sym, columns=['Bid_Price']).head()


Out[109]:
Time hour minute msec Exchange Bid_Price Bid_Size Ask_Price Ask_Size Quote_Condition ... NASDAQ_BBO_Ind Quote_Cancel_Correction Source_of_Quote Retail_Interest_Indicator_RPI Short_Sale_Restriction_Indicator LULD_BBO_Indicator_CQS LULD_BBO_Indicator_UTP FINRA_ADF_MPID_Indicator SIP_generated_Message_Identifier National_BBO_LULD_Indicator
0 1.391697e+09 9 28 20433 P 18.8799 37 17.2773 92 R ... 2 C
1 1.391697e+09 9 30 221 Z 18.1199 0 19.0143 40 R ... 2 C
2 1.391697e+09 9 30 4263 Y 18.9737 35 18.0378 14 R ... 2 A C B
3 1.391697e+09 9 31 8022 P 18.6253 0 17.0418 38 R ... 2 C
4 1.391697e+09 9 32 16402 N 19.0082 0 16.9483 20 R ... 2 C

5 rows × 25 columns


In [7]:
# So we use h5py (actually, pytables appears faster...)
spy_dask = dd.from_array(spy_h5py)

In [8]:
mean_job = spy_dask['Bid_Price'].mean()

In [127]:
mean_job.compute()


Out[127]:
18.500255081967214

In [9]:
# This is appreciably slower than directly computing the mean w/ numpy
%timeit mean_job.compute()


1 loop, best of 3: 9.55 s per loop

Dask for an actual distributed task (but only on one file for now)


In [9]:
class DDFs:
    """Collect a lazy dask mean('Bid_Price') job for every dataset in an HDF5 file.

    ``datasets`` stays a plain list of dask Scalars so callers can still do
    ``dd.compute(*ddfs.datasets[:20])``; the matching HDF5 paths are kept in
    step in ``_keys`` so :meth:`compute_mean` can pair keys with results.
    """

    # Unused; kept for backward compatibility with any external reader.
    dbag = None

    def __init__(self, h5fname):
        # NOTE: the original declared `datasets` at class level, so every
        # DDFs instance shared (and kept appending to) one list.  These
        # must be per-instance state.
        self.datasets = []   # dask Scalar mean jobs, one per h5py.Dataset
        self._keys = []      # HDF5 path of each job, same order as datasets
        self.results = None  # filled in by compute_mean()
        h5in = h5py.File(h5fname, 'r')  # read-only; we never write
        h5in.visititems(self.collect_dataset)

    def collect_dataset(self, key, table):
        """visititems callback: record a mean job for each leaf Dataset.

        Groups are visited too, so filter to h5py.Dataset (only leaves
        carry data).
        """
        if isinstance(table, h5py.Dataset):
            self._keys.append(key)
            self.datasets.append(dd.from_array(table)['Bid_Price'].mean())

    def compute_mean(self):
        """Evaluate all mean jobs in one shared dask graph (still very slow!).

        dd.compute(*jobs) returns only the scalar results, so pair them with
        the saved keys here.  (The original `for key, result in dd.compute(...)`
        tried to unpack bare floats and raised.)
        """
        self.results = dict(zip(self._keys, dd.compute(*self.datasets)))

In [14]:
%%time
ddfs = DDFs(fname)


CPU times: user 273 ms, sys: 22.8 ms, total: 296 ms
Wall time: 293 ms

In [15]:
ddfs.datasets[:5]


Out[15]:
[<dask.dataframe.core.Scalar at 0x7fd34db526d8>,
 <dask.dataframe.core.Scalar at 0x7fd34db4f358>,
 <dask.dataframe.core.Scalar at 0x7fd34db4f320>,
 <dask.dataframe.core.Scalar at 0x7fd34db4f588>,
 <dask.dataframe.core.Scalar at 0x7fd34db4fe48>]

In [19]:
len(ddfs.datasets)


Out[19]:
8035

In [21]:
dd.compute?

In [20]:
%%time
results = dd.compute(*ddfs.datasets[:20])


CPU times: user 3.57 s, sys: 1.16 s, total: 4.74 s
Wall time: 4.86 s

In [23]:
import dask.multiprocessing

In [24]:
%%time
# This crashes out throwing lots of KeyErrors
results = dd.compute(*ddfs.datasets[:20], get=dask.multiprocessing.get)


Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Traceback (most recent call last):
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Traceback (most recent call last):
Traceback (most recent call last):
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
Traceback (most recent call last):
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
Traceback (most recent call last):
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Traceback (most recent call last):
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Traceback (most recent call last):
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Traceback (most recent call last):
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
KeyError: 140545251730248
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
KeyError: 140545251730248
KeyError: 140545251730248
KeyError: 140545251731592
KeyError: 140545251730248
KeyError: 140545251730248
Traceback (most recent call last):
KeyError: 140545251730248
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
KeyError: 140545251730248
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
KeyError: 140545251731592
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Traceback (most recent call last):
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Traceback (most recent call last):
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Traceback (most recent call last):
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
Traceback (most recent call last):
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
Traceback (most recent call last):
KeyError: 140545251731592
KeyError: 140545251730248
Traceback (most recent call last):
Traceback (most recent call last):
KeyError: 140545251730248
Traceback (most recent call last):
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
KeyError: 140545251731592
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Traceback (most recent call last):
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
Traceback (most recent call last):
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
KeyError: 140545251731592
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
KeyError: 140545251730248
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
KeyError: 140545251731592
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
KeyError: 140545251730248
Traceback (most recent call last):
Traceback (most recent call last):
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
Traceback (most recent call last):
Traceback (most recent call last):
KeyError: 140545251731592
Traceback (most recent call last):
Traceback (most recent call last):
Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
Traceback (most recent call last):
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
KeyError: 140545251731592
Traceback (most recent call last):
KeyError: 140545211536520
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
KeyError: 140545251730248
KeyError: 140545251730248
Traceback (most recent call last):
KeyError: 140545251730248
KeyError: 140545251731592
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
KeyError: 140545251730248
Traceback (most recent call last):
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
KeyError: 140545251731592
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
KeyError: 140545251731592
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
  File "h5py/_objects.pyx", line 197, in h5py._objects.ObjectID.__dealloc__ (-------src-dir-------/h5py/_objects.c:4494)
KeyError: 140545251730248
KeyError: 140545251730248
KeyError: 140545140359688
KeyError: 140545140359688
KeyError: 140545140359688
Process ForkPoolWorker-17:
Process ForkPoolWorker-19:
Process ForkPoolWorker-28:
Process ForkPoolWorker-20:
Process ForkPoolWorker-25:
Process ForkPoolWorker-29:
Process ForkPoolWorker-12:
Process ForkPoolWorker-21:
Process ForkPoolWorker-24:
Process ForkPoolWorker-5:
Process ForkPoolWorker-23:
Process ForkPoolWorker-22:
Process ForkPoolWorker-27:
Process ForkPoolWorker-11:
Process ForkPoolWorker-32:
Process ForkPoolWorker-6:
Process ForkPoolWorker-26:
Process ForkPoolWorker-31:
Process ForkPoolWorker-30:
Process ForkPoolWorker-8:
Process ForkPoolWorker-9:
Process ForkPoolWorker-14:
Process ForkPoolWorker-10:
Process ForkPoolWorker-15:
Process ForkPoolWorker-7:
Process ForkPoolWorker-3:
Process ForkPoolWorker-13:
Process ForkPoolWorker-1:
Process ForkPoolWorker-16:
Process ForkPoolWorker-18:
Process ForkPoolWorker-2:
Process ForkPoolWorker-4:
Traceback (most recent call last):
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 343, in get
    res = self._reader.recv_bytes()
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
KeyboardInterrupt
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
  File "/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-24-83d113f7458a> in <module>()
----> 1 get_ipython().run_cell_magic('time', '', 'results = dd.compute(*ddfs.datasets[:20], get=dask.multiprocessing.get)')

/home/dav/miniconda3/envs/TAQ/lib/python3.5/site-packages/IPython/core/interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
   2118             magic_arg_s = self.var_expand(line, stack_depth)
   2119             with self.builtin_trap:
-> 2120                 result = fn(magic_arg_s, cell)
   2121             return result
   2122 

<decorator-gen-60> in time(self, line, cell, local_ns)

/home/dav/miniconda3/envs/TAQ/lib/python3.5/site-packages/IPython/core/magic.py in <lambda>(f, *a, **k)
    191     # but it's overkill for just that one bit of state.
    192     def magic_deco(arg):
--> 193         call = lambda f, *a, **k: f(*a, **k)
    194 
    195         if callable(arg):

/home/dav/miniconda3/envs/TAQ/lib/python3.5/site-packages/IPython/core/magics/execution.py in time(self, line, cell, local_ns)
   1175         else:
   1176             st = clock2()
-> 1177             exec(code, glob, local_ns)
   1178             end = clock2()
   1179             out = None

<timed exec> in <module>()

/home/dav/miniconda3/envs/TAQ/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
    108                 for opt, val in groups.items()])
    109     keys = [var._keys() for var in variables]
--> 110     results = get(dsk, keys, **kwargs)
    111 
    112     results_iter = iter(results)

/home/dav/miniconda3/envs/TAQ/lib/python3.5/site-packages/dask/multiprocessing.py in get(dsk, keys, optimizations, num_workers, func_loads, func_dumps, **kwargs)
     76         # Run
     77         result = get_async(apply_async, len(pool._pool), dsk3, keys,
---> 78                            queue=queue, get_id=_process_get_id, **kwargs)
     79     finally:
     80         if cleanup:

/home/dav/miniconda3/envs/TAQ/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
    472     while state['waiting'] or state['ready'] or state['running']:
    473         try:
--> 474             key, res, tb, worker_id = queue.get()
    475         except KeyboardInterrupt:
    476             for f in finish_cbs:

<string> in get(self, *args, **kwds)

/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/managers.py in _callmethod(self, methodname, args, kwds)
    715 
    716         conn.send((self._id, methodname, args, kwds))
--> 717         kind, result = conn.recv()
    718 
    719         if kind == '#RETURN':

/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/connection.py in recv(self)
    248         self._check_closed()
    249         self._check_readable()
--> 250         buf = self._recv_bytes()
    251         return ForkingPickler.loads(buf.getbuffer())
    252 

/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/connection.py in _recv_bytes(self, maxsize)
    405 
    406     def _recv_bytes(self, maxsize=None):
--> 407         buf = self._recv(4)
    408         size, = struct.unpack("!i", buf.getvalue())
    409         if maxsize is not None and size > maxsize:

/home/dav/miniconda3/envs/TAQ/lib/python3.5/multiprocessing/connection.py in _recv(self, size, read)
    377         remaining = size
    378         while remaining > 0:
--> 379             chunk = read(handle, remaining)
    380             n = len(chunk)
    381             if n == 0:

KeyboardInterrupt: 

In [ ]:
results[0]

This ends up being a little faster than just using blaze (see below), but about half the time is spent setting thigs up in Dask.


In [5]:
from dask import delayed

@delayed
def mean_column(key, data, column='Bid_Price'):
    return key, blaze.data(data)[column].mean()

class DDFs:
    # A (key, table) list
    datasets = []

    def __init__(self, h5fname):
        h5in = h5py.File(h5fname)
        h5in.visititems(self.collect_dataset)
    
    def collect_dataset(self, key, table):
        if isinstance(table, h5py.Dataset):
            self.datasets.append(mean_column(key, table))
            
    def compute_mean(self, limit=None):
        # Note that a limit of None includes all values
        self.results = {key: result for key, result in dd.compute(*self.datasets[:limit])}

In [6]:
%%time
ddfs = DDFs(fname)


CPU times: user 4.09 s, sys: 2.07 s, total: 6.16 s
Wall time: 23.8 s

In [7]:
%%time
ddfs.compute_mean()


CPU times: user 14.6 s, sys: 1.57 s, total: 16.2 s
Wall time: 21.9 s

In [10]:
next(iter(ddfs.results.items()))


Out[10]:
('BSFT/no_suffix', 26.98176844121505)

In [36]:
# You can also compute individual results as needed
ddfs.datasets[0].compute()


Out[36]:
('A/no_suffix', 57.60852492511026)

Blaze?

Holy crap!


In [19]:
spy_blaze = blaze.data(spy_h5py)

In [20]:
%time 
spy_blaze['Ask_Price'].mean()


CPU times: user 8 µs, sys: 1 µs, total: 9 µs
Wall time: 21 µs
Out[20]:
176.84371349272752

In [16]:
taq_tb = tb.open_file(fname)
spy_tb = taq_tb.get_node(max_sym)

In [17]:
spy_blaze = blaze.data(spy_tb)

In [18]:
%time spy_blaze['Bid_Price'].mean()


CPU times: user 0 ns, sys: 978 µs, total: 978 µs
Wall time: 647 µs
Out[18]:
176.82760773125673

In [21]:
taq_tb.close()

Read directly with Blaze

Somehow this is not as impressive


In [15]:
%%time
blaze_h5_file = blaze.data(fname)

# This is rather nice
blaze_h5_file.SPY.no_suffix.Bid_Price.mean()


CPU times: user 12.7 s, sys: 1.05 s, total: 13.7 s
Wall time: 59.9 s

In [39]:
blaze_h5_file.ZFKOJB.no_suffix.Bid_Price.mean()


Out[39]:
18.475936662749703

Do some actual compute with Blaze


In [38]:
taq_h5py = h5py.File(fname)

In [50]:
class SymStats:
    means = {}

    def compute_stats(self, key, table):
        if isinstance(table, h5py.Dataset):
            self.means[key] = blaze.data(table)['Bid_Price'].mean()

In [51]:
ss = SymStats()

In [52]:
%time taq_h5py.visititems(ss.compute_stats)


CPU times: user 11.2 s, sys: 1.74 s, total: 12.9 s
Wall time: 51.8 s

In [56]:
means = iter(ss.means.items())

In [65]:
next(means)


Out[65]:
('SYT/no_suffix', 67.53453714044622)

In [67]:
ss.means['SPY/no_suffix']


Out[67]:
176.82760773125673

Pandas?

To load with Pandas, you need to close the pytables session


In [19]:
taq_tb = tb.open_file(fname)

In [18]:
taq_tb.close()

In [16]:
pd.read_hdf?

In [17]:
pd.read_hdf(fname, max_sym, start=0, stop=1, chunksize=1)


Out[17]:
<pandas.io.pytables.TableIterator at 0x11866d5f8>

In [14]:
max_sym


Out[14]:
'/IXQAJE/no_suffix'

In [15]:
fname


Out[15]:
'../test-data/small_test_data_public.h5'

In [9]:
%%timeit
node = taq_tb.get_node(max_sym)
pd.DataFrame.from_records(node[0:1])


1000 loops, best of 3: 1.18 ms per loop

In [21]:
%%timeit
# I've also tried this with `.get_node()`, same speed
pd.DataFrame.from_records(taq_tb.root.IXQAJE.no_suffix)


1 loop, best of 3: 234 ms per loop

In [6]:
%%timeit
pd.read_hdf(fname, max_sym)


10 loops, best of 3: 34 ms per loop

In [7]:
# Pandas has optimizations it likes to do with 
%timeit spy_df = pd.read_hdf(fname, max_sym)


10 loops, best of 3: 32.6 ms per loop

In [9]:
# Actually do it
spy_df = pd.read_hdf(fname, max_sym)

In [10]:
# This is fast, but loading is slow...
%timeit spy_df.Bid_Price.mean()


The slowest run took 6.25 times longer than the fastest. This could mean that an intermediate result is being cached 
10000 loops, best of 3: 57.8 µs per loop