Chunking for large but not big data

PyBay 2016

Memory management

  • build an intuition about what chunking is doing

In [40]:
# peek at the first three raw lines of the file
i = 0

with open('data.csv', 'r') as f:
    for line in f:
        if i < 3:
            print(line)
        i += 1


0.5451368119149018,-1.6996107290072229,7,9,0,0.8071396331247015,0.2724098588191702,1,1,1

0.07107213865436755,-0.36836779235781386,2,0,0,-1.1125927116408099,0.036738078578910344,0,0,1

-0.7418155746328328,1.467132160646872,3,6,1,0.0014421241396832412,0.2632936393069136,0,0,0


In [5]:
def read_chunks(file_path, chunk_size):
    # yield lists of at most chunk_size parsed rows, one chunk at a time
    with open(file_path, 'r') as f:
        result = []
        for line in f:
            if len(result) == chunk_size:
                yield result
                result = []
            result.append(convert_line(line))
        else:
            # the for/else fires once the file is exhausted:
            # flush the final, possibly short, chunk
            yield result

def convert_line(line):
    # parse one CSV line into floats, skipping a bare trailing newline
    return [float(item) for item in line.split(',') if item != '\n']

In [6]:
i = 0
# print only the first two chunks to keep the output short
for chunk in read_chunks('data.csv', 10):
    if i < 2:
        print(chunk)
    i += 1


[[0.5451368119149018, -1.6996107290072229, 7.0, 9.0, 0.0, 0.8071396331247015, 0.2724098588191702, 1.0, 1.0, 1.0], [0.07107213865436755, -0.36836779235781386, 2.0, 0.0, 0.0, -1.1125927116408099, 0.036738078578910344, 0.0, 0.0, 1.0], [-0.7418155746328328, 1.467132160646872, 3.0, 6.0, 1.0, 0.0014421241396832412, 0.2632936393069136, 0.0, 0.0, 0.0], [-0.9465214125294857, 0.08530358707169183, 9.0, 3.0, 0.0, -0.3455951684668915, 0.44630107227479004, 0.0, 0.0, 0.0], [1.4152069939520378, 0.17971334523885543, 8.0, 5.0, 1.0, -0.03323317005008284, 0.10774269478053533, 0.0, 0.0, 1.0], [0.16099676181939104, 2.1077107542502733, 6.0, 1.0, 0.0, -0.9905723638316615, -0.7945683133938525, 0.0, 1.0, 0.0], [0.8937616981063747, 0.2758642561711137, 0.0, 0.0, 1.0, 3.183883988340478, -1.1713052134422954, 1.0, 0.0, 1.0], [-0.8706011275180053, 0.09621625919758102, 1.0, 8.0, 0.0, 0.9327173390472802, 0.1199176649737485, 1.0, 1.0, 1.0], [-0.49195346308732635, -1.0932728833707896, 2.0, 9.0, 1.0, -0.9262034137114997, -1.307305614274532, 1.0, 1.0, 0.0], [-0.31832419385038724, 1.3776122723734814, 5.0, 2.0, 1.0, -0.40458180528661436, -0.09991193817619384, 1.0, 0.0, 1.0]]
[[-1.9797422199906094, 0.20484950245674302, 3.0, 1.0, 1.0, -0.5088582377934716, 1.8231215002596732, 1.0, 1.0, 1.0], [-1.3241358458039347, 0.9453111447841026, 2.0, 4.0, 0.0, 0.7225352158919987, -2.1213326923203866, 0.0, 1.0, 0.0], [-0.012935206798529787, 0.565163924665128, 2.0, 4.0, 0.0, 0.4532647099706649, -0.8127234001332843, 0.0, 1.0, 0.0], [-0.6711755995363189, 0.09480681112060167, 3.0, 4.0, 1.0, 0.4277222186281182, 0.38415369251295606, 1.0, 0.0, 1.0], [-0.4538060508646856, 0.797285005299077, 6.0, 7.0, 1.0, -0.6406404789562519, -0.33297084341873273, 0.0, 0.0, 0.0], [0.5784110385861118, -0.48676816233852416, 7.0, 3.0, 0.0, -0.8136929368038452, 0.2681101498233064, 1.0, 0.0, 1.0], [1.5360125284931099, -0.6850575105882283, 9.0, 4.0, 0.0, 2.504005311400736, -1.4554951411892434, 0.0, 0.0, 1.0], [0.19211968704883547, -0.036706169211782574, 3.0, 1.0, 1.0, -1.0287797181531717, 1.1113895783255507, 1.0, 1.0, 1.0], [1.0182868091248582, -0.5775767346641175, 2.0, 8.0, 0.0, 0.2872478539375859, -1.1860106415998948, 1.0, 0.0, 1.0], [0.08750511163578463, 1.400576407188057, 2.0, 4.0, 0.0, 0.34673556415524887, -0.8657461909953054, 0.0, 0.0, 0.0]]
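
The point of yielding one chunk at a time is that only one chunk ever needs to be in memory. As a minimal sketch (reusing the read_chunks and convert_line helpers defined above), per-column sums can be accumulated chunk by chunk without ever materializing the whole file:

# sketch: fold each chunk into running per-column sums;
# peak memory stays at roughly one 10-row chunk
column_sums = None
for chunk in read_chunks('data.csv', 10):
    for row in chunk:
        if column_sums is None:
            column_sums = [0.0] * len(row)
        column_sums = [s + value for s, value in zip(column_sums, row)]
print(column_sums)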

Chunking in Pandas


In [4]:
import pandas as pd

In [5]:
reader = pd.read_csv('data.csv', header=None, chunksize=10)  # with chunksize set, this returns an iterator of DataFrames
i = 0
for chunk in reader:
    if i < 2:
        print(chunk)
    i += 1


          0         1  2  3  4         5         6  7  8  9
0  0.545137 -1.699611  7  9  0  0.807140  0.272410  1  1  1
1  0.071072 -0.368368  2  0  0 -1.112593  0.036738  0  0  1
2 -0.741816  1.467132  3  6  1  0.001442  0.263294  0  0  0
3 -0.946521  0.085304  9  3  0 -0.345595  0.446301  0  0  0
4  1.415207  0.179713  8  5  1 -0.033233  0.107743  0  0  1
5  0.160997  2.107711  6  1  0 -0.990572 -0.794568  0  1  0
6  0.893762  0.275864  0  0  1  3.183884 -1.171305  1  0  1
7 -0.870601  0.096216  1  8  0  0.932717  0.119918  1  1  1
8 -0.491953 -1.093273  2  9  1 -0.926203 -1.307306  1  1  0
9 -0.318324  1.377612  5  2  1 -0.404582 -0.099912  1  0  1
          0         1  2  3  4         5         6  7  8  9
0 -1.979742  0.204850  3  1  1 -0.508858  1.823122  1  1  1
1 -1.324136  0.945311  2  4  0  0.722535 -2.121333  0  1  0
2 -0.012935  0.565164  2  4  0  0.453265 -0.812723  0  1  0
3 -0.671176  0.094807  3  4  1  0.427722  0.384154  1  0  1
4 -0.453806  0.797285  6  7  1 -0.640640 -0.332971  0  0  0
5  0.578411 -0.486768  7  3  0 -0.813693  0.268110  1  0  1
6  1.536013 -0.685058  9  4  0  2.504005 -1.455495  0  0  1
7  0.192120 -0.036706  3  1  1 -1.028780  1.111390  1  1  1
8  1.018287 -0.577577  2  8  0  0.287248 -1.186011  1  0  1
9  0.087505  1.400576  2  4  0  0.346736 -0.865746  0  0  0
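
The same pattern applies to the pandas reader: fold each chunk into a running aggregate instead of concatenating everything. A minimal sketch (a fresh reader is created because the one above is already exhausted; the column-wise max is just an illustrative statistic):

# sketch: running column-wise maximum, one 10-row chunk in memory at a time
reader = pd.read_csv('data.csv', header=None, chunksize=10)
running_max = None
for chunk in reader:
    chunk_max = chunk.max()
    if running_max is None:
        running_max = chunk_max
    else:
        running_max = pd.concat([running_max, chunk_max], axis=1).max(axis=1)
print(running_max)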

Chunked data in HDF5


In [3]:
import h5py
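
The cells below assume demo.hdf5 already exists. A hedged sketch of a setup step that could have produced it (run beforehand; this is an assumption, not part of the original talk, with the dataset name 'data' and the (100, 10) chunk shape chosen to match what the cells below report):

# sketch (assumed setup step): write data.csv into an HDF5 dataset named 'data'
import numpy as np

values = np.loadtxt('data.csv', delimiter=',')  # a (100, 10) array
with h5py.File('demo.hdf5', 'w') as out:
    out.create_dataset('data', data=values, chunks=(100, 10))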

In [6]:
f = h5py.File('demo.hdf5', 'r+')
list(f.keys())


Out[6]:
[u'data']

In [7]:
f['data'][0:3]  # slicing reads just these rows from disk rather than loading the whole dataset into memory


Out[7]:
array([[  5.45136812e-01,  -1.69961073e+00,   7.00000000e+00,
          9.00000000e+00,   0.00000000e+00,   8.07139633e-01,
          2.72409859e-01,   1.00000000e+00,   1.00000000e+00,
          1.00000000e+00],
       [  7.10721387e-02,  -3.68367792e-01,   2.00000000e+00,
          0.00000000e+00,   0.00000000e+00,  -1.11259271e+00,
          3.67380786e-02,   0.00000000e+00,   0.00000000e+00,
          1.00000000e+00],
       [ -7.41815575e-01,   1.46713216e+00,   3.00000000e+00,
          6.00000000e+00,   1.00000000e+00,   1.44212414e-03,
          2.63293639e-01,   0.00000000e+00,   0.00000000e+00,
          0.00000000e+00]])

In [9]:
f['data'].chunks  # the chunk shape on disk; here a single chunk covers the entire (100, 10) dataset


Out[9]:
(100, 10)

In [11]:
f.create_dataset('test', data=range(10000), chunks=(10,))  # explicitly store the data in 10-element chunks


Out[11]:
<HDF5 dataset "test": shape (10000,), type "<i8">

In [12]:
f['test'].chunks


Out[12]:
(10,)

In [13]:
f.close()
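
Once a dataset is chunked on disk, reads that line up with the chunk boundaries stay cheap. A minimal sketch that reopens the file read-only and folds the 'test' dataset into a sum one chunk-sized slice at a time:

# sketch: iterate over the dataset in slices matching its (10,) chunk shape,
# so only one small slice is in memory at any point
with h5py.File('demo.hdf5', 'r') as f:
    dset = f['test']
    step = dset.chunks[0]
    total = 0
    for start in range(0, dset.shape[0], step):
        total += dset[start:start + step].sum()
print(total)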

Intelligent reading with databases


In [16]:
import sqlite3 as sql

In [18]:
connection = sql.connect('demo.db')

c = connection.cursor()
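
demo.db is assumed to already contain a table called data. A hedged sketch of one way it could have been loaded (using pandas' to_sql through the connection just opened; this setup step is an assumption, not part of the original talk):

# sketch (assumed setup step): load data.csv into a 'data' table
df = pd.read_csv('data.csv', header=None)
df.columns = [str(col) for col in df.columns]       # use string column names for SQLite
df.to_sql('data', connection, if_exists='replace')  # index=True (default) adds the leading row number seen below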

In [19]:
c.execute('select * from data')

i = 0
# fetchmany pulls at most 10 rows of the result set into memory; print the first 3
for row in c.fetchmany(size=10):
    if i < 3:
        print(row)
    i += 1


(0, 0.5451368119149018, -1.6996107290072229, 7, 9, 0, 0.8071396331247015, 0.2724098588191702, 1, 1, 1)
(1, 0.07107213865436755, -0.36836779235781386, 2, 0, 0, -1.1125927116408099, 0.036738078578910344, 0, 0, 1)
(2, -0.7418155746328328, 1.467132160646872, 3, 6, 1, 0.0014421241396832412, 0.2632936393069136, 0, 0, 0)
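
fetchmany returns at most size rows per call, so a full chunked pass over the table keeps calling it until nothing comes back. A minimal sketch (the per-batch work here is just a row count; swap in whatever processing you need):

# sketch: stream the whole table in batches of 10 rows
c.execute('select * from data')
n_rows = 0
while True:
    batch = c.fetchmany(size=10)
    if not batch:
        break
    n_rows += len(batch)  # placeholder per-batch work
print(n_rows)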

Intra-routine parallelism


In [20]:
import numpy as np

In [21]:
data = np.array(pd.read_csv('data.csv', header=None))

In [23]:
data.shape


Out[23]:
(100, 10)

Play with some matrix arithmetic to show how fast NumPy's vectorized operations are


In [24]:
def dumb_add(x, y):
    # pure-Python element-wise addition; note that it modifies x in place
    for row_index in range(len(x)):
        for col_index in range(len(x[0])):
            x[row_index][col_index] += y[row_index][col_index]
    return x

In [25]:
%timeit dumb_add(data, data)


1000 loops, best of 3: 1.05 ms per loop

In [27]:
%timeit data + data  # now do the same thing with NumPy's vectorized add


The slowest run took 32.86 times longer than the fastest. This could mean that an intermediate result is being cached.
1000000 loops, best of 3: 1.21 µs per loop
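
One caveat about the comparison above: dumb_add modifies data in place, so every timing repetition keeps adding to the previous result. A small sketch of how to keep later measurements independent (reloading the array is our addition, not part of the talk):

# sketch: rebuild a fresh array before timing, since dumb_add mutated `data`
data = np.array(pd.read_csv('data.csv', header=None))
%timeit data + data         # vectorized add, allocates a new array
%timeit np.add(data, data)  # the same operation spelled as a ufunc call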

How to parallelize within a process


In [30]:
from dask import dataframe

In [31]:
data = dataframe.read_csv('data.csv', header=None)
col_max = data.max()  # lazy: this only builds a task graph, nothing is computed yet
print(col_max)


dd.Series<datafra..., npartitions=1>

In [32]:
col_max.dask


Out[32]:
{'bytes_read_csv-69dd0520c52961dd00da4ccbced7584c': (<function apply>,
  <function dask.dataframe.csv.bytes_read_csv>,
  ['read-file-block-d5e2fdeb9e825be98d4b7a8ce9bd9622-0',
   '',
   (dict, [['header', None]]),
   (dict,
    [[0, dtype('float64')],
     [1, dtype('float64')],
     [2, dtype('int64')],
     [3, dtype('int64')],
     [4, dtype('int64')],
     [5, dtype('float64')],
     [6, dtype('float64')],
     [7, dtype('int64')],
     [8, dtype('int64')],
     [9, dtype('int64')]]),
   (list, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])],
  (dict, [['write_header', False], ['enforce', False]])),
 'read-file-block-d5e2fdeb9e825be98d4b7a8ce9bd9622-0': (<function dask.bytes.local.read_block_from_file>,
  'data.csv',
  0,
  64000000,
  '\n',
  None),
 ('dataframe-max--first-04edc869a5e065dcef1a4f2875158042',
  0): (<function apply>, <function dask.dataframe.core._max>, [('from-delayed-01300be2f1f447ed383b5523c6b07192',
    0)], {'axis': 0, 'skipna': True}),
 ('dataframe-max--second-04edc869a5e065dcef1a4f2875158042',
  0): (<function apply>, <function dask.dataframe.core.aggregate>, [(<function dask.dataframe.core._concat>,
    (list,
     [('dataframe-max--first-04edc869a5e065dcef1a4f2875158042',
       0)]))], {'axis': 0,
   'skipna': True}),
 ('from-delayed-01300be2f1f447ed383b5523c6b07192',
  0): 'bytes_read_csv-69dd0520c52961dd00da4ccbced7584c'}

In [33]:
col_max.compute()


Out[33]:
0    2.936829
1    2.107711
2    9.000000
3    9.000000
4    1.000000
5    3.183884
6    2.280348
7    1.000000
8    1.000000
9    1.000000
dtype: float64
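
With a file this small, dask put everything in a single partition (npartitions=1 above; the task graph shows one 64000000-byte block), so nothing actually ran in parallel. A hedged sketch of forcing more partitions with the blocksize argument (the 2000-byte value is artificially small just to split this tiny demo file; the keyword may vary across dask versions):

# sketch: smaller blocks -> more partitions -> more parallelism in compute()
data = dataframe.read_csv('data.csv', header=None, blocksize=2000)
print(data.npartitions)
print(data.max().compute())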

Parallelizing at the job level


In [37]:
from app import summarize_file

In [38]:
summarize_file('data.csv')

In [ ]:
# delayed task processing: hand the work to a Celery worker instead of running it here
summarize_file.delay('data.csv')

In [ ]:
## start up RabbitMQ,
## then use Celery to start the app we just created using 'celery -A ...'
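
The summarize_file task comes from a separate app module that is not shown in this notebook. A hedged sketch of what such a module could look like (the module name app, the broker URL, and the body of the task are all assumptions, not the talk's actual code):

# app.py -- sketch only, assuming a local RabbitMQ broker
from celery import Celery
import pandas as pd

celery_app = Celery('app', broker='amqp://localhost//')

@celery_app.task
def summarize_file(path):
    # summarize the CSV in chunks so the worker's memory stays bounded
    reader = pd.read_csv(path, header=None, chunksize=10)
    return [chunk.describe().to_dict() for chunk in reader]

With a module like this on the path, 'celery -A app worker' starts a worker, summarize_file('data.csv') runs synchronously in the notebook, and summarize_file.delay('data.csv') queues the job for the worker.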