I have been using dask to speed up some larger-scale analyses. Dask is a really great tool as a drop-in replacement for parallelizing PyData-powered analyses built on numpy, pandas, and even scikit-learn.
However, I recently ran into an interesting case where using the same syntax on a dask.dataframe as on a pandas.DataFrame does not achieve what I want. So in this post, I will document how to overcome it for my future self.
As usual, let's import all the useful libraries:
In [3]:
import pandas as pd
import dask.dataframe as dd
I will use the famous Titanic dataset as an example to show how dask can behave unexpectedly under groupby + apply operations.
In [10]:
titanic = pd.read_csv('http://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv')
titanic.head()
Out[10]:
I will illustrate the problem by counting how many passengers survived in each age and sex group, using the following function:
In [8]:
def count_survival(d):
    '''
    Summarize the survivors in a group and return the count
    as a single-row DataFrame.
    '''
    return pd.DataFrame({'survived': [d.Survived.sum()]})
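As a quick sanity check, we can call this function on the whole dataset first; it should return a one-row DataFrame with the overall survivor count (a minimal usage sketch, not part of the original analysis):

count_survival(titanic)  # one-row DataFrame with the total number of survivors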
The regular pandas way to do it would be:
In [14]:
titanic \
    .groupby(['Age','Sex'])\
    .apply(count_survival)\
    .head()
Out[14]:
Let's convert the pandas DataFrame to a dask DataFrame and do the same:
In [16]:
dask_job = titanic \
    .pipe(dd.from_pandas, npartitions=24)\
    .groupby(['Age','Sex']) \
    .apply(count_survival, meta={'survived':'f8'})
This will not return any result until we call dask_job.compute(). Note also the meta argument: dask cannot infer the output schema of an arbitrary function, so we declare the column name and dtype up front ('f8' is a 64-bit float).
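For reference, materializing the result is a one-liner (a minimal sketch using the dask_job defined above):

# trigger the computation and collect the result as a regular pandas DataFrame
result = dask_job.compute()
result.head()

Before computing, though, dask also includes a visualize method to show the task graph: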
In [18]:
dask_job.visualize()
Out[18]:
The resulting task graph is much more complicated than I would have expected, and this is because of data shuffling behind the scenes. As suggested by the dask documentation, this issue can be resolved by setting one of the groupby keys as the index:
In [21]:
dask_job = titanic \
    .set_index('Age')\
    .pipe(dd.from_pandas, npartitions=24)\
    .groupby(['Age','Sex']) \
    .apply(count_survival, meta={'survived':'f8'})
dask_job.visualize()
Out[21]:
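The task graph is now much simpler. Computing it should give the same survivor counts as the plain pandas version (a hedged sketch; with Age as the index, the row ordering may differ):

# materialize the result from the simplified graph
dask_job.compute().head()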