In [1]:
%matplotlib inline
import time
import datetime
import matplotlib.pyplot as plt
import numpy as np

Working with large data sets

Lazy evaluation, pure functions and higher order functions

Lazy and eager evaluation

A list comprehension is eager.


In [2]:
[x*x for x in range(3)]


Out[2]:
[0, 1, 4]

A generator expression is lazy.


In [3]:
(x*x for x in range(3))


Out[3]:
<generator object <genexpr> at 0x11d3fca40>

You can use generators as iterators.


In [4]:
g = (x*x for x in range(3))

In [5]:
next(g)


Out[5]:
0

In [6]:
next(g)


Out[6]:
1

In [7]:
next(g)


Out[7]:
4

In [8]:
next(g)


---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-8-5f315c5de15b> in <module>()
----> 1 next(g)

StopIteration: 

A generator is single-use. Once it is exhausted, iterating over it again yields nothing, so we have to create a new one.


In [9]:
for i in g:
    print(i, end=", ")

In [10]:
g = (x*x for x in range(3))
for i in g:
    print(i, end=", ")


0, 1, 4, 

The list constructor forces evaluation of the generator.


In [11]:
list(x*x for x in range(3))


Out[11]:
[0, 1, 4]

An eager function.


In [12]:
def eager_updown(n):
    xs = []
    for i in range(n):
        xs.append(i)
    for i in range(n, -1, -1):
        xs.append(i)
    return xs

In [13]:
eager_updown(3)


Out[13]:
[0, 1, 2, 3, 2, 1, 0]

A lazy generator.


In [14]:
def lazy_updown(n):
    for i in range(n):
        yield i
    for i in range(n, -1, -1):
        yield i

In [15]:
lazy_updown(3)


Out[15]:
<generator object lazy_updown at 0x11d6bd990>

In [16]:
list(lazy_updown(3))


Out[16]:
[0, 1, 2, 3, 2, 1, 0]
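
The payoff of laziness is that values are produced only on demand, so a generator can feed a pipeline without ever materializing the whole sequence. A small illustration using the two functions defined above (sys.getsizeof reports only the size of the list or generator object, not of the integers it yields):

import sys
from itertools import islice

n = 10**6
print(sys.getsizeof(eager_updown(n)))   # the full list is built in memory
print(sys.getsizeof(lazy_updown(n)))    # the generator object stays tiny

# values are produced only as they are consumed
list(islice(lazy_updown(n), 5))         # [0, 1, 2, 3, 4]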

Pure and impure functions

A pure function is like a mathematical function. Given the same inputs, it always returns the same output, and has no side effects.


In [17]:
def pure(alist):
    return [x*x for x in alist]

An impure function has side effects - here, it modifies its argument in place.


In [18]:
def impure(alist):
    for i in range(len(alist)):
        alist[i] = alist[i]*alist[i]
    return alist

In [19]:
xs = [1,2,3]

In [20]:
ys = pure(xs)
print(xs, ys)


[1, 2, 3] [1, 4, 9]

In [21]:
ys = impure(xs)
print(xs, ys)


[1, 4, 9] [1, 4, 9]

Quiz

Say whether each of the following functions is pure or impure.


In [22]:
def f1(n):
    return n//2 if n % 2==0 else n*3+1

In [23]:
def f2(n):
    return np.random.random(n)

In [24]:
def f3(n):
    n = 23
    return n

In [25]:
def f4(a, n=[]):
    n.append(a)
    return n

Higher order functions


In [26]:
list(map(f1, range(10)))


Out[26]:
[0, 4, 1, 10, 2, 16, 3, 22, 4, 28]

In [27]:
list(filter(lambda x: x % 2 == 0, range(10)))


Out[27]:
[0, 2, 4, 6, 8]

In [28]:
from functools import reduce

In [29]:
reduce(lambda x, y: x + y, range(10), 0)


Out[29]:
45

In [30]:
reduce(lambda x, y: x + y, [[1,2], [3,4], [5,6]], [])


Out[30]:
[1, 2, 3, 4, 5, 6]

Using the operator module

The operator module provides all the Python operators as functions.


In [31]:
import operator as op

In [32]:
reduce(op.mul, range(1, 6), 1)


Out[32]:
120

In [33]:
list(map(op.itemgetter(1), [[1,2,3],[4,5,6],[7,8,9]]))


Out[33]:
[2, 5, 8]

Using itertools


In [34]:
import itertools as it

In [35]:
list(it.combinations(range(1,6), 3))


Out[35]:
[(1, 2, 3),
 (1, 2, 4),
 (1, 2, 5),
 (1, 3, 4),
 (1, 3, 5),
 (1, 4, 5),
 (2, 3, 4),
 (2, 3, 5),
 (2, 4, 5),
 (3, 4, 5)]

Generate all Boolean combinations


In [36]:
list(it.product([0,1], repeat=3))


Out[36]:
[(0, 0, 0),
 (0, 0, 1),
 (0, 1, 0),
 (0, 1, 1),
 (1, 0, 0),
 (1, 0, 1),
 (1, 1, 0),
 (1, 1, 1)]

In [37]:
list(it.starmap(op.add, zip(range(5), range(5))))


Out[37]:
[0, 2, 4, 6, 8]

In [38]:
list(it.takewhile(lambda x: x < 3, range(10)))


Out[38]:
[0, 1, 2]

In [39]:
data = sorted('the quick brown fox jumps over the lazy dog'.split(), key=len)
for k, g in it.groupby(data, key=len):
    print(k, list(g))


3 ['the', 'fox', 'the', 'dog']
4 ['over', 'lazy']
5 ['quick', 'brown', 'jumps']

Using toolz


In [40]:
! pip install toolz


Requirement already satisfied: toolz in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages

In [41]:
import toolz as tz

In [42]:
list(tz.partition(3, range(10)))


Out[42]:
[(0, 1, 2), (3, 4, 5), (6, 7, 8)]

In [43]:
list(tz.partition(3, range(10), pad=None))


Out[43]:
[(0, 1, 2), (3, 4, 5), (6, 7, 8), (9, None, None)]

In [44]:
n = 30
dna = ''.join(np.random.choice(list('ACTG'), n))
dna


Out[44]:
'TGCGTACTTTTCGCTATCCTCTAGTAGTTG'

In [45]:
tz.frequencies(tz.sliding_window(2, dna))


Out[45]:
{('A', 'C'): 1,
 ('A', 'G'): 2,
 ('A', 'T'): 1,
 ('C', 'C'): 1,
 ('C', 'G'): 2,
 ('C', 'T'): 4,
 ('G', 'C'): 2,
 ('G', 'T'): 3,
 ('T', 'A'): 4,
 ('T', 'C'): 3,
 ('T', 'G'): 2,
 ('T', 'T'): 4}

Using pipes and the curried namespace


In [46]:
from toolz import curried as c

In [47]:
tz.pipe(
    dna,    
    c.sliding_window(2), # using curry
    c.frequencies,
)


Out[47]:
{('A', 'C'): 1,
 ('A', 'G'): 2,
 ('A', 'T'): 1,
 ('C', 'C'): 1,
 ('C', 'G'): 2,
 ('C', 'T'): 4,
 ('G', 'C'): 2,
 ('G', 'T'): 3,
 ('T', 'A'): 4,
 ('T', 'C'): 3,
 ('T', 'G'): 2,
 ('T', 'T'): 4}
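
The curried namespace is what makes the pipe work: functions in toolz.curried can be called with only some of their arguments and return a new function that waits for the rest. A minimal illustration (take_pairs is just an illustrative name):

from toolz import curried as c     # already imported above

take_pairs = c.sliding_window(2)   # window size fixed, no data yet
list(take_pairs('ACGT'))           # [('A', 'C'), ('C', 'G'), ('G', 'T')]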

In [48]:
composed = tz.compose(
    c.frequencies,
    c.sliding_window(2),  
)

In [49]:
composed(dna)


Out[49]:
{('A', 'C'): 1,
 ('A', 'G'): 2,
 ('A', 'T'): 1,
 ('C', 'C'): 1,
 ('C', 'G'): 2,
 ('C', 'T'): 4,
 ('G', 'C'): 2,
 ('G', 'T'): 3,
 ('T', 'A'): 4,
 ('T', 'C'): 3,
 ('T', 'G'): 2,
 ('T', 'T'): 4}

Processing many DNA strings without reading them all into memory


In [50]:
m = 10000
n = 300
dnas = (''.join(np.random.choice(list('ACTG'), n, p=[.1, .2, .3, .4])) 
        for i in range(m))
dnas


Out[50]:
<generator object <genexpr> at 0x11d6bddb0>

In [51]:
tz.merge_with(sum, 
              tz.map(
                  composed,
                  dnas
              )
             )


Out[51]:
{('A', 'A'): 29994,
 ('A', 'C'): 59620,
 ('A', 'G'): 120072,
 ('A', 'T'): 89211,
 ('C', 'A'): 59860,
 ('C', 'C'): 119228,
 ('C', 'G'): 238914,
 ('C', 'T'): 179708,
 ('G', 'A'): 119757,
 ('G', 'C'): 239565,
 ('G', 'G'): 478575,
 ('G', 'T'): 359070,
 ('T', 'A'): 89249,
 ('T', 'C'): 179377,
 ('T', 'G'): 359409,
 ('T', 'T'): 268391}

Working with out-of-core data

Using memmap

With memmap, you can selectively load parts of a numpy array stored on disk into memory for processing.

Memory-mapped files are used for accessing small segments of large files on disk, without reading the entire file into memory. A numpy.memmap array can be used anywhere an ndarray is used. The maximum size of a memmap array is limited by what the operating system allows - in particular, this is different on 32-bit and 64-bit architectures.

Creating the memory mapped file


In [52]:
n = 100
filename = 'random.dat'
shape = (n, 1000, 1000)

# create memmap
fp = np.memmap(filename, dtype='float64', mode='w+', shape=shape)

# store some data in it
for i in range(n):
    x = np.random.random(shape[1:])
    fp[i] = x
    
# flush to disk and remove file handler
del fp

Using the memory mapped file


In [53]:
fp = np.memmap(filename, dtype='float64', mode='r', shape=shape) 

# only one block is retrieved into memory at a time
start = time.time()
xs = [fp[i].mean() for i in range(n)]
elapsed = time.time() - start

print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))


0.500001143637 Total: 0.64s Per file: 0.01s

Using HDF5

HDF5 is a hierarchical file format that allows selective disk reads, but also provides a tree structure for organizing your data sets. It can also include metadata annotation for documentation. Because of its flexibility, you should seriously consider using HDF5 for your data storage needs.

I suggest using the Python package h5py for working with HDF5 files. See the h5py documentation.


In [54]:
import h5py

Creating an HDF5 file


In [55]:
%%time

n = 5
filename = 'random.hdf5'
shape = (n, 1000, 1000)

groups = ['Sim%02d' % i for i in range(5)]

with h5py.File(filename, 'w') as f:
    # Create hierarchical group structure
    for group in groups:
        g = f.create_group(group)
        # Add metadata for each group
        g.attrs['created'] = str(datetime.datetime.now())
        
        # Save n arrays in each group
        for i in range(n):
            x = np.random.random(shape[1:]) 
            dset = g.create_dataset('x%06d' % i, shape=x.shape)
            dset[:] = x
            # Add metadata for each array
            dset.attrs['created'] = str(datetime.datetime.now())


CPU times: user 701 ms, sys: 124 ms, total: 825 ms
Wall time: 920 ms

Using an HDF5 file


In [56]:
f = h5py.File('random.hdf5', 'r')

The HDF5 objects can be treated like dictionaries.


In [57]:
for name in f:
    print(name)


Sim00
Sim01
Sim02
Sim03
Sim04

In [58]:
for key in f.keys():
    print(key)


Sim00
Sim01
Sim02
Sim03
Sim04

In [59]:
sim1 = f.get('Sim01')

In [60]:
list(sim1.keys())[:5]


Out[60]:
['x000000', 'x000001', 'x000002', 'x000003', 'x000004']

Or traversed recursively, like a tree


In [61]:
f.visit(lambda x: print(x))


Sim00
Sim00/x000000
Sim00/x000001
Sim00/x000002
Sim00/x000003
Sim00/x000004
Sim01
Sim01/x000000
Sim01/x000001
Sim01/x000002
Sim01/x000003
Sim01/x000004
Sim02
Sim02/x000000
Sim02/x000001
Sim02/x000002
Sim02/x000003
Sim02/x000004
Sim03
Sim03/x000000
Sim03/x000001
Sim03/x000002
Sim03/x000003
Sim03/x000004
Sim04
Sim04/x000000
Sim04/x000001
Sim04/x000002
Sim04/x000003
Sim04/x000004

Retrieving data and attributes


In [62]:
sim2 = f.get('Sim02')
sim2.attrs['created']


Out[62]:
'2017-03-29 16:19:19.613896'

In [63]:
x = sim2.get('x000003')
print(x.shape)
print(x.dtype)
print(list(x.attrs.keys()))
print(x.attrs['created'])
np.mean(x)


(1000, 1000)
float32
['created']
2017-03-29 16:19:19.749793
Out[63]:
0.49980888

In [64]:
f.close()

Using SQLite3

When data is in a relational database, it is useful to do as much preprocessing as possible using SQL - this will be performed by highly efficient compiled routines on the (potentially remote) computer where the database lives.

Here we will use SQLite3 together with pandas to summarize a (potentially) large database.


In [65]:
import pandas as pd
from sqlalchemy import create_engine

In [66]:
engine = create_engine('sqlite:///data/movies.db')

In [67]:
q = '''
SELECT year, count(*) as number 
FROM data 
GROUP BY year 
ORDER BY number DESC
'''

# The counting, grouping and sorting are done by the database, not by pandas,
# so this query will work even if the movies database is many terabytes in size
df = pd.read_sql_query(q, engine)
df.head()


Out[67]:
   year  number
0  1996    1375
1  2000    1365
2  1998    1360
3  2002    1360
4  1997    1335

Out-of-memory data conversions

There is a convenient Python package called odo that will convert data between different formats without having to load all the data into memory first. This allows conversion of potentially huge files.

Odo is a shape shifting character in the Star Trek universe.


In [68]:
! pip install odo


Requirement already satisfied: odo in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages
Requirement already satisfied: datashape>=0.5.0 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from odo)
Requirement already satisfied: numpy>=1.7 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from odo)
Requirement already satisfied: pandas>=0.15.0 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from odo)
Requirement already satisfied: toolz>=0.7.3 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from odo)
Requirement already satisfied: multipledispatch>=0.4.7 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from odo)
Requirement already satisfied: networkx in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from odo)
Requirement already satisfied: python-dateutil in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from datashape>=0.5.0->odo)
Requirement already satisfied: pytz>=2011k in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from pandas>=0.15.0->odo)
Requirement already satisfied: decorator>=3.4.0 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from networkx->odo)
Requirement already satisfied: six>=1.5 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from python-dateutil->datashape>=0.5.0->odo)

In [69]:
import odo

In [70]:
odo.odo('sqlite:///data/movies.db::data', 'data/movies.csv')


Out[70]:
<odo.backends.csv.CSV at 0x1503c7828>

In [71]:
! head data/movies.csv


index,movieId,title,genres,year
0,1,"Toy Story (1995)",Adventure|Animation|Children|Comedy|Fantasy,1995
1,2,"Jumanji (1995)",Adventure|Children|Fantasy,1995
2,3,"Grumpier Old Men (1995)",Comedy|Romance,1995
3,4,"Waiting to Exhale (1995)",Comedy|Drama|Romance,1995
4,5,"Father of the Bride Part II (1995)",Comedy,1995
5,6,"Heat (1995)",Action|Crime|Thriller,1995
6,7,"Sabrina (1995)",Comedy|Romance,1995
7,8,"Tom and Huck (1995)",Adventure|Children,1995
8,9,"Sudden Death (1995)",Action,1995

Probabilistic data structures

A data sketch is a probabilistic algorithm or data structure that approximates some statistic of interest, typically using very little memory and processing time. Often they are applied to streaming data, and so must be able to process data incrementally. Many data sketches make use of hash functions to distribute data into buckets uniformly. Typically, data sketches have the following desirable properties:

  • sub-linear in space
  • single scan
  • can be parallelized
  • can be combined (merge)

Examples where counting distinct values is useful:

  • number of unique users in a Twitter stream
  • number of distinct records to be fetched by a database query
  • number of unique IP addresses accessing a website
  • number of distinct queries submitted to a search engine
  • number of distinct DNA motifs in genomics data sets (e.g. microbiome)

Packages for data sketches in Python are relatively immature, and if you are interested, you could make a large contribution by creating a comprehensive open source library of data sketches in Python.

HyperLogLog

Counting the number of distinct elements exactly requires storage of all distinct elements (e.g. in a set) and hence grows with the cardinality $n$. Probabilistic data structures known as Distinct Value Sketches can do this with a tiny and fixed memory size.

A hash function takes data of arbitrary size and converts it into a number in a fixed range. Ideally, given an arbitrary set of data items, the hash function generates numbers that follow a uniform distribution within the fixed range. Hash functions are immensely useful throughout computer science (for example - they power Python sets and dictionaries), and especially for the generation of probabilistic data structures.

The binary digits in a (say) 32-bit hash are effectively random, and equivalent to a sequence of fair coin tosses. A run of 5 leading zeros occurs with probability $2^{-5}$, so if the longest run of leading zeros seen in any hash so far is 5, we have probably added on the order of $2^5$ unique items. This is the intuition behind the loglog family of Distinct Value Sketches. Note that the biggest count we can track with 32 bits is $2^{32} = 4294967296$.

The accuracy of the sketch can be improved by averaging the results of multiple "coin flippers". In practice, this is done by using the first $k$ bits of the hash to assign each item to one of $2^k$ registers, so the maximum count that can be tracked is now $2^{32-k}$. The hyperloglog algorithm combines the $2^k$ registers with a harmonic mean, which reduces the effect of outliers and hence the variance of the estimate.
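
As a minimal sketch of this intuition only (not the full hyperloglog algorithm, which adds the $2^k$ registers, the harmonic mean and bias corrections), we can hash items with the standard library, track the longest run of leading zeros, and report $2^{\text{max run}}$ as a crude, high-variance estimate; the function names here are purely illustrative:

import hashlib

def leading_zeros(h, bits=32):
    # number of leading zero bits in a bits-bit integer
    return bits - h.bit_length()

def crude_distinct_count(items, bits=32):
    max_run = 0
    for item in items:
        # take the first 4 bytes of an MD5 digest as a 32-bit hash
        h = int.from_bytes(hashlib.md5(str(item).encode()).digest()[:4], 'big')
        max_run = max(max_run, leading_zeros(h, bits))
    return 2 ** max_run

crude_distinct_count(range(100000))   # a rough estimate of 100000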


In [72]:
! pip install hyperloglog


Requirement already satisfied: hyperloglog in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages

In [73]:
from hyperloglog import HyperLogLog

Compare unique counts with set and hyperloglog


In [74]:
def flatten(xs):
    return (x for sublist in xs for x in sublist)

def error(a, b, n):
    return abs(len(a) - len(b))/n

print('True\t\tHLL\t\tRel Error')
with open('data/Ulysses.txt') as f:
    word_list = flatten(line.split() for line in f)
    
    s = set([])
    hll = HyperLogLog(error_rate=0.01)
    for i, word in enumerate(word_list):
        s.add(word)
        hll.add(word)
        if i%int(.2e5)==0:
            print('%8d\t%8d\t\t%.3f' % 
                  (len(s), len(hll),
                   0 if i==0 else error(s, hll, i)))


True		HLL		Rel Error
       1	       1		0.000
    7095	    7151		0.003
   11582	   11667		0.002
   16010	   16202		0.003
   20058	   20296		0.003
   23761	   24087		0.003
   27463	   27785		0.003
   29780	   30173		0.003
   33740	   34206		0.003
   38070	   38473		0.002
   41599	   42235		0.003
   44119	   44619		0.002
   48549	   49197		0.003
   49511	   50191		0.003

Bloom filters

Bloom filters are designed to answer queries about whether a specific item is in a collection. If the answer is NO, then it is definitive. However, if the answer is yes, it might be a false positive. The possibility of a false positive makes the Bloom filter a probabilistic data structure.

A bloom filter consists of a bit vector of length $k$ initially set to zero, and $n$ different hash functions that return a hash value that will fall into one of the $k$ bins. In the construction phase, for every item in the collection, $n$ hash values are generated by the $n$ hash functions, and every position indicated by a hash value is flipped to one. In the query phase, given an item, $n$ hash values are calculated as before - if any of these $n$ positions is a zero, then the item is definitely not in the collection. However, because of the possibility of hash collisions, even if all the positions are one, this could be a false positive. Clearly, the rate of false positives depends on the ratio of zero and one bits, and there are Bloom filter implementations that will dynamically bound the ratio and hence the false positive rate.
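
A minimal from-scratch sketch of this construction (simulating the $n$ hash functions by salting a single MD5 hash with the function index; this toy class is illustrative only and is not the pybloom implementation used below):

import hashlib

class ToyBloomFilter:
    def __init__(self, k=1000, n=5):
        self.k = k              # number of bits
        self.n = n              # number of hash functions
        self.bits = [0] * k

    def _positions(self, item):
        # simulate n independent hash functions by salting one hash
        for i in range(self.n):
            h = hashlib.md5(('%d:%s' % (i, item)).encode()).hexdigest()
            yield int(h, 16) % self.k

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos] = 1

    def __contains__(self, item):
        # a zero anywhere means definitely absent; all ones may be a false positive
        return all(self.bits[pos] for pos in self._positions(item))

bf = ToyBloomFilter()
bf.add('fox')
'fox' in bf, 'unicorn' in bf   # (True, False) unless 'unicorn' happens to collide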

Possible uses of a Bloom filter include:

  • Does a particular sequence motif appear in a DNA string?
  • Has this book been recommended to this customer before?
  • Check if an element exists on disk before performing I/O
  • Check if URL is a potential malware site using in-browser Bloom filter to minimize network communication
  • As an alternative way to generate distinct value counts cheaply (only increment the count if the Bloom filter says NO - a sketch of this appears at the end of this section)

In [75]:
! pip install git+https://github.com/jaybaird/python-bloomfilter.git


Collecting git+https://github.com/jaybaird/python-bloomfilter.git
  Cloning https://github.com/jaybaird/python-bloomfilter.git to /private/var/folders/xf/rzdg30ps11g93j3w0h589q780000gn/T/pip-4g7j8fys-build
  Requirement already satisfied (use --upgrade to upgrade): pybloom==2.0.0 from git+https://github.com/jaybaird/python-bloomfilter.git in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages
Requirement already satisfied: bitarray>=0.3.4 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from pybloom==2.0.0)

In [76]:
from pybloom import ScalableBloomFilter

In [77]:
# The Scalable Bloom Filter grows as needed to keep the error rate small 
sbf = ScalableBloomFilter(error_rate=0.001)

In [78]:
with open('data/Ulysses.txt') as f:
    word_set = set(flatten(line.split() for line in f))
    for word in word_set:
        sbf.add(word)

Ask the Bloom filter whether the test words appear in Ulysses


In [79]:
test_words = ['banana', 'artist', 'Dublin', 'masochist', 'Obama']

In [80]:
for word in test_words:
    print(word, word in sbf)


banana True
artist True
Dublin True
masochist False
Obama False

In [81]:
for word in test_words:
    print(word, word in word_set)


banana True
artist True
Dublin True
masochist False
Obama False
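
To illustrate the last bulleted use above (cheap distinct-value counts), here is a sketch that reuses flatten, word_set and the Ulysses text from the cells above; the names approx_distinct and counter_bf are just illustrative. Because a YES from the filter may be a false positive, the count can only undercount, roughly in proportion to the error rate.

approx_distinct = 0
counter_bf = ScalableBloomFilter(error_rate=0.001)

with open('data/Ulysses.txt') as f:
    for word in flatten(line.split() for line in f):
        if word not in counter_bf:    # NO is definitive, so this word is new
            approx_distinct += 1
            counter_bf.add(word)

approx_distinct, len(word_set)        # the two counts should be very close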

Small-scale distributed programming

Using dask

For data sets that are not too big (say up to 1 TB), it is typically sufficient to process them on a single workstation. The package dask provides 3 data structures that mimic regular Python data structures but perform computation in a distributed way, allowing you to make full use of multiple cores easily.

These structures are

  • dask array ~ numpy array
  • dask bag ~ Python list (of arbitrary objects)
  • dask dataframe ~ pandas dataframe

From the official documentation,

Dask is a simple task scheduling system that uses directed acyclic graphs (DAGs) of tasks to break up large computations into many small ones.

Dask enables parallel computing through task scheduling and blocked algorithms. This allows developers to write complex parallel algorithms and execute them in parallel either on a modern multi-core machine or on a distributed cluster.

On a single machine dask increases the scale of comfortable data from fits-in-memory to fits-on-disk by intelligently streaming data from disk and by leveraging all the cores of a modern CPU.

In [82]:
! pip install dask


Requirement already satisfied: dask in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages

In [83]:
import dask
import dask.array as da
import dask.bag as db
import dask.dataframe as dd

dask arrays

These behave like numpy arrays, but break a massive job into tasks that are then executed by a scheduler. The default scheduler uses threading, but you can also use multiprocessing, a distributed cluster, or serial processing (mainly for debugging). You can tell the dask array how to break the data into chunks for processing.

From the official documentation:

For performance, a good choice of chunks follows the following rules:

  • A chunk should be small enough to fit comfortably in memory. We’ll have many chunks in memory at once.
  • A chunk must be large enough so that computations on that chunk take significantly longer than the 1ms overhead per task that dask scheduling incurs. A task should take longer than 100ms.
  • Chunks should align with the computation that you want to do. For example, if you plan to frequently slice along a particular dimension, then it’s more efficient if your chunks are aligned so that you have to touch fewer chunks. If you want to add two arrays, then it’s convenient if those arrays have matching chunk patterns.
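
The scheduler can also be chosen per call. A small sketch using the old-style get= keyword that this notebook's dask version expects (recent dask releases spell this compute(scheduler='threads'), compute(scheduler='processes') or compute(scheduler='synchronous') instead):

import numpy as np
import dask.array as da
import dask.threaded
import dask.multiprocessing

x = da.from_array(np.random.random((4000, 4000)), chunks=(1000, 1000))

x.mean().compute()                              # default threaded scheduler
x.mean().compute(get=dask.threaded.get)         # explicit threaded scheduler
x.mean().compute(get=dask.multiprocessing.get)  # process-based, sidesteps the GIL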

In [84]:
# We reuse the 100 * 1000 * 1000 random numbers in the memmap file on disk
n = 100
filename = 'random.dat'
shape = (n, 1000, 1000)
fp = np.memmap(filename, dtype='float64', mode='r', shape=shape)

# We can decide on the chunk size to be distributed for computing
xs = [da.from_array(fp[i], chunks=(200,500)) for i in range(n)]
xs = da.concatenate(xs)
avg = xs.mean().compute()

In [85]:
avg


Out[85]:
0.50000114363730053

In [86]:
# Typically we store dask arrays into HDF5

da.to_hdf5('data/xs.hdf5', '/foo/xs', xs)

In [87]:
with h5py.File('data/xs.hdf5', 'r') as f:
    print(f.get('/foo/xs').shape)


(100000, 1000)

dask data frames

Dask dataframes can treat multiple pandas dataframes, which might not simultaneously fit into memory, as a single dataframe. Note the use of globbing to specify multiple source files.


In [88]:
for i in range(5):
    f = 'data/x%03d.csv' % i
    np.savetxt(f, np.random.random((1000, 5)), delimiter=',')

In [89]:
df = dd.read_csv('data/x*.csv', header=None)
print(df.describe().compute())


                 0            1            2            3            4
count  5000.000000  5000.000000  5000.000000  5000.000000  5000.000000
mean      0.492787     0.498913     0.506285     0.497849     0.496456
std       0.290051     0.290067     0.289115     0.287252     0.288440
min       0.000120     0.000127     0.001602     0.000124     0.000028
25%       0.270114     0.261522     0.267837     0.262800     0.262552
50%       0.511363     0.525779     0.522839     0.515185     0.511809
75%       0.762856     0.755237     0.761059     0.761570     0.772811
max       0.999914     0.999940     0.999997     0.999718     0.999783

dask bags

Dask bags work like parallel lists of arbitrary Python objects, and are suited to unstructured or semi-structured data sets, typically spread over many files.

The AA subdirectory consists of 101 plain-text files of about 1 MB each, taken from the English Wikipedia


In [90]:
text = db.read_text('data/wiki/AA/*')

In [91]:
%%time

words = text.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute()


CPU times: user 1min 12s, sys: 2.27 s, total: 1min 14s
Wall time: 1min 27s

In [92]:
print(top10)


[('the', 1051994), ('of', 617239), ('and', 482039), ('in', 370266), ('to', 356495), ('a', 312597), ('is', 174145), ('as', 145215), ('was', 141788), ('The', 141724)]

This is slow with the default threaded scheduler, which is hampered by the GIL and by per-task overhead for this pure-Python text processing. Switching to the synchronous single-threaded scheduler is much faster here.


In [93]:
%%time

words = text.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute(get = dask.async.get_sync)


CPU times: user 11.1 s, sys: 413 ms, total: 11.6 s
Wall time: 11.8 s

In [94]:
print(top10)


[('the', 1051994), ('of', 617239), ('and', 482039), ('in', 370266), ('to', 356495), ('a', 312597), ('is', 174145), ('as', 145215), ('was', 141788), ('The', 141724)]

Conversion from bag to dataframe


In [95]:
import string

In [96]:
freqs = (text.
         str.translate({ord(char): None for char in string.punctuation}).
         str.lower().
         str.split().
         concat().
         frequencies())

Get the top 5 entries, ordered by the word itself (the key), not by the count.

In [97]:
freqs.topk(5).compute(get = dask.async.get_sync)


Out[97]:
[('🚲', 1), ('𝛿', 2), ('𓏘𓃭𓇋𓍯𓊪𓄿𓂧𓂋𓄿𓏏𓆇', 1), ('𒀸𒋗𒁺', 1), ('𐑅', 1)]

In [98]:
df_freqs = freqs.to_dataframe(columns=['word', 'n'])
df_freqs.head(n=5)


Out[98]:
       word   n
0       €53   1
1  doodnath   1
2    iphone  82
3   flagged   3
4     desks  10

The compute method converts a dask dataframe to a regular pandas dataframe

For data sets that fit in memory, pandas is faster and allows some operations like sorting that are not provided by dask dataframes.


In [99]:
df = df_freqs.compute()

In [100]:
df.sort_values('word', ascending=False).head(5)


Out[100]:
word n
16770 🚲 1
313069 𝛿 2
137307 𓏘𓃭𓇋𓍯𓊪𓄿𓂧𓂋𓄿𓏏𓆇 1
103308 𒀸𒋗𒁺 1
326342 𐑅 1