In [1]:
%matplotlib inline
import time
import datetime
import matplotlib.pyplot as plt
import numpy as np

Working with large data sets

Small-scale distributed programming

Using dask

For data sets that are not too big (say up to 1 TB), it is typically sufficient to process on a single workstation. The package dask provides 3 data structures that mimic regular Python data structures but perform computation in a distributed way allowing you to make optimal use of multiple cores easily.

These structures are

  • dask array ~ numpy array
  • dask bag ~ Python dictionary
  • dask dataframe ~ pandas dataframe

From the official documentation,

Dask is a simple task scheduling system that uses directed acyclic graphs (DAGs) of tasks to break up large computations into many small ones.

Dask enables parallel computing through task scheduling and blocked algorithms. This allows developers to write    complex parallel algorithms and execute them in parallel either on a modern multi-core machine or on a distributed cluster.

On a single machine dask increases the scale of comfortable data from fits-in-memory to fits-on-disk by intelligently streaming data from disk and by leveraging all the cores of a modern CPU.

For interesting examples of dask in practice, see Matthew Rocklin's blog.


In [2]:
! pip install dask


Requirement already satisfied: dask in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages

In [3]:
import dask
import dask.array as da
import dask.bag as db
import dask.dataframe as dd
from dask import delayed

dask arrays

These behave like numpy arrays, but break a massive job into tasks that are then executed by a scheduler. The default scheduler uses threading but you can also use multiprocessing or distributed or even serial processing (mainly for debugging). You can tell the dask array how to break the data into chunks for processing.

From official documents

For performance, a good choice of chunks follows the following rules:

A chunk should be small enough to fit comfortably in memory. We’ll have many chunks in memory at once.
A chunk must be large enough so that computations on that chunk take significantly longer than the 1ms overhead per task that dask scheduling incurs. A task should take longer than 100ms.
Chunks should align with the computation that you want to do. For example if you plan to frequently slice along a particular dimension then it’s more efficient if your chunks are aligned so that you have to touch fewer chunks. If you want to add two arrays then its convenient if those arrays have matching chunks patterns.

In [4]:
# We resuse the 100 * 1000 * 1000 random numbers in the memmap file on disk
n = 100
filename = 'random.dat'
shape = (n, 1000, 1000)
fp = np.memmap(filename, dtype='float64', mode='r', shape=shape)

# We can decide on the chunk size to be distributed for computing
xs = [da.from_array(fp[i], chunks=(200,500)) for i in range(n)]
xs = da.concatenate(xs)
avg = xs.mean().compute()


---------------------------------------------------------------------------
FileNotFoundError                         Traceback (most recent call last)
<ipython-input-4-5adc8ca9ee0a> in <module>()
      3 filename = 'random.dat'
      4 shape = (n, 1000, 1000)
----> 5 fp = np.memmap(filename, dtype='float64', mode='r', shape=shape)
      6 
      7 # We can decide on the chunk size to be distributed for computing

/Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages/numpy/core/memmap.py in __new__(subtype, filename, dtype, mode, offset, shape, order)
    219             own_file = True
    220         else:
--> 221             fid = open(filename, (mode == 'c' and 'r' or mode)+'b')
    222             own_file = True
    223 

FileNotFoundError: [Errno 2] No such file or directory: 'random.dat'

In [ ]:
avg

In [ ]:
# Typically we store Dask arrays inot HDF5

da.to_hdf5('data/xs.hdf5', '/foo/xs', xs)

In [ ]:
with h5py.File('data/xs.hdf5', 'r') as f:
    print(f.get('/foo/xs').shape)

dask data frames

Dask dataframes can treat multiple pandas dataframes that might not simultaneously fit into memory like a single dataframe. See use of globbing to specify multiple source files.


In [ ]:
for i in range(5):
    f = 'data/x%03d.csv' % i
    np.savetxt(f, np.random.random((1000, 5)), delimiter=',')

In [ ]:
df = dd.read_csv('data/x*.csv', header=None)
print(df.describe().compute())

dask bags

Dask bags work like multisets for unstructured or semi-structured data sets, typically over many files. A multiset is a set that allows repeats. Unlike lists, order is not preserved.

The dask bag is often used for preprocessing data before conversion to the more efficient array or dataframe collections. Manipulating dask bags has a functional flavor, similar to using toolz for standard Python collections.

Creating a bag


In [ ]:
bag = db.from_sequence(np.random.randint(0,4, 10))
bag.frequencies().compute()

The AA subdirectory consists of 101 1 MB plain text files from the English Wikipedia


In [ ]:
text = db.read_text('data/wiki/AA/*')

In [ ]:
%%time

words = text.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute()

In [ ]:
print(top10)

This is slow because of disk access. Fix by changing scheduler to work asynchronously.


In [ ]:
%%time

words = text.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute(get = dask.async.get_sync)

In [ ]:
print(top10)

Conversion from bag to dataframe


In [ ]:
import string

In [ ]:
freqs = (text.
         str.translate({ord(char): None for char in string.punctuation}).
         str.lower().
         str.split().
         concat().
         frequencies())
Get the top 5 words sorted by key (not value)

In [ ]:
freqs.topk(5).compute(get = dask.async.get_sync)

In [ ]:
df_freqs = freqs.to_dataframe(columns=['word', 'n'])
df_freqs.head(n=5)

The compute method converts to a regular pandas dataframe

For data sets that fit in memory, pandas is faster and allows some operations like sorting that are not provided by dask dataframes.


In [ ]:
df = df_freqs.compute()

In [ ]:
df.sort_values('word', ascending=False).head(5)

dask delayed

For full custom pipelines, you can use the delayed function. This just wraps standard Python functions so that they are not evaluated until called upon to do so by the scheduler. You can think of delayed as converting an eager function to a lazy one. You generally used delayed when the processing task is not easily doable with any of the array, bag or data frame abstractions, since you have the full power of Python with delayed.

It is easy to convert to and from delayed with the array, bag or data frame parallel data structures using the to_delayed() and from_delayted() methods.

We will show the simple example provided in the dask documentation.

def inc(x): return x + 1

def double(x): return x + 2

def add(x, y): return x + y

data = [1, 2, 3, 4, 5]

output = [] for x in data: a = delayed(inc)(x) b = delayed(double)(x) c = delayed(add)(a, b) output.append(c)

total = delayed(sum)(output)


In [ ]:
total.compute()

The DAG (directed acyclic graph) of tasks built by dask

Using this graph, the scheduler can identify opportunities for parallelism.


In [ ]:
total.visualize()

Minimizing computation of intermediate objects


In [ ]:
x = da.random.randint(0, 5, 10, chunks=(5,))
y = (x + 1).sum()
z = (x + 1).mean()
da.compute(y, z)

Using the web interface

See docs


In [ ]:
! pip install distributed

In [ ]: