Processing Multiple Pandas Series in Parallel

Introduction

Python's Pandas library for data processing is great for all sorts of data-processing tasks. However, one thing it doesn't support out of the box is parallel processing across multiple cores.

I've been wanting a simple way to process Pandas DataFrames in parallel, and recently I found this truly awesome blog post.. It shows how to apply an arbitrary Python function to each object in a sequence, in parallel, using Pool.map from the Multiprocessing library.

The author's example involves running urllib2.urlopen() across a list of urls, to scrape html from several web sites in parallel. But the principle applies equally to mapping a function across several columns in a Pandas DataFrame. Here's an example of how useful that can be.

A simple multiprocessing wrapper

Here's some code which will accept a Pandas DataFrame and a function, apply the function to each column in the DataFrame, and return the results (as a new dataframe). It also allows the caller to specify the number of processes to run in parallel, but uses a sensible default when not provided.


In [1]:
from multiprocessing import Pool, cpu_count

def process_Pandas_data(func, df, num_processes=None):
    ''' Apply a function separately to each column in a dataframe, in parallel.'''
    
    # If num_processes is not specified, default to minimum(#columns, #machine-cores)
    if num_processes==None:
        num_processes = min(df.shape[1], cpu_count())
    
    # 'with' context manager takes care of pool.close() and pool.join() for us
    with Pool(num_processes) as pool:
        
        # we need a sequence to pass pool.map; this line creates a generator (lazy iterator) of columns
        seq = [df[col_name] for col_name in df.columns]
        
        # pool.map returns results as a list
        results_list = pool.map(func, seq)
        
        # return list of processed columns, concatenated together as a new dataframe
        return pd.concat(results_list, axis=1)

Hopefully the code above looks pretty straightforward, but if it looks a bit confusing at first glance, ultimately the key is these two lines:


In [2]:
#!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# UNCOMMENT IN MARKDOWN BEFORE PUSHING LIVE
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

# (commented out so can run notebook in one click.)
#with Pool(num_processes) as pool:
#    ...
#    results_list = pool.map(func, seq)

the rest was just setting the default number of processes to run in parallel, getting a 'sequence of columns' from our input dataframe, and concatenating the list of results we get back from pool.map

A function to measure parallel performance gains with

To measure the speed boost from wrapping a bit of Pandas processing in this multiprocessing wrapper, I'm going to load the Quora Duplicate Questions dataset, and the vectorized text-tokenizing function from my last blog post on using vectorized Pandas functions.


In [3]:
import pandas as pd

df = pd.read_csv('datasets/quora_kaggle.csv')
df.head(3)


Out[3]:
id qid1 qid2 question1 question2 is_duplicate
0 0 1 2 What is the step by step guide to invest in sh... What is the step by step guide to invest in sh... 0
1 1 3 4 What is the story of Kohinoor (Koh-i-Noor) Dia... What would happen if the Indian government sto... 0
2 2 5 6 How can I increase the speed of my internet co... How can Internet speed be increased by hacking... 0

In [4]:
import re
from nltk.corpus import stopwords

def tokenize_column(text_series):
    ''' Accept a series of strings, returns list of words (lowercased) without punctuation or stopwords'''

    # lowercase everything
    text_series = text_series.astype(str).str.lower()
    
    # remove punctuation (r'\W' is regex, matches any non-alphanumeric character)
    text_series = text_series.str.replace(r'\W', ' ')
    
    # return list of words, without stopwords
    sw = stopwords.words('english')
    
    return text_series.apply(lambda row: [word for word in row.split() if word not in sw])

To see what this does "tokenizing" function does, here's a few unprocessed quora questions, followed by their outputs from the tokenizer


In [5]:
print(df.question1.head(3), '\n\n', tokenize_column(df.question1.head(3)))


0    What is the step by step guide to invest in sh...
1    What is the story of Kohinoor (Koh-i-Noor) Dia...
2    How can I increase the speed of my internet co...
Name: question1, dtype: object 

 0    [step, step, guide, invest, share, market, india]
1                [story, kohinoor, koh, noor, diamond]
2    [increase, speed, internet, connection, using,...
Name: question1, dtype: object

Clocking Performance Gains of Using Multiprocessing, 2 Cores

The two functions below clock the time elapsed from tokenizing our two question columns in series or in parallel.

Defining these tests as their own functions means we're not creating any new global-scope variables when we measure performance. All the intermediate results (like the new dataframes of processed questions) are garbage-collected after the function returns its results (an elapsed time). This is important to maintain an apples-to-apples performance comparison; otherwise, performance tests run later in the notebook would have less RAM available than the first test we run.


In [6]:
from datetime import datetime

def clock_tokenize_in_series(df):    
    '''Calc time to process in series'''
    
    # Initialize dataframe to hold processed questions, and start clock
    qs_processed = pd.DataFrame()
    start = datetime.now()

    # process question columns in series
    for col in df.columns:
        qs_processed[col] = tokenize_column(df[col])

    # return time elapsed
    return datetime.now() - start
    

def clock_tokenize_in_parallel(df):    
    '''Calc time to process in parallel'''
    
    # Initialize dataframe to hold processed questions, and start clock
    qs_processed = pd.DataFrame()
    start = datetime.now()

    # process question columns in parallel
    qs_processed2 = process_Pandas_data(tokenize_column, df)

    # return time elapsed
    return datetime.now() - start

And now to measure our results:


In [7]:
# Print Time Results
no_parallel = clock_tokenize_in_series(df[['question1', 'question2']])
parallel    = clock_tokenize_in_parallel(df[['question1', 'question2']])

print('Time elapsed for processing 2 questions in series :', no_parallel)
print('Time elapsed for processing 2 questions in parallel :', parallel)


Time elapsed for processing 2 questions in series : 0:00:23.383500
Time elapsed for processing 2 questions in parallel : 0:00:15.383277

So processing the two columns in parallel cut our processing time from 23.7 seconds down to 14.7 seconds, a decrease of 38%. The theoretical maximum reduction we might have expected with no multiprocessing overhead would of course been a 50% reduction, so this is not bad.

Comparing Performance with 4 Cores

I have four cores on this laptop, and I'd like to see how the performance gains scale here from two to four cores. Below, I'll make copies of our q1 and q2 so we have four total text columns, then re-run the comparison by passing this new 4-column dataframe to the testing function defined above.


In [8]:
# Column-bind two questions with copies of themselves for 4 text columns
four_qs = pd.concat([df[['question1','question2']], 
                     df[['question1','question2']]], axis=1) 

four_qs.columns = ['q1', 'q2', 'q1copy', 'q2copy']
four_qs.head(2)


Out[8]:
q1 q2 q1copy q2copy
0 What is the step by step guide to invest in sh... What is the step by step guide to invest in sh... What is the step by step guide to invest in sh... What is the step by step guide to invest in sh...
1 What is the story of Kohinoor (Koh-i-Noor) Dia... What would happen if the Indian government sto... What is the story of Kohinoor (Koh-i-Noor) Dia... What would happen if the Indian government sto...

In [9]:
# Print Results for running tokenizer on 4 questions in series, then in parallel
no_parallel = clock_tokenize_in_series(four_qs)
parallel    = clock_tokenize_in_parallel(four_qs)

print('Time elapsed for processing 4 questions in series :', no_parallel)
print('Time elapsed for processing 4 questions in parallel :', parallel)


Time elapsed for processing 4 questions in series : 0:00:46.647125
Time elapsed for processing 4 questions in parallel : 0:00:25.574532

Conclusion

[edit this after nbconvert to markdown, based on final stats]

multiprocessing does have to pickle (serialize) each object in seq to send it to a Pool worker, deserialize it to work on it, then serialize/deserialize. So I'm guessing this doesn't scale very well for multiprocessing because the text data is relatively large, and the processing itself happens relatively quickly. so it's a lot of serializing/unserializing cost to pay for a relatively little. maybe if our underlying calculation were more compute-intensive, it would scale better. I'll try and