Large CSV files, or large collections of many CSV files, are a common yet cumbersome data format. If the data in these files doesn't fit into memory we're usually forced to process it in pieces, reading chunks into Pandas by hand and juggling the intermediate results ourselves.
This notebook demonstrates that Blaze operates smoothly on such data. It then shows exactly how Blaze uses Pandas, effectively automating those chunking gymnastics for us. We perform an out-of-core split-apply-combine operation on the NYC Taxicab dataset while using a comfortably small amount of memory.
All computations in this notebook were done on a personal laptop with a smallish amount of memory, using a recent version of Blaze:
conda install -c blaze blaze
or
pip install blaze
In [1]:
# !wget https://nyctaxitrips.blob.core.windows.net/data/trip_data_{1,2,3,4,5,6,7,8,9,10,11,12}.csv.zip
We design the blaze.Data constructor to be easy to use. Here we give it a globstring of the files we want to analyze. It gives us a quick head of the data immediately, even though there are several gigabytes of data. If you're unfamiliar with the data you may want to quickly peruse the columns and values.
In [2]:
from blaze import *
d = Data('trip_data_*.csv')
d
Out[2]:
In [3]:
by(d.passenger_count, avg_distance=d.trip_distance.mean(),
   count=d.passenger_count.count())
Out[3]:
It's useful to note here all the things we didn't do: we didn't load the data into memory, and we didn't call pandas.read_csv ourselves with the right arguments to make this fast. Blaze handles these things for us. It drives Pandas intelligently, breaks up our computation into pieces it can perform in memory, and then shoves data through Pandas as fast as it can.
In [4]:
expr = by(d.passenger_count, avg_distance=d.trip_distance.mean(),
          count=d.passenger_count.count())
%time _ = compute(expr)
Not great, but not terrible. This is about as fast as Postgres does it after the data has been loaded into Postgres.
Blaze can use many cores to accelerate this work. It still uses pandas in each core but it now just splits apart the computation intelligently and directs different CSV files to different cores.
This gives a significant speedup even on my laptop. On a large workstation with more cores this speedup is more pronounced.
In [5]:
import multiprocessing
pool = multiprocessing.Pool(4)
In [6]:
%time _ = compute(expr, map=pool.map)
So how does Blaze do this? Let's reproduce the simplest piece by hand with chunked Pandas. First we compute a partial sum and count of passenger_count on each in-memory chunk.
In [7]:
import pandas as pd

totals = []
counts = []
for chunk in pd.read_csv('trip_data_1.csv', chunksize=1000000, usecols=['passenger_count']):
    totals.append(chunk.passenger_count.sum())
    counts.append(chunk.passenger_count.count())
Then we perform a second computation on these intermediate results:
In [8]:
1.0 * sum(totals) / sum(counts)
Out[8]:
This is exactly what Blaze does when we type the following.
In [9]:
Data('trip_data_1.csv').passenger_count.mean()
Out[9]:
So to perform one computation, mean, on an out-of-core dataset, we end up performing two different sets of computations:

1. sum and count on each in-memory chunk
2. sum and division on the aggregated results from step #1

This breakdown works on a surprisingly large class of operations. Split-apply-combine operations are handled similarly. We perform a different split-apply-combine operation on each chunk and then another on the aggregated results.
For more information on this see Blaze's out-of-core docs.
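To make that recipe concrete, here is a minimal pure-pandas sketch of the chunk-then-combine pattern for a mean. The function name, file name, and chunk size below are illustrative choices, not part of Blaze's API.

import pandas as pd

def mean_out_of_core(filename, column, chunksize=1000000):
    # Step 1: compute a partial sum and count on each in-memory chunk
    totals, counts = [], []
    for chunk in pd.read_csv(filename, usecols=[column], chunksize=chunksize):
        totals.append(chunk[column].sum())
        counts.append(chunk[column].count())
    # Step 2: sum and divide the small aggregated results
    return float(sum(totals)) / sum(counts)

# For example: mean_out_of_core('trip_data_1.csv', 'passenger_count')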
OK, so let's go through and solve the entire out-of-core split-apply-combine problem on all of the CSV files.
Feel free to ignore this example. It's mostly here to show explicitly exactly what Blaze does for those who care and to generally impress those who don't.
Hold on to your butts.
In [10]:
%%time
from glob import glob

# Specifying active columns at parse time greatly improves performance
active_columns = ['passenger_count', 'trip_distance']
intermediates = []

# Do a split-apply-combine operation on each chunk of each CSV file
for fn in sorted(glob('trip_data_*.csv')):
    for df in pd.read_csv(fn, usecols=active_columns,
                          chunksize=1000000, skipinitialspace=True):
        chunk = df.groupby('passenger_count').agg({'passenger_count': ['count'],
                                                   'trip_distance': ['sum', 'count']})
        intermediates.append(chunk)

# Bring those results together.  These are much smaller and so likely fit in memory
df = pd.concat(intermediates, axis=0)
df.columns = ['_'.join(col) for col in df.columns]  # Flatten the multi-index by name, e.g. 'trip_distance_sum'

# Perform a second split-apply-combine operation on those intermediate results
groups = df.groupby(df.index)  # group once for many of the following applies
df2 = pd.concat([groups.trip_distance_sum.sum(),
                 groups.trip_distance_count.sum(),
                 groups.passenger_count_count.sum()],
                axis=1)

df2['avg_distance'] = df2.trip_distance_sum / df2.trip_distance_count
df2['count'] = df2.passenger_count_count

# Select out the columns we want
result = df2[['avg_distance', 'count']]
result
Blaze is a general library to bring expert data analysis into the hands of everyday users
The example above is a PITA to do by hand. More than that, it relies on a number of tricks not known to many Pandas users. Fortunately Blaze automates these tricks, making them routine for a broad class of problems. Moreover, it does this behind a syntax aimed at relatively naive users:
d = Data('trip_data_*.csv')
by(d.passenger_count, avg_distance=d.trip_distance.mean(),
   count=d.passenger_count.count())
Hopefully this example helps to explain how Blaze chunks apart computations on large CSV files to operate in memory.
This also highlights the relationship between Blaze and Pandas. Pandas is Blaze's preferred library when it performs in-memory analytics on tabular data. In these cases it's Blaze's job to arrange data well and call Pandas with the right arguments while it's Pandas' job to actually do the computation.
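As a small, illustrative sketch of that division of labor (not from the original notebook): the same expression can be bound to an in-memory pandas DataFrame, and Blaze simply translates it into a pandas groupby/aggregation.

import pandas as pd
from blaze import Data, by

df = pd.DataFrame({'passenger_count': [1, 1, 2],
                   'trip_distance': [1.5, 2.5, 3.0]})

t = Data(df)  # wrap an in-memory DataFrame instead of CSV files
by(t.passenger_count, avg_distance=t.trip_distance.mean(),
   count=t.passenger_count.count())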
As a reminder, large CSV files are just one application of Blaze. Blaze provides a similar experience and set of tricks for SQL, Spark, and binary storage files.
You can learn more about Blaze at http://blaze.pydata.org/