Dask GLM

dask-glm is a library for fitting generalized linear models on large datasets. The heart of the project is the set of optimization routines that work on either NumPy or dask arrays. See these two blog posts describing how dask-glm works internally.

This notebook shows an example of the higher-level, scikit-learn-style API built on top of these optimization routines.


In [1]:
import os
import s3fs
import pandas as pd
import dask.array as da
import dask.dataframe as dd
from distributed import Client

from dask import persist, compute
from dask_glm.estimators import LogisticRegression

We'll set up a distributed.Client locally. In the real world you would connect to a cluster of dask-workers.


In [2]:
client = Client()
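
If you already have a cluster of dask-workers running, you would point the client at the scheduler's address instead. A minimal sketch; the address below is a placeholder, not a real cluster:

# Connect to an existing scheduler rather than starting one locally
# (placeholder address, not a real cluster)
client = Client("tcp://scheduler-address:8786")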

For demonstration, we'll use the perennial NYC taxi cab dataset. Since I'm running things on my laptop, we'll grab just the first month's worth of data.


In [3]:
if not os.path.exists('trip.csv'):
    # anonymous (public) access to the dask-data bucket
    s3 = s3fs.S3FileSystem(anon=True)
    s3.get("dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv", "trip.csv")

In [4]:
ddf = dd.read_csv("trip.csv")
ddf = ddf.repartition(npartitions=8)

I happen to know that some of the values in this dataset are suspect, so let's drop them. Scikit-learn doesn't support filtering observations inside a pipeline (yet), so we'll do this before anything else.


In [5]:
# these filter out less than 1% of the observations
ddf = ddf[(ddf.trip_distance < 20) &
          (ddf.fare_amount < 150)]
ddf = ddf.repartition(npartitions=8)

Now, we'll split our DataFrame into a train and test set, and select our feature matrix and target column (whether the passenger tipped).


In [6]:
df_train, df_test = ddf.random_split([0.8, 0.2], random_state=2)

columns = ['VendorID', 'passenger_count', 'trip_distance', 'payment_type', 'fare_amount']

X_train, y_train = df_train[columns], df_train['tip_amount'] > 0
X_test, y_test = df_test[columns], df_test['tip_amount'] > 0

X_train, y_train, X_test, y_test = persist(
    X_train, y_train, X_test, y_test
)

In [7]:
X_train.head()


Out[7]:
   VendorID  passenger_count  trip_distance  payment_type  fare_amount
2         1                1            1.8             2          9.5
3         1                1            0.5             2          3.5
4         1                1            3.0             2         15.0
5         1                1            9.0             1         27.0
6         1                1            2.2             2         14.0

In [8]:
y_train.head()


Out[8]:
2    False
3    False
4    False
5     True
6    False
Name: tip_amount, dtype: bool

In [9]:
print(f"{len(X_train):,d} observations")


10,155,301 observations

With our training data in hand, we fit our logistic regression. Nothing here should be surprising to those familiar with scikit-learn.


In [10]:
%%time
# this is a *dask-glm* LogisticRegression, not scikit-learn
lm = LogisticRegression(fit_intercept=False)
lm.fit(X_train.values, y_train.values)


CPU times: user 1min 27s, sys: 13.3 s, total: 1min 40s
Wall time: 11min 25s

Again, following the lead of scikit-learn, we can measure the performance of the estimator on the training dataset using the .score method. For LogisticRegression this is the mean accuracy score (the percentage of predictions that match the actual values).
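
Roughly, the score works out to something like the following sketch (assuming lm.predict returns the predicted class labels as a dask array):

# mean accuracy spelled out: the fraction of predictions matching the labels
predictions = lm.predict(X_train.values)
accuracy = (predictions == y_train.values).mean()
accuracy.compute()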


In [11]:
%%time
lm.score(X_train.values, y_train.values).compute()


CPU times: user 205 ms, sys: 25 ms, total: 230 ms
Wall time: 364 ms
Out[11]:
0.88082578743850137

and on the test dataset:


In [12]:
%%time
lm.score(X_test.values, y_test.values).compute()


CPU times: user 144 ms, sys: 21.8 ms, total: 166 ms
Wall time: 249 ms
Out[12]:
0.88061000067744588

Pipelines

The bulk of my time "doing data science" is data cleaning and pre-processing. Actually fitting an estimator or making predictions is a relatively small proportion of the work.

You could manually do all your data-processing tasks as a sequence of function calls starting with the raw data. Or, you could use scikit-learn's Pipeline to accomplish this and then some. Pipelines offer a few advantages over the manual solution.

First, your entire modeling process from raw data to final output is in a self-contained object. No more wondering "did I remember to scale this version of my model?" It's there in the Pipeline for you to check.

Second, Pipelines combine well with scikit-learn's model selection utilities, specifically GridSearchCV and RandomizedSearchCV. You're able to search over hyperparameters of the pipeline stages, just like you would for an estimator.

Third, Pipelines help prevent leaking information from your test and validation sets into your training process. A common mistake is to compute some pre-processing statistic on the entire dataset (before the train-test split) rather than just the training set. For example, you might normalize a column by the average of all the observations. These kinds of errors can lead you to overestimate the performance of your model on new observations.
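
As a small sketch of that last point, reusing the names defined above:

# Leaky: the normalizing statistic sees the test rows too
mean = ddf['fare_amount'].mean()

# Safe: compute the statistic on the training split only, then reuse it
mean = df_train['fare_amount'].mean()
train_normed = df_train['fare_amount'] - mean
test_normed = df_test['fare_amount'] - mean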

Since dask-glm follows the scikit-learn API, we can reuse scikit-learn's Pipeline machinery, with a few caveats.

First, many of the transformers built into scikit-learn validate their inputs. As part of this validation, array-like things are cast to NumPy arrays. Since dask arrays are array-like, they are converted and things "work", but this may not be ideal when your dataset doesn't fit in memory.

Second, some things are just fundamentally hard to do on large datasets. For example, naively dummy-encoding a dataset requires a full scan of the data to determine the set of unique values per categorical column. When your dataset fits in memory, this isn't a huge deal. But when it's scattered across a cluster, this could become a bottleneck.

If you know the set of possible values ahead of time, you can do much better. You can encode the categorical columns as pandas Categoricals and then convert with get_dummies, without an expensive full scan just to compute the set of unique values. We'll do that on the VendorID and payment_type columns.
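
To see the trick in isolation, here's a tiny made-up example; dask.dataframe's get_dummies works the same way, but only for columns whose categories are known:

# With the categories declared up front, get_dummies doesn't need to scan
# the data to discover the unique values.
toy = pd.Series([1, 2, 1], name='payment_type')
toy = toy.astype('category').cat.set_categories([1, 2, 3, 4, 5])
pd.get_dummies(toy)  # five 0/1 columns, one per declared category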


In [13]:
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline

First let's write a little transformer to convert columns to Categoricals. If you aren't familiar with scikit-learn transformers, the basic idea is that a transformer must implement two methods: .fit and .transform.

.fit is called during training. It learns something about the data and records it on self.

Then .transform uses what's learned during .fit to transform the feature matrix somehow.

A Pipeline is simply a chain of transformers: each one is fit on the data and passes the output of its .transform on to the next step. The final step is an estimator, like LogisticRegression.
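
Conceptually, fitting a pipeline looks something like this (a simplified sketch, not scikit-learn's actual implementation):

def fit_pipeline(steps, X, y):
    *transformers, estimator = steps
    for transformer in transformers:
        # each transformer learns from the data, then transforms it
        X = transformer.fit(X, y).transform(X)
    # the final step is the estimator, fit on the fully transformed data
    estimator.fit(X, y)
    return steps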


In [14]:
class CategoricalEncoder(BaseEstimator, TransformerMixin):
    """Encode `categories` as pandas `Categorical`
 
    Parameters
    ----------
    categories : Dict[str, list]
        Mapping from column name to list of possible values
    """
    def __init__(self, categories):
        self.categories = categories
        
    def fit(self, X, y=None):
        # "stateless" transformer. Don't have anything to learn here
        return self
    
    def transform(self, X, y=None):
        X = X.copy()
        for column, categories in self.categories.items():
            X[column] = X[column].astype('category').cat.set_categories(categories)
        return X

We'll also want a daskified version of scikit-learn's StandardScaler, one that won't eagerly convert a dask array to a NumPy array (N.B. the scikit-learn version has more features and error handling, but this will work for now).


In [15]:
class StandardScaler(BaseEstimator, TransformerMixin):
    def __init__(self, columns=None, with_mean=True, with_std=True):
        self.columns = columns
        self.with_mean = with_mean
        self.with_std = with_std

    def fit(self, X, y=None):
        if self.columns is None:
            self.columns_ = X.columns
        else:
            self.columns_ = self.columns
        if self.with_mean:
            self.mean_ = X[self.columns_].mean(0)
        if self.with_std:
            self.scale_ = X[self.columns_].std(0)
        return self
        
    def transform(self, X, y=None):
        X = X.copy()
        if self.with_mean:
            X[self.columns_] = X[self.columns_] - self.mean_
        if self.with_std:
            X[self.columns_] = X[self.columns_] / self.scale_
        return X.values

Finally, I've written a dummy encoder transformer that converts categoricals to dummy-encoded integer columns. The full implementation is a bit long for a blog post, but you can see it here.
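
To give a flavor of the idea, here's a minimal sketch (the real DummyEncoder linked above handles more edge cases):

class SimpleDummyEncoder(BaseEstimator, TransformerMixin):
    """A minimal sketch of the idea behind DummyEncoder.

    Assumes the categorical columns already have their categories set
    (e.g. by CategoricalEncoder), so no scan of the data is needed.
    """
    def fit(self, X, y=None):
        # record which columns are categorical
        self.categorical_columns_ = X.select_dtypes(include=['category']).columns
        return self

    def transform(self, X, y=None):
        # one 0/1 column per known category
        return dd.get_dummies(X, columns=list(self.categorical_columns_))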


In [16]:
from dummy_encoder import DummyEncoder

In [17]:
pipe = make_pipeline(
    CategoricalEncoder({"VendorID": [1, 2],
                        "payment_type": [1, 2, 3, 4, 5]}),
    DummyEncoder(),
    StandardScaler(columns=['passenger_count', 'trip_distance', 'fare_amount']),
    LogisticRegression(fit_intercept=False)
)

So that's our pipeline. We can go ahead and fit it just like before, passing in the raw data.


In [18]:
%%time
pipe.fit(X_train, y_train.values)


CPU times: user 5min 24s, sys: 42.4 s, total: 6min 6s
Wall time: 37min 4s
Out[18]:
Pipeline(memory=None,
     steps=[('categoricalencoder', CategoricalEncoder(categories={'VendorID': [1, 2], 'payment_type': [1, 2, 3, 4, 5]})), ('dummyencoder', DummyEncoder(columns=None, drop_first=False)), ('standardscaler', StandardScaler(columns=['passenger_count', 'trip_distance', 'fare_amount'],
        with_mean=True, ...iter=100, over_relax=1, regularizer='l2', reltol=0.01, rho=1,
          solver='admm', tol=0.0001))])

And we can score it as well. The Pipeline ensures that all of the necessary transformations take place before calling the estimator's score method.


In [19]:
pipe.score(X_train, y_train.values).compute()


Out[19]:
0.97890756758465358

In [20]:
pipe.score(X_test, y_test.values).compute()


Out[20]:
0.97888495550125487

As explained earlier, Pipelines and grid search go hand-in-hand. Let's run a quick example with dask-searchcv.


In [21]:
from sklearn.model_selection import GridSearchCV
import dask_searchcv as dcv

We'll search over two hyperparameters:

  1. Whether or not to standardize the variance of each column in StandardScaler
  2. The strength of the regularization in LogisticRegression

This involves fitting many models, one for each combination of parameters. dask-searchcv is smart enough to know that early stages in the pipeline (like the categorical and dummy encoding) are shared among all the combinations, so it only fits them once.


In [22]:
param_grid = {
    'standardscaler__with_std': [True, False],
    'logisticregression__lamduh': [.001, .01, .1, 1],
}

pipe = make_pipeline(
    CategoricalEncoder({"VendorID": [1, 2],
                        "payment_type": [1, 2, 3, 4, 5]}),
    DummyEncoder(),
    StandardScaler(columns=['passenger_count', 'trip_distance', 'fare_amount']),
    LogisticRegression(fit_intercept=False)
)

gs = dcv.GridSearchCV(pipe, param_grid)

In [23]:
%%time
gs.fit(X_train, y_train.values)


CPU times: user 1min 5s, sys: 24 s, total: 1min 29s
Wall time: 40min 24s
Out[23]:
GridSearchCV(cache_cv=True, cv=None, error_score='raise',
       estimator=Pipeline(memory=None,
     steps=[('categoricalencoder', CategoricalEncoder(categories={'VendorID': [1, 2], 'payment_type': [1, 2, 3, 4, 5]})), ('dummyencoder', DummyEncoder(columns=None, drop_first=False)), ('standardscaler', StandardScaler(columns=['passenger_count', 'trip_distance', 'fare_amount'],
        with_mean=True, ...iter=100, over_relax=1, regularizer='l2', reltol=0.01, rho=1,
          solver='admm', tol=0.0001))]),
       get=None, iid=True,
       param_grid={'standardscaler__with_std': [True, False], 'logisticregression__lamduh': [0.001, 0.01, 0.1, 1]},
       refit=True, return_train_score=True, scoring=None)

Now we have access to the usual attributes learned by the grid search object, like cv_results_:


In [31]:
pd.DataFrame(gs.cv_results_)


Out[31]:
  | mean_test_score | mean_train_score | param_logisticregression__lamduh | param_standardscaler__with_std | params | rank_test_score | split0_test_score | split0_train_score | split1_test_score | split1_train_score | split2_test_score | split2_train_score | std_test_score | std_train_score
0 | 0.946754 | 0.946304 | 0.001 | True  | {'logisticregression__lamduh': 0.001, 'standar... | 1 | 0.978945 | 0.978819 | 0.882557 | 0.881177 | 0.978758 | 0.978918 | 0.045394 | 0.046052
1 | 0.946754 | 0.946304 | 0.001 | False | {'logisticregression__lamduh': 0.001, 'standar... | 1 | 0.978945 | 0.978819 | 0.882557 | 0.881177 | 0.978758 | 0.978918 | 0.045394 | 0.046052
2 | 0.946754 | 0.946304 | 0.01  | True  | {'logisticregression__lamduh': 0.01, 'standard... | 1 | 0.978945 | 0.978819 | 0.882557 | 0.881177 | 0.978758 | 0.978918 | 0.045394 | 0.046052
3 | 0.946754 | 0.946304 | 0.01  | False | {'logisticregression__lamduh': 0.01, 'standard... | 1 | 0.978945 | 0.978819 | 0.882557 | 0.881177 | 0.978758 | 0.978918 | 0.045394 | 0.046052
4 | 0.946754 | 0.946304 | 0.1   | True  | {'logisticregression__lamduh': 0.1, 'standards... | 1 | 0.978945 | 0.978819 | 0.882557 | 0.881177 | 0.978758 | 0.978918 | 0.045394 | 0.046052
5 | 0.946754 | 0.946304 | 0.1   | False | {'logisticregression__lamduh': 0.1, 'standards... | 1 | 0.978945 | 0.978819 | 0.882557 | 0.881177 | 0.978758 | 0.978918 | 0.045394 | 0.046052
6 | 0.946754 | 0.946304 | 1     | True  | {'logisticregression__lamduh': 1, 'standardsca... | 1 | 0.978945 | 0.978819 | 0.882557 | 0.881177 | 0.978758 | 0.978918 | 0.045394 | 0.046052
7 | 0.946754 | 0.946304 | 1     | False | {'logisticregression__lamduh': 1, 'standardsca... | 1 | 0.978945 | 0.978819 | 0.882557 | 0.881177 | 0.978758 | 0.978918 | 0.045394 | 0.046052

And we can do our usual checks on model fit for the training set:


In [25]:
gs.score(X_train, y_train.values).compute()


Out[25]:
0.97888698720008394

And the test set:


In [26]:
gs.score(X_test, y_test.values).compute()


Out[26]:
0.97886801935289736

Hopefully your reaction to everything here is somewhere between a nodding head and a yawn. If you're familiar with scikit-learn, everything here should look pretty routine. It's the same API you know and love, scaled out to larger datasets thanks to dask-glm.