In [1]:
%matplotlib inline
import time
import datetime
import matplotlib.pyplot as plt
import numpy as np
A list comprehension is eager.
In [2]:
[x*x for x in range(3)]
Out[2]:
A generator expression is lazy.
In [3]:
(x*x for x in range(3))
Out[3]:
You can use generators as iterators.
In [4]:
g = (x*x for x in range(3))
In [5]:
next(g)
Out[5]:
In [6]:
next(g)
Out[6]:
In [7]:
next(g)
Out[7]:
In [8]:
next(g)
A generator is single-use.
In [9]:
for i in g:
    print(i, end=", ")
In [10]:
g = (x*x for x in range(3))
for i in g:
    print(i, end=", ")
The list constructor forces evaluation of the generator.
In [11]:
list(x*x for x in range(3))
Out[11]:
An eager function.
In [12]:
def eager_updown(n):
    xs = []
    for i in range(n):
        xs.append(i)
    for i in range(n, -1, -1):
        xs.append(i)
    return xs
In [13]:
eager_updown(3)
Out[13]:
A lazy generator.
In [14]:
def lazy_updown(n):
    for i in range(n):
        yield i
    for i in range(n, -1, -1):
        yield i
In [15]:
lazy_updown(3)
Out[15]:
In [16]:
list(lazy_updown(3))
Out[16]:
A pure function is like a mathematical function. Given the same inputs, it always returns the same output, and has no side effects.
In [17]:
def pure(alist):
    return [x*x for x in alist]
An impure function has side effects.
In [18]:
def impure(alist):
    for i in range(len(alist)):
        alist[i] = alist[i]*alist[i]
    return alist
In [19]:
xs = [1,2,3]
In [20]:
ys = pure(xs)
print(xs, ys)
In [21]:
ys = impure(xs)
print(xs, ys)
In [22]:
def f1(n):
    return n//2 if n % 2==0 else n*3+1
In [23]:
def f2(n):
    return np.random.random(n)
In [24]:
def f3(n):
    n = 23
    return n
In [25]:
def f4(a, n=[]):
    n.append(a)
    return n
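A quick check of these definitions (the annotations below are mine, assuming the four cells above have been run): f1 is pure, f2 depends on numpy's global random state, f3 is deterministic even though it ignores its argument, and f4 hides state in its mutable default argument.
print(f1(10), f1(10))   # pure: the same input always gives the same output (5 5)
print(f2(3))            # impure: the result depends on numpy's global RNG state
print(f3(5), f3(99))    # always returns 23; rebinding the local n has no side effect
print(f4(1))            # [1]
print(f4(2))            # [1, 2] -- the default list persists across calls, a hidden side effect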
In [26]:
list(map(f1, range(10)))
Out[26]:
In [27]:
list(filter(lambda x: x % 2 == 0, range(10)))
Out[27]:
In [28]:
from functools import reduce
In [29]:
reduce(lambda x, y: x + y, range(10), 0)
Out[29]:
In [30]:
reduce(lambda x, y: x + y, [[1,2], [3,4], [5,6]], [])
Out[30]:
In [31]:
import operator as op
In [32]:
reduce(op.mul, range(1, 6), 1)
Out[32]:
In [33]:
list(map(op.itemgetter(1), [[1,2,3],[4,5,6],[7,8,9]]))
Out[33]:
In [34]:
import itertools as it
In [35]:
list(it.combinations(range(1,6), 3))
Out[35]:
Generate all Boolean combinations of three variables.
In [36]:
list(it.product([0,1], repeat=3))
Out[36]:
In [37]:
list(it.starmap(op.add, zip(range(5), range(5))))
Out[37]:
In [38]:
list(it.takewhile(lambda x: x < 3, range(10)))
Out[38]:
In [39]:
data = sorted('the quick brown fox jumps over the lazy dog'.split(), key=len)
for k, g in it.groupby(data, key=len):
    print(k, list(g))
In [40]:
! pip install toolz
In [41]:
import toolz as tz
In [42]:
list(tz.partition(3, range(10)))
Out[42]:
In [43]:
list(tz.partition(3, range(10), pad=None))
Out[43]:
In [44]:
n = 30
dna = ''.join(np.random.choice(list('ACTG'), n))
dna
Out[44]:
In [45]:
tz.frequencies(tz.sliding_window(2, dna))
Out[45]:
In [46]:
from toolz import curried as c
In [47]:
tz.pipe(
    dna,
    c.sliding_window(2),  # using curry
    c.frequencies,
)
Out[47]:
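The curried module wraps the toolz functions so that they can be partially applied: c.sliding_window(2) returns a function that still expects the sequence, which is what lets it sit inside the pipe above. A minimal illustration (the name window2 is just for this example):
window2 = c.sliding_window(2)   # partially applied: still needs a sequence
list(window2('ACTG'))           # [('A', 'C'), ('C', 'T'), ('T', 'G')]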
In [48]:
composed = tz.compose(
    c.frequencies,
    c.sliding_window(2),
)
In [49]:
composed(dna)
Out[49]:
In [50]:
m = 10000
n = 300
dnas = (''.join(np.random.choice(list('ACTG'), n, p=[.1, .2, .3, .4]))
        for i in range(m))
dnas
Out[50]:
In [51]:
tz.merge_with(sum,
              tz.map(composed, dnas))
Out[51]:
memmap
You can selectively retrieve parts of numpy arrays stored on disk into memory for processing with memmap.
Memory-mapped files are used for accessing small segments of large files on disk, without reading the entire file into memory. A numpy.memmap can be used anywhere an ndarray is used. The maximum size of a memmap array is limited by what the operating system allows; in particular, this differs between 32-bit and 64-bit architectures.
In [52]:
n = 100
filename = 'random.dat'
shape = (n, 1000, 1000)
# create memmap
fp = np.memmap(filename, dtype='float64', mode='w+', shape=shape)
# store some data in it
for i in range(n):
    x = np.random.random(shape[1:])
    fp[i] = x
# flush to disk and remove the file handle
del fp
In [53]:
fp = np.memmap(filename, dtype='float64', mode='r', shape=shape)
# only one block is retrieved into memory at a time
start = time.time()
xs = [fp[i].mean() for i in range(n)]
elapsed = time.time() - start
print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
HDF5 is a hierarchical file format that allows selective disk reads, but also provides a tree structure for organizing your data sets. It can also include metadata annotation for documentation. Because of its flexibility, you should seriously consider using HDF5 for your data storage needs.
I suggest using the Python package h5py for working with HDF5 files; see its documentation.
In [54]:
import h5py
In [55]:
%%time
n = 5
filename = 'random.hdf5'
shape = (n, 1000, 1000)
groups = ['Sim%02d' % i for i in range(5)]
with h5py.File(filename, 'w') as f:
    # Create hierarchical group structure
    for group in groups:
        g = f.create_group(group)
        # Add metadata for each group
        g.attrs['created'] = str(datetime.datetime.now())
        # Save n arrays in each group
        for i in range(n):
            x = np.random.random(shape[1:])
            dset = g.create_dataset('x%06d' % i, shape=x.shape)
            dset[:] = x
            # Add metadata for each array
            dset.attrs['created'] = str(datetime.datetime.now())
In [56]:
f = h5py.File('random.hdf5', 'r')
The HDF5 objects can be treated like dictionaries.
In [57]:
for name in f:
    print(name)
In [58]:
for key in f.keys():
    print(key)
In [59]:
sim1 = f.get('Sim01')
In [60]:
list(sim1.keys())[:5]
Out[60]:
Or traversed recursively, like a tree.
In [61]:
f.visit(lambda x: print(x))
Retrieving data and attributes
In [62]:
sim2 = f.get('Sim02')
sim2.attrs['created']
Out[62]:
In [63]:
x = sim2.get('x000003')
print(x.shape)
print(x.dtype)
print(list(x.attrs.keys()))
print(x.attrs['created'])
np.mean(x)
Out[63]:
In [64]:
f.close()
When data is in a relational database, it is useful to do as much preprocessing as possible using SQL - this will be performed by highly efficient compiled routines on the (potentially remote) computer where the database lives.
Here we will use SQLite3 together with pandas to summarize a (potentially) large database.
In [65]:
import pandas as pd
from sqlalchemy import create_engine
In [66]:
engine = create_engine('sqlite:///data/movies.db')
In [67]:
q = '''
SELECT year, count(*) as number
FROM data
GROUP BY year
ORDER BY number DESC
'''
# The counting, grouping and sorting is done by the database, not pandas,
# so this query will work even if the movies database is many terabytes in size
df = pd.read_sql_query(q, engine)
df.head()
Out[67]:
There is a convenient Python package called odo that will convert data between different formats without having to load all the data into memory first. This allows conversion of potentially huge files.
Odo is a shape-shifting character in the Star Trek universe.
In [68]:
! pip install odo
In [69]:
import odo
In [70]:
odo.odo('sqlite:///data/movies.db::data', 'data/movies.csv')
Out[70]:
In [71]:
! head data/movies.csv
A data sketch is a probabilistic algorithm or data structure that approximates some statistic of interest, typically using very little memory and processing time. Data sketches are often applied to streaming data, and so must be able to process it incrementally. Many data sketches use hash functions to distribute data uniformly into buckets. Typically, data sketches have the following desirable properties
Examples where counting distinct values is useful:
Packages for data sketches in Python are relatively immature, and if you are interested, you could make a large contribution by creating a comprehensive open source library of data sketches in Python.
Counting the number of distinct elements exactly requires storing all distinct elements seen so far (e.g. in a set), so the memory required grows with the cardinality $n$. Probabilistic data structures known as Distinct Value Sketches can approximate this count with a tiny, fixed memory size.
A hash function takes data of arbitrary size and converts it into a number in a fixed range. Ideally, given an arbitrary set of data items, the hash function generates numbers that follow a uniform distribution within the fixed range. Hash functions are immensely useful throughout computer science (for example - they power Python sets and dictionaries), and especially for the generation of probabilistic data structures.
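A small illustration of the idea, hashing arbitrary items into a fixed range of buckets (the choice of md5 and 32 buckets here is arbitrary):
import hashlib
def bucket(item, n_buckets=32):
    # hash arbitrary data, then reduce it to a number in a fixed range
    return int(hashlib.md5(str(item).encode()).hexdigest(), 16) % n_buckets
[bucket(w) for w in 'the quick brown fox'.split()]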
The binary digits in a (say) 32-bit hash are effectively random, and equivalent to a sequence of fair coin tosses. A run of 5 leading zeros occurs with probability $2^{-5}$, so if the longest run of leading zeros among the hashes seen so far is 5, we have probably added on the order of $2^5$ unique items. This is the intuition behind the loglog family of Distinct Value Sketches. Note that the biggest count we can track with 32 bits is $2^{32} = 4294967296$.
The accuracy of the sketch can be improved by averaging the results of multiple coin flippers. In practice, this is done by using the first $k$ bits of the hash to assign each item to one of $2^k$ different coin flippers (registers). Hence, the maximum count is now $2^{32-k}$. The hyperloglog algorithm uses the harmonic mean of the $2^k$ flippers, which reduces the effect of outliers and hence the variance of the estimate.
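The following toy sketch (my illustration of the intuition above, not the algorithm used by the hyperloglog package below) is the single coin flipper version: track the longest run of leading zeros among the hashes seen, and estimate the distinct count as 2 raised to that run length.
import hashlib
def leading_zeros(h, bits=32):
    # number of leading zero bits in a `bits`-bit hash value
    return bits - h.bit_length()
def toy_distinct_count(items, bits=32):
    max_run = 0
    for item in items:
        h = int(hashlib.md5(str(item).encode()).hexdigest(), 16) % 2**bits
        max_run = max(max_run, leading_zeros(h, bits))
    return 2**max_run
# very noisy with a single estimator; hyperloglog averages 2**k registers
# (via the harmonic mean) to reduce the variance
toy_distinct_count(range(10000))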
In [72]:
! pip install hyperloglog
In [73]:
from hyperloglog import HyperLogLog
In [74]:
def flatten(xs):
    return (x for sublist in xs for x in sublist)

def error(a, b, n):
    return abs(len(a) - len(b))/n

print('True\t\tHLL\t\tRel Error')
with open('data/Ulysses.txt') as f:
    word_list = flatten(line.split() for line in f)
    s = set([])
    hll = HyperLogLog(error_rate=0.01)
    for i, word in enumerate(word_list):
        s.add(word)
        hll.add(word)
        if i%int(.2e5)==0:
            print('%8d\t%8d\t\t%.3f' %
                  (len(s), len(hll),
                   0 if i==0 else error(s, hll, i)))
Bloom filters are designed to answer queries about whether a specific item is in a collection. If the answer is no, it is definitive; if the answer is yes, it might be a false positive. The possibility of a false positive is what makes the Bloom filter a probabilistic data structure.
A bloom filter consists of a bit vector of length $k$ initially set to zero, and $n$ different hash functions that return a hash value that will fall into one of the $k$ bins. In the construction phase, for every item in the collection, $n$ hash values are generated by the $n$ hash functions, and every position indicated by a hash value is flipped to one. In the query phase, given an item, $n$ hash values are calculated as before - if any of these $n$ positions is a zero, then the item is definitely not in the collection. However, because of the possibility of hash collisions, even if all the positions are one, this could be a false positive. Clearly, the rate of false positives depends on the ratio of zero and one bits, and there are Bloom filter implementations that will dynamically bound the ratio and hence the false positive rate.
Possible uses of a Bloom filter include:
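A minimal sketch of the construction described above (for illustration only; the pybloom package installed below is a real implementation, and the bit-vector length and salted-hash scheme here are arbitrary choices):
import hashlib
class ToyBloomFilter:
    def __init__(self, k=10000, n_hashes=4):
        self.k = k                  # length of the bit vector
        self.n_hashes = n_hashes    # number of hash functions
        self.bits = [0] * k
    def _positions(self, item):
        # derive n_hashes positions by salting a single hash function
        for i in range(self.n_hashes):
            h = hashlib.md5(('%d:%s' % (i, item)).encode()).hexdigest()
            yield int(h, 16) % self.k
    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos] = 1
    def __contains__(self, item):
        # a zero at any position means the item is definitely absent;
        # all ones could still be a false positive
        return all(self.bits[pos] for pos in self._positions(item))
bf = ToyBloomFilter()
for word in ['banana', 'artist', 'Dublin']:
    bf.add(word)
'banana' in bf, 'masochist' in bf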
In [75]:
! pip install git+https://github.com/jaybaird/python-bloomfilter.git
In [76]:
from pybloom import ScalableBloomFilter
In [77]:
# The Scalable Bloom Filter grows as needed to keep the error rate small
sbf = ScalableBloomFilter(error_rate=0.001)
In [78]:
with open('data/Ulysses.txt') as f:
    word_set = set(flatten(line.split() for line in f))

for word in word_set:
    sbf.add(word)
In [79]:
test_words = ['banana', 'artist', 'Dublin', 'masochist', 'Obama']
In [80]:
for word in test_words:
    print(word, word in sbf)
In [81]:
for word in test_words:
    print(word, word in word_set)
dask
For data sets that are not too big (say up to 1 TB), it is typically sufficient to process the data on a single workstation. The package dask provides 3 data structures that mimic regular Python data structures but perform computation in a distributed way, allowing you to make optimal use of multiple cores easily.
These structures are dask arrays, dask bags and dask dataframes.
From the official documentation,
Dask is a simple task scheduling system that uses directed acyclic graphs (DAGs) of tasks to break up large computations into many small ones.
Dask enables parallel computing through task scheduling and blocked algorithms. This allows developers to write complex parallel algorithms and execute them in parallel either on a modern multi-core machine or on a distributed cluster.
On a single machine dask increases the scale of comfortable data from fits-in-memory to fits-on-disk by intelligently streaming data from disk and by leveraging all the cores of a modern CPU.
In [82]:
! pip install dask
In [83]:
import dask
import dask.array as da
import dask.bag as db
import dask.dataframe as dd
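Before looking at the dask collections, here is a tiny sketch of the task graph idea from the quote above, using dask.delayed (part of dask, but not otherwise used in this notebook): each call only adds a node to a DAG, and nothing runs until compute() is called.
@dask.delayed
def inc(x):
    return x + 1
@dask.delayed
def add(x, y):
    return x + y
total = add(inc(1), inc(2))   # builds a small graph of three tasks; nothing has run yet
total.compute()               # the scheduler executes the graph and returns 5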
dask arrays
These behave like numpy arrays, but break a massive job into tasks that are then executed by a scheduler. The default scheduler uses threading, but you can also use multiprocessing, distributed or even serial processing (mainly for debugging). You can tell the dask array how to break the data into chunks for processing.
From the official documentation:
For performance, a good choice of chunks follows the following rules:
A chunk should be small enough to fit comfortably in memory. We’ll have many chunks in memory at once.
A chunk must be large enough so that computations on that chunk take significantly longer than the 1ms overhead per task that dask scheduling incurs. A task should take longer than 100ms.
Chunks should align with the computation that you want to do. For example, if you plan to frequently slice along a particular dimension, then it's more efficient if your chunks are aligned so that you have to touch fewer chunks. If you want to add two arrays, then it's convenient if those arrays have matching chunk patterns.
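A small illustration of how the chunks argument partitions a dask array, using the np and da imports above (the shapes here are arbitrary and chosen only to show the bookkeeping):
x = da.from_array(np.random.random((1000, 1000)), chunks=(200, 500))
x.chunks            # ((200, 200, 200, 200, 200), (500, 500)) -- a 5 x 2 grid of blocks
x.mean().compute()  # each block becomes a task for the scheduler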
In [84]:
# We reuse the 100 * 1000 * 1000 random numbers in the memmap file on disk
n = 100
filename = 'random.dat'
shape = (n, 1000, 1000)
fp = np.memmap(filename, dtype='float64', mode='r', shape=shape)
# We can decide on the chunk size to be distributed for computing
xs = [da.from_array(fp[i], chunks=(200,500)) for i in range(n)]
xs = da.concatenate(xs)
avg = xs.mean().compute()
In [85]:
avg
Out[85]:
In [86]:
# Typically we store dask arrays in HDF5
da.to_hdf5('data/xs.hdf5', '/foo/xs', xs)
In [87]:
with h5py.File('data/xs.hdf5', 'r') as f:
    print(f.get('/foo/xs').shape)
In [88]:
for i in range(5):
    f = 'data/x%03d.csv' % i
    np.savetxt(f, np.random.random((1000, 5)), delimiter=',')
In [89]:
df = dd.read_csv('data/x*.csv', header=None)
print(df.describe().compute())
In [90]:
text = db.read_text('data/wiki/AA/*')
In [91]:
%%time
words = text.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute()
In [92]:
print(top10)
This is slow because the computation is dominated by disk access. We can speed it up by switching to the synchronous scheduler from dask.async (used in the next cell), which avoids the overhead of the default scheduler.
In [93]:
%%time
words = text.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute(get = dask.async.get_sync)
In [94]:
print(top10)
In [95]:
import string
In [96]:
freqs = (text.
         str.translate({ord(char): None for char in string.punctuation}).
         str.lower().
         str.split().
         concat().
         frequencies())
In [97]:
freqs.topk(5).compute(get = dask.async.get_sync)
Out[97]:
In [98]:
df_freqs = freqs.to_dataframe(columns=['word', 'n'])
df_freqs.head(n=5)
Out[98]:
In [99]:
df = df_freqs.compute()
In [100]:
df.sort_values('word', ascending=False).head(5)
Out[100]: