"Big Pandas" - Dask from the Inside

Part 5 - Dask graphs

PyData Berlin tutorial, 30 June 2017

Stephen Simmons

This part looks at what dask DataFrame actually are, a lazily evaluated dependency graph, and how these get executed


In [1]:
# Complete set of Python 3.6 imports used for these examples

# Standard modules
import io
import logging
import lzma
import multiprocessing
import os
import ssl
import sys
import time
import urllib.request
import zipfile

# Third-party modules
import fastparquet      # Needs python-snappy and llvmlite
import graphviz         # To visualize Dask graphs 
import numpy as np
import pandas as pd
import psutil           # Memory stats
import dask
import dask.dataframe as dd
import bokeh.io         # For Dask profile graphs
import seaborn as sns   # For colormaps

# Support multiple lines of output in each cell
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# Don't wrap tables
pd.options.display.max_rows = 20
pd.options.display.max_columns = 20
pd.options.display.width = 300

# Show matplotlib and bokeh graphs inline in Jupyter notebook
%matplotlib inline
bokeh.io.output_notebook()

print(sys.version)
np.__version__, pd.__version__, dask.__version__


Loading BokehJS ...
3.6.0 |Continuum Analytics, Inc.| (default, Dec 23 2016, 12:22:00) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]
Out[1]:
('1.11.3', '0.19.2', '0.14.1')

So what exactly is a Dask DataFrame?


In [97]:
df = pd.DataFrame([[1,2,3],[4,5,6],[7,8,9],[10,11,12],[13,14,15]], columns=['a','b','c'])
print(df)


    a   b   c
0   1   2   3
1   4   5   6
2   7   8   9
3  10  11  12
4  13  14  15

In [131]:
ddf = dd.from_pandas(df, npartitions=1)
print(ddf)
ddf.divisions
print(ddf._meta)
ddf._name
ddf.dask
ddf.visualize()


Dask DataFrame Structure:
                   a      b      c
npartitions=1                     
0              int64  int64  int64
4                ...    ...    ...
Dask Name: from_pandas, 1 tasks
Out[131]:
(0, 4)
Empty DataFrame
Columns: [a, b, c]
Index: []
Out[131]:
'from_pandas-b71f6a90c7bade4bc0f2c527b857767a'
Out[131]:
{('from_pandas-b71f6a90c7bade4bc0f2c527b857767a', 0):     a   b   c
 0   1   2   3
 1   4   5   6
 2   7   8   9
 3  10  11  12
 4  13  14  15}
Out[131]:

In [133]:
ddf = dd.from_pandas(df, npartitions=2)
print(ddf)
ddf.divisions
print(ddf._meta)
ddf._name
ddf.dask
ddf.visualize()


Dask DataFrame Structure:
                   a      b      c
npartitions=2                     
0              int64  int64  int64
3                ...    ...    ...
4                ...    ...    ...
Dask Name: from_pandas, 2 tasks
Out[133]:
(0, 3, 4)
Empty DataFrame
Columns: [a, b, c]
Index: []
Out[133]:
'from_pandas-de36e0f9b56aa90e818c2dda9e03260c'
Out[133]:
{('from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 0):    a  b  c
 0  1  2  3
 1  4  5  6
 2  7  8  9, ('from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 1):     a   b   c
 3  10  11  12
 4  13  14  15}
Out[133]:

In [144]:
ddf = dd.from_pandas(df, npartitions=2).head(n=2, npartitions=2, compute=False)
print(ddf)
ddf.divisions
print(ddf._meta)
ddf._name
ddf.dask
ddf.visualize()


Dask DataFrame Structure:
                   a      b      c
npartitions=1                     
0              int64  int64  int64
4                ...    ...    ...
Dask Name: head, 5 tasks
Out[144]:
(0, 4)
Empty DataFrame
Columns: [a, b, c]
Index: []
Out[144]:
'head-2-2-from_pandas-de36e0f9b56aa90e818c2dda9e03260c'
Out[144]:
{('from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 0):    a  b  c
 0  1  2  3
 1  4  5  6
 2  7  8  9, ('from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 1):     a   b   c
 3  10  11  12
 4  13  14  15, ('head-2-2-from_pandas-de36e0f9b56aa90e818c2dda9e03260c',
  0): (<function dask.dataframe.core.safe_head>, (<function dask.dataframe.core._concat>,
   [('head-partial-2-from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 0),
    ('head-partial-2-from_pandas-de36e0f9b56aa90e818c2dda9e03260c',
     1)]), 2), ('head-partial-2-from_pandas-de36e0f9b56aa90e818c2dda9e03260c',
  0): (<methodcaller: head>,
  ('from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 0),
  2), ('head-partial-2-from_pandas-de36e0f9b56aa90e818c2dda9e03260c',
  1): (<methodcaller: head>, ('from_pandas-de36e0f9b56aa90e818c2dda9e03260c',
   1), 2)}
Out[144]:

In [146]:
ddf._keys()


Out[146]:
[('head-2-2-from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 0)]

In [139]:
ddf.compute()


Out[139]:
a b c
0 1 2 3
1 4 5 6

In [123]:
ddf2 = ddf1.head(1,compute=False)
ddf2.visualize()
ddf2._name
ddf2.dask


Out[123]:
Out[123]:
'head-1-1-from_pandas-b71f6a90c7bade4bc0f2c527b857767a'
Out[123]:
{('from_pandas-b71f6a90c7bade4bc0f2c527b857767a', 0):     a   b   c
 0   1   2   3
 1   4   5   6
 2   7   8   9
 3  10  11  12
 4  13  14  15,
 ('head-1-1-from_pandas-b71f6a90c7bade4bc0f2c527b857767a',
  0): (<function dask.dataframe.core.safe_head>, ('from_pandas-b71f6a90c7bade4bc0f2c527b857767a',
   0), 1)}

In [140]:
ddf2._keys()


Out[140]:
[('head-1-1-from_pandas-b71f6a90c7bade4bc0f2c527b857767a', 0)]

In [141]:
ddf2._keys??

In [94]:
ddf = dd.from_pandas(df, chunksize=3)
print(ddf)


Dask DataFrame Structure:
                   a      b      c
npartitions=2                     
0              int64  int64  int64
3                ...    ...    ...
4                ...    ...    ...
Dask Name: from_pandas, 2 tasks

In [96]:
print(ddf._meta)
ddf.npartitions
ddf.divisions
ddf.visualize()


Empty DataFrame
Columns: [a, b, c]
Index: []
Out[96]:
2
Out[96]:
(0, 3, 4)
Out[96]:

In [98]:
ddf.dask


Out[98]:
{('from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 0):    a  b  c
 0  1  2  3
 1  4  5  6
 2  7  8  9, ('from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 1):     a   b   c
 3  10  11  12
 4  13  14  15}

In [99]:
for k, v in ddf.dask.items():
    print(repr(k))
    print('  ' + repr(v))


('from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 0)
     a  b  c
0  1  2  3
1  4  5  6
2  7  8  9
('from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 1)
      a   b   c
3  10  11  12
4  13  14  15

In [103]:
(ddf+1).sum()


Out[103]:
Dask Series Structure:
npartitions=1
a    int64
c      ...
dtype: int64
Dask Name: dataframe-sum-agg, 7 tasks

In [110]:
task = ddf.head(n=2, compute=False)
task.dask
task.visualize()


Out[110]:
{('from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 0):    a  b  c
 0  1  2  3
 1  4  5  6
 2  7  8  9, ('from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 1):     a   b   c
 3  10  11  12
 4  13  14  15, ('head-1-2-from_pandas-de36e0f9b56aa90e818c2dda9e03260c',
  0): (<function dask.dataframe.core.safe_head>, ('from_pandas-de36e0f9b56aa90e818c2dda9e03260c',
   0), 2)}
Out[110]:

In [108]:
ddf.head(2, npartitions=-1, compute=False).visualize()


Out[108]:

In [102]:
(ddf + 1).sum()visualize()


Out[102]:

In [ ]:


In [ ]:


In [ ]:


In [87]:
df.head(n=2)


Out[87]:
a b c
0 1 2 3
1 4 5 6

In [143]:
task = ddf.head(n=2, npartitions=2, compute=False)


---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-143-c680062cadf3> in <module>()
----> 1 task = ddf.head(n=2, npartitions=2, compute=False)

/home/stephen/miniconda3/envs/py36/lib/python3.6/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
    775         if npartitions > self.npartitions:
    776             msg = "only {} partitions, head received {}"
--> 777             raise ValueError(msg.format(self.npartitions, npartitions))
    778 
    779         name = 'head-%d-%d-%s' % (npartitions, n, self._name)

ValueError: only 1 partitions, head received 2

In [91]:
task.visualize()


Out[91]:

In [92]:
task.dask


Out[92]:
{('from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 0):    a  b  c
 0  1  2  3
 1  4  5  6
 2  7  8  9, ('from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 1):     a   b   c
 3  10  11  12
 4  13  14  15, ('head-2-2-from_pandas-de36e0f9b56aa90e818c2dda9e03260c',
  0): (<function dask.dataframe.core.safe_head>, (<function dask.dataframe.core._concat>,
   [('head-partial-2-from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 0),
    ('head-partial-2-from_pandas-de36e0f9b56aa90e818c2dda9e03260c',
     1)]), 2), ('head-partial-2-from_pandas-de36e0f9b56aa90e818c2dda9e03260c',
  0): (<methodcaller: head>,
  ('from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 0),
  2), ('head-partial-2-from_pandas-de36e0f9b56aa90e818c2dda9e03260c',
  1): (<methodcaller: head>, ('from_pandas-de36e0f9b56aa90e818c2dda9e03260c',
   1), 2)}

In [142]:
task._keys()


Out[142]:
[('head-1-2-from_pandas-de36e0f9b56aa90e818c2dda9e03260c', 0)]

In [ ]:


In [ ]:


In [ ]:


In [65]:
print(pd.DataFrame.__doc__)


 Two-dimensional size-mutable, potentially heterogeneous tabular data
    structure with labeled axes (rows and columns). Arithmetic operations
    align on both row and column labels. Can be thought of as a dict-like
    container for Series objects. The primary pandas data structure

    Parameters
    ----------
    data : numpy ndarray (structured or homogeneous), dict, or DataFrame
        Dict can contain Series, arrays, constants, or list-like objects
    index : Index or array-like
        Index to use for resulting frame. Will default to np.arange(n) if
        no indexing information part of input data and no index provided
    columns : Index or array-like
        Column labels to use for resulting frame. Will default to
        np.arange(n) if no column labels are provided
    dtype : dtype, default None
        Data type to force, otherwise infer
    copy : boolean, default False
        Copy data from inputs. Only affects DataFrame / 2d ndarray input

    Examples
    --------
    >>> d = {'col1': ts1, 'col2': ts2}
    >>> df = DataFrame(data=d, index=index)
    >>> df2 = DataFrame(np.random.randn(10, 5))
    >>> df3 = DataFrame(np.random.randn(10, 5),
    ...                 columns=['a', 'b', 'c', 'd', 'e'])

    See also
    --------
    DataFrame.from_records : constructor from tuples, also record arrays
    DataFrame.from_dict : from dicts of Series, arrays, or dicts
    DataFrame.from_items : from sequence of (key, value) pairs
    pandas.read_csv, pandas.read_table, pandas.read_clipboard
    

In [66]:
print(dd.DataFrame.__doc__)


    Implements out-of-core DataFrame as a sequence of pandas DataFrames

    Parameters
    ----------

    dask: dict
        The dask graph to compute this DataFrame
    name: str
        The key prefix that specifies which keys in the dask comprise this
        particular DataFrame
    meta: pandas.DataFrame
        An empty ``pandas.DataFrame`` with names, dtypes, and index matching
        the expected output.
    divisions: tuple of index values
        Values along which we partition our blocks on the index
    

In [ ]:
dd.from_pandas()

In [ ]:


In [52]:
ddf = dd.from_pandas(df, chunksize=2)
task = ddf[ddf.a>2]

In [53]:
task.compute()


Out[53]:
a b c
1 4 5 6
2 7 8 9
3 10 11 12

In [54]:
task.visualize()


Out[54]:

In [55]:
print(dd.DataFrame.__doc__)


    Implements out-of-core DataFrame as a sequence of pandas DataFrames

    Parameters
    ----------

    dask: dict
        The dask graph to compute this DataFrame
    name: str
        The key prefix that specifies which keys in the dask comprise this
        particular DataFrame
    meta: pandas.DataFrame
        An empty ``pandas.DataFrame`` with names, dtypes, and index matching
        the expected output.
    divisions: tuple of index values
        Values along which we partition our blocks on the index
    

In [56]:
task._meta


Out[56]:
a b c

In [57]:
task.npartitions
task.divisions


Out[57]:
2
Out[57]:
(0, 2, 3)

In [58]:
task._name


Out[58]:
'getitem-845adeb1f3f6fe7d5b8f1f046ae677ca'

In [59]:
task.dask


Out[59]:
{('from_pandas-37d6c4e12ebefd78272a6470f70a4b21', 0):    a  b  c
 0  1  2  3
 1  4  5  6, ('from_pandas-37d6c4e12ebefd78272a6470f70a4b21', 1):     a   b   c
 2   7   8   9
 3  10  11  12, ('getitem-845adeb1f3f6fe7d5b8f1f046ae677ca',
  0): (<methodcaller: _getitem_array>, ('from_pandas-37d6c4e12ebefd78272a6470f70a4b21',
   0), ('gt-82445471718415f2cbdf08d57f178b4f',
   0)), ('getitem-845adeb1f3f6fe7d5b8f1f046ae677ca',
  1): (<methodcaller: _getitem_array>,
  ('from_pandas-37d6c4e12ebefd78272a6470f70a4b21', 1),
  ('gt-82445471718415f2cbdf08d57f178b4f',
   1)), ('getitem-ed2b560b212841d2cd96769f0eecacf2',
  0): (<function _operator.getitem>,
  ('from_pandas-37d6c4e12ebefd78272a6470f70a4b21', 0),
  'a'), ('getitem-ed2b560b212841d2cd96769f0eecacf2',
  1): (<function _operator.getitem>, ('from_pandas-37d6c4e12ebefd78272a6470f70a4b21',
   1), 'a'), ('gt-82445471718415f2cbdf08d57f178b4f',
  0): (<function dask.compatibility.apply>, <function dask.array.core.partial_by_order>, [('getitem-ed2b560b212841d2cd96769f0eecacf2',
    0)], {'function': <function _operator.gt>,
   'other': [(1, 2)]}), ('gt-82445471718415f2cbdf08d57f178b4f',
  1): (<function dask.compatibility.apply>,
  <function dask.array.core.partial_by_order>,
  [('getitem-ed2b560b212841d2cd96769f0eecacf2', 1)],
  {'function': <function _operator.gt>, 'other': [(1, 2)]})}

In [60]:
task.dask[(task._name,0)]


Out[60]:
(<methodcaller: _getitem_array>,
 ('from_pandas-37d6c4e12ebefd78272a6470f70a4b21', 0),
 ('gt-82445471718415f2cbdf08d57f178b4f', 0))

In [61]:
task.dask[(task._name,1)]


Out[61]:
(<methodcaller: _getitem_array>,
 ('from_pandas-37d6c4e12ebefd78272a6470f70a4b21', 1),
 ('gt-82445471718415f2cbdf08d57f178b4f', 1))

In [62]:
task.compute??

In [48]:
task2.compute()


Out[48]:
a     8
b    10
c    12
dtype: int64

In [49]:
task2.visualize()


Out[49]:

In [51]:
task2.dask[(task2._name,0)]


Out[51]:
(<function dask.compatibility.apply>,
 <function dask.dataframe.core._reduction_aggregate>,
 [(<function dask.dataframe.core._concat>,
   [('dataframe-sum-chunk-4f1ac59b65a08dd36a001a1db6a540cf', 0, 0, 0),
    ('dataframe-sum-chunk-4f1ac59b65a08dd36a001a1db6a540cf', 0, 1, 0)])],
 {'aca_aggregate': <methodcaller: sum>, 'axis': 0, 'skipna': True})

In [ ]: