Try Dask - Parallel Processing for Machine Learning

  • Dask is a library for parallel processing. It is similar to Spark, but instead of shipping its own set of libraries the way Spark does, it mirrors parts of the sklearn, NumPy, and pandas APIs (a short sketch of what that looks like follows this list).
  • Dask itself is already available through conda.
  • You also need to install:
    • dask-ml: by typing conda install -c conda-forge dask-ml in your terminal
    • dask-searchcv: by typing conda install dask-searchcv -c conda-forge in your terminal
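
A minimal sketch (mine, not part of the original install steps) of what "mirrors the pandas API" means in practice: dask.dataframe exposes the familiar pandas syntax, but builds a lazy task graph over partitions and only runs it when you call .compute().

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"store": ["A", "A", "B"], "sales": [10.0, 20.0, 5.0]})
ddf = dd.from_pandas(pdf, npartitions=2)   # split into partitions that can be processed in parallel

# Same groupby syntax as pandas, but nothing executes until .compute()
print(ddf.groupby("store").sales.mean().compute())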

In [47]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder

In [49]:
train = pd.read_csv("Big_Mart_Train.csv")
test = pd.read_csv("Big_Mart_Test.csv")

In [50]:
train.head()


Out[50]:
Item_Identifier Item_Weight Item_Fat_Content Item_Visibility Item_Type Item_MRP Outlet_Identifier Outlet_Establishment_Year Outlet_Size Outlet_Location_Type Outlet_Type Item_Outlet_Sales
0 FDA15 9.30 Low Fat 0.016047 Dairy 249.8092 OUT049 1999 Medium Tier 1 Supermarket Type1 3735.1380
1 DRC01 5.92 Regular 0.019278 Soft Drinks 48.2692 OUT018 2009 Medium Tier 3 Supermarket Type2 443.4228
2 FDN15 17.50 Low Fat 0.016760 Meat 141.6180 OUT049 1999 Medium Tier 1 Supermarket Type1 2097.2700
3 FDX07 19.20 Regular 0.000000 Fruits and Vegetables 182.0950 OUT010 1998 NaN Tier 3 Grocery Store 732.3800
4 NCD19 8.93 Low Fat 0.000000 Household 53.8614 OUT013 1987 High Tier 3 Supermarket Type1 994.7052

In [51]:
train.isnull().sum()


Out[51]:
Item_Identifier                 0
Item_Weight                  1463
Item_Fat_Content                0
Item_Visibility                 0
Item_Type                       0
Item_MRP                        0
Outlet_Identifier               0
Outlet_Establishment_Year       0
Outlet_Size                  2410
Outlet_Location_Type            0
Outlet_Type                     0
Item_Outlet_Sales               0
dtype: int64

In [52]:
test.isnull().sum()


Out[52]:
Item_Identifier                 0
Item_Weight                   976
Item_Fat_Content                0
Item_Visibility                 0
Item_Type                       0
Item_MRP                        0
Outlet_Identifier               0
Outlet_Establishment_Year       0
Outlet_Size                  1606
Outlet_Location_Type            0
Outlet_Type                     0
dtype: int64

In [53]:
# fill NA with median
train.Item_Weight = train.Item_Weight.fillna(np.nanmedian(train.Item_Weight))
test.Item_Weight = test.Item_Weight.fillna(np.nanmedian(test.Item_Weight))

In [54]:
print(train.Outlet_Size.unique())
print(test.Outlet_Size.unique())


['Medium' nan 'High' 'Small']
['Medium' nan 'Small' 'High']

In [22]:
# fill NA with mode
train.Outlet_Size = train.Outlet_Size.fillna(train.Outlet_Size.mode().iloc[0])
test.Outlet_Size = test.Outlet_Size.fillna(train.Outlet_Size.mode().iloc[0])

In [24]:
train.dtypes


Out[24]:
Item_Identifier               object
Item_Weight                  float64
Item_Fat_Content              object
Item_Visibility              float64
Item_Type                     object
Item_MRP                     float64
Outlet_Identifier             object
Outlet_Establishment_Year      int64
Outlet_Size                   object
Outlet_Location_Type          object
Outlet_Type                   object
Item_Outlet_Sales            float64
dtype: object

In [55]:
print(train.Item_Fat_Content.unique())
print(test.Item_Fat_Content.unique())
print(train.Item_Type.unique())
print(test.Item_Type.unique())
print(train.Outlet_Identifier.unique())
print(test.Outlet_Identifier.unique())
print(train.Outlet_Size.unique())
print(test.Outlet_Size.unique())
print(train.Outlet_Location_Type.unique())
print(test.Outlet_Location_Type.unique())
print(train.Outlet_Type.unique())
print(test.Outlet_Type.unique())


['Low Fat' 'Regular' 'low fat' 'LF' 'reg']
['Low Fat' 'reg' 'Regular' 'LF' 'low fat']
['Dairy' 'Soft Drinks' 'Meat' 'Fruits and Vegetables' 'Household'
 'Baking Goods' 'Snack Foods' 'Frozen Foods' 'Breakfast'
 'Health and Hygiene' 'Hard Drinks' 'Canned' 'Breads' 'Starchy Foods'
 'Others' 'Seafood']
['Snack Foods' 'Dairy' 'Others' 'Fruits and Vegetables' 'Baking Goods'
 'Health and Hygiene' 'Breads' 'Hard Drinks' 'Seafood' 'Soft Drinks'
 'Household' 'Frozen Foods' 'Meat' 'Canned' 'Starchy Foods' 'Breakfast']
['OUT049' 'OUT018' 'OUT010' 'OUT013' 'OUT027' 'OUT045' 'OUT017' 'OUT046'
 'OUT035' 'OUT019']
['OUT049' 'OUT017' 'OUT010' 'OUT027' 'OUT046' 'OUT018' 'OUT045' 'OUT019'
 'OUT013' 'OUT035']
['Medium' nan 'High' 'Small']
['Medium' nan 'Small' 'High']
['Tier 1' 'Tier 3' 'Tier 2']
['Tier 1' 'Tier 2' 'Tier 3']
['Supermarket Type1' 'Supermarket Type2' 'Grocery Store'
 'Supermarket Type3']
['Supermarket Type1' 'Grocery Store' 'Supermarket Type3'
 'Supermarket Type2']

In [56]:
train.Item_Fat_Content = train.Item_Fat_Content.replace(['low fat', 'LF'], ['Low Fat', 'Low Fat'])
test.Item_Fat_Content = test.Item_Fat_Content.replace(['low fat', 'LF'], ['Low Fat', 'Low Fat'])
train.Item_Fat_Content = train.Item_Fat_Content.replace(['reg'], ['Regular'])
test.Item_Fat_Content = test.Item_Fat_Content.replace(['reg'], ['Regular'])

In [57]:
print(train.Item_Fat_Content.unique())
print(test.Item_Fat_Content.unique())


['Low Fat' 'Regular']
['Low Fat' 'Regular']

In [58]:
print(train.Outlet_Establishment_Year.max())
print(train.Outlet_Establishment_Year.min())


2009
1985

In [77]:
# label-encode the categorical columns; combine train and test first so both get consistent encodings

test['Item_Outlet_Sales'] = 0
combi = train.append(test)
number = LabelEncoder()

for i in combi.columns:
    if (combi[i].dtype == 'object'):
        combi[i] = number.fit_transform(combi[i].astype('str'))
        combi[i] = combi[i].astype('object')
        
train = combi[:train.shape[0]]
test = combi[train.shape[0]:]

In [78]:
train.head()


Out[78]:
Item_Identifier Item_Weight Item_Fat_Content Item_Visibility Item_Type Item_MRP Outlet_Identifier Outlet_Establishment_Year Outlet_Size Outlet_Location_Type Outlet_Type Item_Outlet_Sales
0 623 9.30 0 0.016047 10 249.8092 9 18 1 0 1 3735.1380
1 1337 5.92 1 0.019278 6 48.2692 3 8 1 2 2 443.4228
2 1185 17.50 0 0.016760 2 141.6180 9 18 1 0 1 2097.2700
3 138 19.20 1 0.000000 12 182.0950 0 19 3 2 0 732.3800
4 332 8.93 0 0.000000 15 53.8614 1 30 0 2 1 994.7052

In [79]:
# Convert the pandas dataframes to dask dataframes
import dask.dataframe as dd

dask_train = dd.from_pandas(train, npartitions=3)
dask_test = dd.from_pandas(test, npartitions=3)

In [80]:
dask_test.head()


Out[80]:
Item_Identifier Item_Weight Item_Fat_Content Item_Visibility Item_Type Item_MRP Outlet_Identifier Outlet_Establishment_Year Outlet_Size Outlet_Location_Type Outlet_Type Item_Outlet_Sales
0 130 20.750 0 0.007565 5 107.8622 9 18 1 0 1 0.0
1 89 8.300 1 0.038428 10 87.3198 2 10 3 1 1 0.0
2 470 14.600 0 0.099575 3 241.7538 0 19 3 2 0 0.0
3 1357 7.315 0 0.015388 5 155.0340 2 10 3 1 1 0.0
4 221 12.500 1 0.118599 10 234.2300 5 32 1 2 3 0.0

In [82]:
dask_test = dask_test.drop('Item_Outlet_Sales', axis=1)

# drop the ID column, which has too many distinct levels to be a useful feature, and split off the target
dask_train = dask_train.drop('Item_Identifier', axis=1)
dask_test = dask_test.drop('Item_Identifier', axis=1)
target = dask_train['Item_Outlet_Sales']
dask_train = dask_train.drop('Item_Outlet_Sales', axis=1)

In [88]:
from dask_ml.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(dask_train, target,
 train_size=0.75, test_size=0.25, random_state=410)

In [90]:
import dask_ml.joblib
from sklearn.externals.joblib import parallel_backend
from dask.distributed import Client, progress

from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV
import dask_searchcv as dcv

import warnings
warnings.filterwarnings("ignore")

client = Client(processes=False, threads_per_worker=4, n_workers=2, memory_limit='2GB')

In [99]:
param_grid = {
    'bootstrap': [True],
    'max_depth': [8, 9],
    'max_features': [2, 3],
    'min_samples_leaf': [4, 5],
    'min_samples_split': [8, 10],
    'n_estimators': [100, 200]
    }

rf = RandomForestRegressor()

In [ ]:
# Grid Search
grid_search = dcv.GridSearchCV(estimator = rf, param_grid = param_grid, cv = 3)

with parallel_backend('dask', scatter=[X_train, y_train]):
    grid_search.fit(X_train, y_train)

In [106]:
grid_search.best_params_


Out[106]:
{'bootstrap': True,
 'max_depth': 9,
 'max_features': 3,
 'min_samples_leaf': 5,
 'min_samples_split': 8,
 'n_estimators': 100}

In [100]:
grid_search.score(X_test, y_test)


Out[100]:
0.5832091585453303

In [101]:
grid_search.predict(dask_test)


Out[101]:
array([1787.18243787, 1405.42759025,  561.23917984, ..., 1805.65679826,
       4214.37825343, 1358.36233157])

In [103]:
# Random Search
rand_search = dcv.RandomizedSearchCV(estimator = rf, param_distributions = param_grid, cv = 3)

with parallel_backend('dask', scatter=[X_train, y_train]):
    rand_search.fit(X_train, y_train)

In [107]:
rand_search.best_params_


Out[107]:
{'bootstrap': True,
 'max_depth': 9,
 'max_features': 3,
 'min_samples_leaf': 4,
 'min_samples_split': 8,
 'n_estimators': 200}

In [104]:
rand_search.score(X_test, y_test)


Out[104]:
0.5846702341510263

In [105]:
rand_search.predict(dask_test)


Out[105]:
array([1741.97740724, 1439.17866881,  562.88985739, ..., 1798.58415215,
       4151.09236834, 1396.77421747])

Summary

  • I can't say I'm a big fan of Dask so far.
  • For data preprocessing it's no better than using pandas and NumPy directly, since they are much faster and offer more functions. I tried doing the preprocessing in Dask, spent a lot of time on it, and eventually switched back to NumPy and pandas.
  • You can, however, do the preprocessing in pandas and then convert the result to a Dask dataframe.
  • The parallel processing for machine learning didn't feel noticeably faster than plain sklearn.
  • Although dask-ml says it supports both sklearn's grid search and its own, when I used sklearn's grid search it threw a large number of errors and I couldn't tell what caused them (a sketch of that pattern follows this list).
  • For a larger dataset I think Spark should be faster, as long as its machine learning library supports the methods you need. You can also convert a pandas dataframe to a Spark dataframe to work around the smaller set of data preprocessing functions.
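
For reference, a minimal sketch of what I mean by "sklearn grid search on Dask": plain sklearn GridSearchCV with its work shipped to the Dask workers through the joblib backend. This is illustrative only (it assumes the X_train / y_train split from above and a reasonably recent joblib), not the exact code that produced the errors.

import joblib
from dask.distributed import Client
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV

client = Client(processes=False)   # local Dask cluster; makes the 'dask' joblib backend available

rf = RandomForestRegressor()
grid = GridSearchCV(rf, {"max_depth": [8, 9], "n_estimators": [100, 200]}, cv=3, n_jobs=-1)

# sklearn estimators expect in-memory data, so materialize the dask frames first
with joblib.parallel_backend("dask"):
    grid.fit(X_train.compute(), y_train.compute())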

Reference