In [1]:
%matplotlib inline
import time
import datetime
import matplotlib.pyplot as plt
import numpy as np
dask
For data sets that are not too big (say up to 1 TB), it is typically sufficient to process them on a single workstation. The package dask provides 3 data structures that mimic regular Python data structures but perform computation in a distributed way, allowing you to make optimal use of multiple cores easily.
These structures are dask arrays, dask bags and dask dataframes.
From the official documentation,
Dask is a simple task scheduling system that uses directed acyclic graphs (DAGs) of tasks to break up large computations into many small ones.
Dask enables parallel computing through task scheduling and blocked algorithms. This allows developers to write complex parallel algorithms and execute them in parallel either on a modern multi-core machine or on a distributed cluster.
On a single machine dask increases the scale of comfortable data from fits-in-memory to fits-on-disk by intelligently streaming data from disk and by leveraging all the cores of a modern CPU.
For interesting examples of dask in practice, see Matthew Rocklin's blog.
In [2]:
! pip install dask
In [3]:
import dask
import dask.array as da
import dask.bag as db
import dask.dataframe as dd
from dask import delayed
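To make the task-graph idea concrete, here is a minimal sketch (not from the original notebook) using dask's low-level graph format: a graph is just a dict mapping keys to values or to task tuples, and a scheduler function walks the DAG and executes the tasks.
In [ ]:
from operator import add, mul
from dask.threaded import get

# A dask graph is a plain dict: keys map to values or to task tuples
dsk = {'a': 1,
       'b': 2,
       'c': (add, 'a', 'b'),   # c = a + b
       'd': (mul, 'c', 10)}    # d = 10 * c

# The threaded scheduler walks the DAG and executes each task
get(dsk, 'd')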
dask arrays
These behave like numpy arrays, but break a massive job into tasks that are then executed by a scheduler. The default scheduler uses threading, but you can also use multiprocessing or distributed or even serial processing (mainly for debugging). You can tell the dask array how to break the data into chunks for processing.
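As a quick sketch (not part of the original notebook, and assuming a dask version recent enough that compute accepts a scheduler= keyword), the scheduler can be chosen per call:
In [ ]:
x = da.ones((1000, 1000), chunks=(250, 250))

# Threaded scheduler (the default for dask arrays)
x.sum().compute(scheduler='threads')

# Process-based scheduler
x.sum().compute(scheduler='processes')

# Single-threaded scheduler, handy for debugging with pdb or %debug
x.sum().compute(scheduler='synchronous')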
From the official documentation:
For performance, a good choice of chunks follows these rules (see the short example after this list):
A chunk should be small enough to fit comfortably in memory. We’ll have many chunks in memory at once.
A chunk must be large enough so that computations on that chunk take significantly longer than the 1ms overhead per task that dask scheduling incurs. A task should take longer than 100ms.
Chunks should align with the computation that you want to do. For example, if you plan to frequently slice along a particular dimension, then it's more efficient if your chunks are aligned so that you have to touch fewer chunks. If you want to add two arrays, then it's convenient if those arrays have matching chunk patterns.
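Here is a short sketch (not from the original notebook) of how chunks are specified at creation time and changed later with rechunk:
In [ ]:
# A 10,000 x 10,000 array split into 1,000 x 1,000 blocks
x = da.ones((10000, 10000), chunks=(1000, 1000))
print(x.chunks)      # block sizes along each dimension
print(x.numblocks)   # a 10 x 10 grid of blocks

# Re-block so that slicing out a single column touches only one chunk
y = x.rechunk((10000, 500))
print(y.numblocks)   # a 1 x 20 grid of blocks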
In [4]:
# We reuse the 100 * 1000 * 1000 random numbers in the memmap file on disk
n = 100
filename = 'random.dat'
shape = (n, 1000, 1000)
fp = np.memmap(filename, dtype='float64', mode='r', shape=shape)
# We can decide on the chunk size to be distributed for computing
xs = [da.from_array(fp[i], chunks=(200,500)) for i in range(n)]
xs = da.concatenate(xs)
avg = xs.mean().compute()
In [ ]:
avg
In [ ]:
# Typically we store dask arrays into HDF5
da.to_hdf5('data/xs.hdf5', '/foo/xs', xs)
In [ ]:
import h5py

with h5py.File('data/xs.hdf5', 'r') as f:
    print(f.get('/foo/xs').shape)
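Going the other way, an HDF5 dataset can be wrapped lazily as a dask array with da.from_array; this is a sketch (not from the original notebook) that reuses the file written above:
In [ ]:
# Wrap the HDF5 dataset as a dask array; no data is read until compute()
f = h5py.File('data/xs.hdf5', 'r')
xs2 = da.from_array(f['/foo/xs'], chunks=(1000, 500))
print(xs2.mean().compute())
f.close()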
In [ ]:
for i in range(5):
    f = 'data/x%03d.csv' % i
    np.savetxt(f, np.random.random((1000, 5)), delimiter=',')
In [ ]:
df = dd.read_csv('data/x*.csv', header=None)
print(df.describe().compute())
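Dask dataframes partition a large table into many smaller pandas dataframes and mirror a subset of the pandas API. As a small sketch (not from the original notebook), column arithmetic and reductions on the dataframe loaded above build a task graph and only run when computed:
In [ ]:
# Columns are the integers 0-4 because the CSV files have no header
col_sum = (df[0] + df[1]).sum()
col_max = df[2].max()

# Nothing is read or computed until we ask for the results
print(dask.compute(col_sum, col_max))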
dask bags
Dask bags work like multisets for unstructured or semi-structured data sets, typically over many files. A multiset is a set that allows repeats. Unlike lists, order is not preserved.
The dask bag is often used for preprocessing data before conversion to the more efficient array or dataframe collections. Manipulating dask bags has a functional flavor, similar to using toolz for standard Python collections.
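As a small sketch of that functional flavor (not part of the original notebook), bags support map, filter and fold-style reductions that compose lazily:
In [ ]:
b = db.from_sequence(range(10))

# map / filter compose lazily, much like a toolz pipeline
evens_squared = b.filter(lambda x: x % 2 == 0).map(lambda x: x ** 2)
print(evens_squared.compute())

# reductions such as sum are also lazy until compute() is called
print(evens_squared.sum().compute())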
In [ ]:
bag = db.from_sequence(np.random.randint(0,4, 10))
bag.frequencies().compute()
In [ ]:
text = db.read_text('data/wiki/AA/*')
In [ ]:
%%time
words = text.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute()
In [ ]:
print(top10)
This is slow because of disk access. One thing to try is a different scheduler; below we switch to the single-threaded synchronous scheduler.
In [ ]:
%%time
words = text.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute(scheduler='synchronous')  # older dask: get=dask.async.get_sync
In [ ]:
print(top10)
In [ ]:
import string
In [ ]:
freqs = (text.
str.translate({ord(char): None for char in string.punctuation}).
str.lower().
str.split().
concat().
frequencies())
In [ ]:
freqs.topk(5).compute(scheduler='synchronous')
In [ ]:
df_freqs = freqs.to_dataframe(columns=['word', 'n'])
df_freqs.head(n=5)
In [ ]:
df = df_freqs.compute()
In [ ]:
df.sort_values('word', ascending=False).head(5)
For full custom pipelines, you can use the delayed function. This just wraps standard Python functions so that they are not evaluated until called upon to do so by the scheduler. You can think of delayed as converting an eager function into a lazy one. You generally use delayed when the processing task is not easily doable with any of the array, bag or dataframe abstractions, since you have the full power of Python with delayed.
It is easy to convert to and from delayed with the array, bag or dataframe parallel data structures using the to_delayed() and from_delayed() methods.
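For instance (a sketch, not from the original notebook), a dask array decomposes into a grid of delayed objects with to_delayed(), and a delayed result can be wrapped back into an array with da.from_delayed, which requires the shape and dtype to be declared up front:
In [ ]:
x = da.ones((4, 4), chunks=(2, 2))

# One delayed object per chunk, arranged in a 2 x 2 grid
blocks = x.to_delayed()
print(blocks.shape)

# Wrap a delayed chunk back into a dask array; shape and dtype
# must be supplied since dask cannot infer them lazily
first = da.from_delayed(blocks[0, 0], shape=(2, 2), dtype=x.dtype)
print(first.compute())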
We will show the simple example provided in the dask documentation.
In [ ]:
def inc(x): return x + 1
def double(x): return x * 2
def add(x, y): return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = delayed(inc)(x)
    b = delayed(double)(x)
    c = delayed(add)(a, b)
    output.append(c)

total = delayed(sum)(output)
In [ ]:
total.compute()
In [ ]:
total.visualize()
In [ ]:
x = da.random.randint(0, 5, 10, chunks=(5,))
y = (x + 1).sum()
z = (x + 1).mean()
# Computing y and z together lets dask share the intermediate result (x + 1)
da.compute(y, z)
See the docs for dask.distributed.
In [ ]:
! pip install distributed
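As a minimal sketch (not from the original notebook), the distributed scheduler is started by creating a Client; with no arguments it launches a local cluster, and subsequent compute() calls use it by default:
In [ ]:
from dask.distributed import Client

# With no arguments this starts a local scheduler and worker processes
client = Client()

# dask collections now run on the distributed scheduler by default
x = da.random.random((1000, 1000), chunks=(250, 250))
print(x.mean().compute())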
In [ ]: