We shall discuss libraries that are useful when your data is too big to fit in memory, but probably not big enough to justify the added complexity of moving to a cluster. With today's technology, this means data sets of roughly tens to hundreds of gigabytes.
One goal is to introduce the storage library h5py and the pydata packages odo, dask and blaze.
Packages to install
pip install dask
pip install cloudpickle
pip install graphviz
pip install memory_profiler
In [2]:
import os
import glob
import time
import string
import numpy as np
import h5py
import dask
import dask.bag as db
import dask.dataframe as dd
import dask.array as da
from dask.dot import dot_graph
from dask.imperative import do, value
from odo import odo, drop, discover
from blaze import dshape, Data, by
In [71]:
def save(i, clobber=False):
    fn = 'x%06d.npy' % i
    if clobber or not os.path.exists(fn):
        x = np.random.random((1000, 1000))
        np.save(fn, x)
    return i
In [72]:
def save_csv(i, clobber=False):
    fn = 'x%06d.csv' % i
    if clobber or not os.path.exists(fn):
        x = np.random.random((1000, 1000))
        # use a float format; '%d' would truncate every value in [0, 1) to 0
        np.savetxt(fn, x, delimiter=',', fmt='%f')
    return i
In [73]:
%timeit save(0, clobber=True)
In [74]:
%timeit save_csv(0, clobber=True)
In [75]:
from concurrent.futures import ProcessPoolExecutor
In [76]:
with ProcessPoolExecutor() as pool:
    pool.map(save, range(1000))
In [77]:
%load_ext memory_profiler
In [78]:
%memit x = np.load('x%06d.npy' % 0)
In [79]:
%memit x = np.loadtxt('x%06d.csv' % 0, delimiter=',')
In [80]:
def process_one(f):
    x = np.load(f)
    return x.mean()
In [81]:
n = 100
start = time.time()
xs = [process_one('x%06d.npy' % i) for i in range(n)]
elapsed = time.time() - start
print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
Using savez and savez_compressed
In [124]:
n = 100
np.savez('xs.npz', *(np.random.random((1000, 1000)) for i in range(n)))
In [125]:
xs = np.load('xs.npz')
In [126]:
xs.files[:3]  # the list of array names; .keys() is not indexable on newer numpy
Out[126]:
In [127]:
xs['arr_0'].mean()
Out[127]:
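The cells above only exercise savez. Its sibling savez_compressed takes the same arguments but writes a zip archive with compressed members, trading CPU time for disk space. A minimal sketch, not from the original notebook (the file name xs_compressed.npz is only illustrative):

# Sketch: same call signature as np.savez, but each array is compressed on disk
n = 10
np.savez_compressed('xs_compressed.npz',
                    *(np.random.random((1000, 1000)) for i in range(n)))
zs = np.load('xs_compressed.npz')   # loading works exactly as for savez archives
zs['arr_0'].mean()

Note that random float64 data compresses poorly, so for these particular arrays the space savings will be modest; the benefit is larger for sparse or repetitive data.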
In [128]:
n = 100
filename = 'random.dat'
shape = (n, 1000, 1000)
if not os.path.exists(filename):
    fp = np.memmap(filename, dtype='float64', mode='w+',
                   shape=shape)  # create memmap
    for i in range(n):
        x = np.load('x%06d.npy' % i)
        fp[i] = x
    del fp  # deletion flushes to disk before removing the object
In [129]:
fp = np.memmap(filename, dtype='float64', mode='r', shape=shape) # get handle to memmap
In [130]:
n = 100
start = time.time()
xs = [fp[i].mean() for i in range(n)]
elapsed = time.time() - start
print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
HDF5 is a hierarchical file format that allows selective reads from disk, but also provides a tree structure for organizing your data sets. It does not have the size limitation of memmap and can include metadata annotations for documentation. Because of its flexibility, you should seriously consider using HDF5 for your data storage needs.
I suggest using the Python package h5py for working with HDF5 files. See the official documentation.
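As a quick sketch of the hierarchy and metadata features described above (all file, group and attribute names here are made up purely for illustration):

# Illustrative only: nested groups form a tree, attributes carry metadata,
# and slicing a dataset reads just that part from disk.
with h5py.File('example.hdf5', 'w') as f:
    grp = f.create_group('simulations/run01')          # intermediate groups are created automatically
    dset = grp.create_dataset('samples', data=np.random.random((1000, 1000)))
    dset.attrs['description'] = 'uniform random draws'
    f['simulations'].attrs['author'] = 'me'

with h5py.File('example.hdf5', 'r') as f:
    print(f['simulations/run01/samples'][:5, 0])        # selective read of a small slice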
In [131]:
import datetime
In [133]:
%%time
n = 100
if not os.path.exists('random.hdf5'):
    with h5py.File('random.hdf5', 'w') as f:
        for i in range(n):
            x = np.load('x%06d.npy' % i)
            dset = f.create_dataset('x%06d' % i, shape=x.shape)
            dset[:] = x
            dset.attrs['created'] = str(datetime.datetime.now())
In [134]:
with h5py.File('random.hdf5', 'r') as f:
    f.visit(lambda x: print(x, f[x].shape, f[x].attrs['created']))
In [135]:
n = 100
start = time.time()
with h5py.File('random.hdf5', 'r') as f:
    xs = [np.mean(f['x%06d' % i]) for i in range(n)]
elapsed = time.time() - start
print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
From the official documentation,
Dask is a simple task scheduling system that uses directed acyclic graphs (DAGs) of tasks to break up large computations into many small ones.
Dask enables parallel computing through task scheduling and blocked algorithms. This allows developers to write complex parallel algorithms and execute them in parallel either on a modern multi-core machine or on a distributed cluster.
On a single machine dask increases the scale of comfortable data from fits-in-memory to fits-on-disk by intelligently streaming data from disk and by leveraging all the cores of a modern CPU.
The model for how Dask works is quite similar to Spark, and we will see many of the same features, exposed through familiar data structures: numpy arrays (dask.array), bags and dicts of generic Python objects (dask.bag), and pandas dataframes (dask.dataframe).
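Before using the high-level collections, it helps to see what a task graph actually is. At the lowest level a dask graph is just a dict mapping keys to values or to (function, arguments) tuples, and any of the schedulers can execute it. A toy sketch, with keys and functions chosen only for illustration:

from operator import add
from dask.threaded import get   # the multi-threaded scheduler

dsk = {'a': 1,
       'b': 2,
       'c': (add, 'a', 'b'),    # c = a + b
       'd': (add, 'c', 'c')}    # d = c + c, so d depends on c, which depends on a and b

get(dsk, 'd')                   # walks the DAG and returns 6

The collections below build graphs like this automatically and hand them to a scheduler when compute is called.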
In [136]:
n = 100
start = time.time()
with h5py.File('random.hdf5', 'r') as f:
    xs = [da.from_array(f['x%06d' % i], chunks=(1000, 1000)) for i in range(n)]
    xs = da.concatenate(xs)
    avg = xs.mean().compute()  # compute while the file is still open
elapsed = time.time() - start
print(avg, 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
In [137]:
b = db.from_filenames('data/wiki/AA/*')
In [138]:
start = time.time()
words = b.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute()
elapsed = time.time() - start
print(top10, 'Total: %.2fs' % (elapsed, ))
In [139]:
start = time.time()
words = b.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute(get = dask.async.get_sync)
elapsed = time.time() - start
print(top10, 'Total: %.2fs' % (elapsed, ))
In [140]:
freqs = (b.str.translate({ord(char): None for char in string.punctuation})
          .str.lower()
          .str.split()
          .concat()
          .frequencies())
In [141]:
freqs.take(5)
Out[141]:
In [142]:
freqs.topk(5, key=lambda x: x[1]).compute()
Out[142]:
In [143]:
dot_graph(freqs.dask)
Out[143]:
In [45]:
start = time.time()
df = dd.read_csv('x00000*.csv', header=None)
print(df.describe().compute())
elapsed = time.time() - start
print('Total: %.2fs' % elapsed)
In [144]:
df_freqs = freqs.to_dataframe(columns=['word', 'n'])
df_freqs.head(10)
Out[144]:
Sometimes you need to run custom functions that don't fit into the array, bag or dataframe abstractions. Dask provides the imperative module for this purpose, with two decorators: do, which wraps a function, and value, which wraps classes. Apart from the decorators and the need to call compute for evaluation, you just write regular Python code - yet it can take advantage of the Dask scheduling machinery. Note that the calls below simply build up a graph of the necessary computations - no computation is actually done until compute is called.
In [145]:
@do
def load(filename):
    with open(filename) as f:
        return f.read()

@do
def clean(data):
    return (data
            .translate({ord(char): None for char in string.punctuation})
            .lower())

@do
def analyze(sequence_of_data):
    wc = {}
    for data in sequence_of_data:
        words = data.split()
        for word in words:
            wc[word] = wc.get(word, 0) + 1
    return wc

@do
def top_k(counts, k, **kwargs):
    return sorted(counts.items(), reverse=True, **kwargs)[:k]
In [146]:
files = glob.glob('/Volumes/HD4/data/wiki/extracted/AA/*')[:3]
loaded = [load(i) for i in files]
cleaned = [clean(i) for i in loaded]
analyzed = analyze(cleaned)
top5 = top_k(analyzed, 5)
top5.compute()
Out[146]:
In [147]:
top_k(analyzed, 5, key=lambda x: x[1]).compute()
Out[147]:
Blaze also works on heterogeneous data sets, and provides a high-level, consistent interface for working with data from multiple sources. Under the hood, blaze may make use of odo, dask and pandas. Using blaze is very similar to using pandas. See the official documentation.
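odo, imported at the top but not used so far, is the conversion layer: odo(source, target) moves data between formats. A hedged sketch using one of the CSV files written earlier (these files have no header row, so real use may need explicit column names):

import pandas as pd

# odo(source, target): convert a CSV on disk into an in-memory DataFrame
df_csv = odo('x000000.csv', pd.DataFrame)
print(discover(df_csv))   # discover reports the datashape odo inferred

The same interface also accepts database URIs, which is how blaze reaches the SQLite file below.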
See description at http://seanlahman.com/files/database/readme58.txt
In [148]:
import urllib.request
url = 'https://github.com/jknecht/baseball-archive-sqlite/raw/master/lahman2013.sqlite'
file_name = 'lahman2013.sqlite'
urllib.request.urlretrieve(url, file_name)
Out[148]:
In [150]:
db = Data('sqlite:///lahman2013.sqlite')
In [151]:
db.fields
Out[151]:
In [154]:
db.Master.head(n=3)
Out[154]:
In [155]:
master = db.Master
birth = by(master.birthCountry, n=master.birthCountry.count())
birth.sort('n', ascending=False).head(5)
Out[155]: