Python's Pandas library for data processing is great for all sorts of data-processing tasks. However, one thing it doesn't support out of the box is parallel processing across multiple cores.
I've been wanting a simple way to process Pandas DataFrames in parallel, and recently I found this truly awesome blog post.. It shows how to apply an arbitrary Python function to each object in a sequence, in parallel, using Pool.map from the Multiprocessing library.
The author's example involves running urllib2.urlopen() across a list of urls, to scrape html from several web sites in parallel. But the principle applies equally to mapping a function across several columns in a Pandas DataFrame. Here's an example of how useful that can be.
Here's some code which will accept a Pandas DataFrame and a function, apply the function to each column in the DataFrame, and return the results (as a new dataframe). It also allows the caller to specify the number of processes to run in parallel, but uses a sensible default when not provided.
In [1]:
from multiprocessing import Pool, cpu_count
def process_Pandas_data(func, df, num_processes=None):
''' Apply a function separately to each column in a dataframe, in parallel.'''
# If num_processes is not specified, default to minimum(#columns, #machine-cores)
if num_processes==None:
num_processes = min(df.shape[1], cpu_count())
# 'with' context manager takes care of pool.close() and pool.join() for us
with Pool(num_processes) as pool:
# we need a sequence to pass pool.map; this line creates a generator (lazy iterator) of columns
seq = [df[col_name] for col_name in df.columns]
# pool.map returns results as a list
results_list = pool.map(func, seq)
# return list of processed columns, concatenated together as a new dataframe
return pd.concat(results_list, axis=1)
Hopefully the code above looks pretty straightforward, but if it looks a bit confusing at first glance, ultimately the key is these two lines:
In [2]:
#!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# UNCOMMENT IN MARKDOWN BEFORE PUSHING LIVE
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# (commented out so can run notebook in one click.)
#with Pool(num_processes) as pool:
# ...
# results_list = pool.map(func, seq)
the rest was just setting the default number of processes to run in parallel, getting a 'sequence of columns' from our input dataframe, and concatenating the list of results we get back from pool.map
To measure the speed boost from wrapping a bit of Pandas processing in this multiprocessing wrapper, I'm going to load the Quora Duplicate Questions dataset, and the vectorized text-tokenizing function from my last blog post on using vectorized Pandas functions.
In [3]:
import pandas as pd
df = pd.read_csv('datasets/quora_kaggle.csv')
df.head(3)
Out[3]:
In [4]:
import re
from nltk.corpus import stopwords
def tokenize_column(text_series):
''' Accept a series of strings, returns list of words (lowercased) without punctuation or stopwords'''
# lowercase everything
text_series = text_series.astype(str).str.lower()
# remove punctuation (r'\W' is regex, matches any non-alphanumeric character)
text_series = text_series.str.replace(r'\W', ' ')
# return list of words, without stopwords
sw = stopwords.words('english')
return text_series.apply(lambda row: [word for word in row.split() if word not in sw])
To see what this does "tokenizing" function does, here's a few unprocessed quora questions, followed by their outputs from the tokenizer
In [5]:
print(df.question1.head(3), '\n\n', tokenize_column(df.question1.head(3)))
The two functions below clock the time elapsed from tokenizing our two question columns in series or in parallel.
Defining these tests as their own functions means we're not creating any new global-scope variables when we measure performance. All the intermediate results (like the new dataframes of processed questions) are garbage-collected after the function returns its results (an elapsed time). This is important to maintain an apples-to-apples performance comparison; otherwise, performance tests run later in the notebook would have less RAM available than the first test we run.
In [6]:
from datetime import datetime
def clock_tokenize_in_series(df):
'''Calc time to process in series'''
# Initialize dataframe to hold processed questions, and start clock
qs_processed = pd.DataFrame()
start = datetime.now()
# process question columns in series
for col in df.columns:
qs_processed[col] = tokenize_column(df[col])
# return time elapsed
return datetime.now() - start
def clock_tokenize_in_parallel(df):
'''Calc time to process in parallel'''
# Initialize dataframe to hold processed questions, and start clock
qs_processed = pd.DataFrame()
start = datetime.now()
# process question columns in parallel
qs_processed2 = process_Pandas_data(tokenize_column, df)
# return time elapsed
return datetime.now() - start
And now to measure our results:
In [7]:
# Print Time Results
no_parallel = clock_tokenize_in_series(df[['question1', 'question2']])
parallel = clock_tokenize_in_parallel(df[['question1', 'question2']])
print('Time elapsed for processing 2 questions in series :', no_parallel)
print('Time elapsed for processing 2 questions in parallel :', parallel)
So processing the two columns in parallel cut our processing time from 23.7 seconds down to 14.7 seconds, a decrease of 38%. The theoretical maximum reduction we might have expected with no multiprocessing overhead would of course been a 50% reduction, so this is not bad.
I have four cores on this laptop, and I'd like to see how the performance gains scale here from two to four cores. Below, I'll make copies of our q1 and q2 so we have four total text columns, then re-run the comparison by passing this new 4-column dataframe to the testing function defined above.
In [8]:
# Column-bind two questions with copies of themselves for 4 text columns
four_qs = pd.concat([df[['question1','question2']],
df[['question1','question2']]], axis=1)
four_qs.columns = ['q1', 'q2', 'q1copy', 'q2copy']
four_qs.head(2)
Out[8]:
In [9]:
# Print Results for running tokenizer on 4 questions in series, then in parallel
no_parallel = clock_tokenize_in_series(four_qs)
parallel = clock_tokenize_in_parallel(four_qs)
print('Time elapsed for processing 4 questions in series :', no_parallel)
print('Time elapsed for processing 4 questions in parallel :', parallel)
[edit this after nbconvert to markdown, based on final stats]
multiprocessing does have to pickle (serialize) each object in seq to send it to a Pool worker, deserialize it to work on it, then serialize/deserialize. So I'm guessing this doesn't scale very well for multiprocessing because the text data is relatively large, and the processing itself happens relatively quickly. so it's a lot of serializing/unserializing cost to pay for a relatively little. maybe if our underlying calculation were more compute-intensive, it would scale better. I'll try and