dask-glm is a library for fitting generalized linear models on large datasets.
The heart of the project is the set of optimization routines that work on either NumPy or dask arrays.
See these two blog posts describing how dask-glm works internally.
This notebook shows an example of the higher-level, scikit-learn-style API built on top of these optimization routines.
In [1]:
import os
import s3fs
import pandas as pd
import dask.array as da
import dask.dataframe as dd
from distributed import Client
from dask import persist, compute
from dask_glm.estimators import LogisticRegression
We'll set up a distributed.Client locally. In the real world you could connect to a cluster of dask workers.
In [2]:
client = Client()
For demonstration, we'll use the perennial NYC taxi cab dataset. Since I'm just running things on my laptop, we'll just grab the first month's worth of data.
In [3]:
if not os.path.exists('trip.csv'):
    s3 = s3fs.S3FileSystem(anon=True)
    s3.get("dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv", "trip.csv")
In [4]:
ddf = dd.read_csv("trip.csv")
ddf = ddf.repartition(npartitions=8)
I happen to know that some of the values in this dataset are suspect, so let's drop them. Scikit-learn doesn't support filtering observations inside a pipeline (yet), so we'll do this before anything else.
In [5]:
# these filter out less than 1% of the observations
ddf = ddf[(ddf.trip_distance < 20) &
          (ddf.fare_amount < 150)]
ddf = ddf.repartition(npartitions=8)
Now, we'll split our DataFrame into a train and test set, and select our feature matrix and target column (whether the passenger tipped).
In [6]:
df_train, df_test = ddf.random_split([0.8, 0.2], random_state=2)
columns = ['VendorID', 'passenger_count', 'trip_distance', 'payment_type', 'fare_amount']
X_train, y_train = df_train[columns], df_train['tip_amount'] > 0
X_test, y_test = df_test[columns], df_test['tip_amount'] > 0
X_train, y_train, X_test, y_test = persist(
    X_train, y_train, X_test, y_test
)
In [7]:
X_train.head()
Out[7]:
In [8]:
y_train.head()
Out[8]:
In [9]:
print(f"{len(X_train):,d} observations")
With our training data in hand, we fit our logistic regression.
Nothing here should be surprising to those familiar with scikit-learn.
In [10]:
%%time
# this is a *dask-glm* LogisticRegression, not scikit-learn's
lm = LogisticRegression(fit_intercept=False)
lm.fit(X_train.values, y_train.values)
Again, following the lead of scikit-learn, we can measure the performance of the estimator on the training dataset using the .score method. For LogisticRegression this is the mean accuracy score (the percentage of predicted labels that match the actual labels).
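Concretely, the mean accuracy is just "predict, compare with the truth, and take the mean". A rough sketch of that computation by hand (assuming the estimator's .predict returns predicted labels as a dask array, which is why we finish with .compute()):

# Roughly what lm.score(X, y) computes: predict labels, compare them with
# the true labels, and average the result. This assumes .predict returns
# predicted labels aligned with y; both are lazy dask arrays here.
predictions = lm.predict(X_train.values)
accuracy = (predictions == y_train.values).mean()
accuracy.compute()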
In [11]:
%%time
lm.score(X_train.values, y_train.values).compute()
Out[11]:
and on the test dataset:
In [12]:
%%time
lm.score(X_test.values, y_test.values).compute()
Out[12]:
The bulk of my time "doing data science" is data cleaning and pre-processing. Actually fitting an estimator or making predictions is a relatively small proportion of the work.
You could manually do all your data-processing tasks as a sequence of function calls starting with the raw data.
Or, you could use scikit-learn's Pipeline to accomplish this and then some.
Pipelines offer a few advantages over the manual solution.
First, your entire modeling process from raw data to final output is in a self-contained object. No more wondering "did I remember to scale this version of my model?" It's there in the Pipeline for you to check.
Second, Pipelines combine well with scikit-learn's model selection utilities, specifically GridSearchCV and RandomizedSearchCV. You're able to search over hyperparameters of the pipeline stages, just like you would for an estimator.
Third, Pipelines help prevent leaking information from your test and validation sets into your training set. A common mistake is to compute some pre-processing statistic on the entire dataset (before the train-test split) rather than just the training set. For example, you might normalize a column by the average of all the observations. These types of errors can lead you to overestimate the performance of your model on new observations.
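To make the leakage point concrete, here's a small standalone illustration with plain scikit-learn and synthetic data (not part of the original notebook): the first scaler sees the test observations when computing its statistics, while the second learns them from the training data alone.

import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

data = np.random.uniform(0, 100, size=(1000, 3))
train, test = train_test_split(data, test_size=0.2)

# Leaky: the mean/std are computed on *all* observations, so information
# about the test set sneaks into the statistics used for training.
leaky_scaler = StandardScaler().fit(data)
train_leaky = leaky_scaler.transform(train)

# Correct: fit the scaler on the training data only, then apply the
# learned statistics to both splits.
scaler = StandardScaler().fit(train)
train_scaled = scaler.transform(train)
test_scaled = scaler.transform(test)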
Since dask-glm follows the scikit-learn API, we can reuse scikit-learn's Pipeline machinery, with a few caveats.
First, many of the transformers built into scikit-learn will validate their inputs. As part of this validation, array-like things are cast to NumPy arrays. Since dask arrays are array-like, they are converted and things "work", but this might not be ideal when your dataset doesn't fit in memory.
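To see why that matters: dask arrays implement __array__, so anything that calls np.asarray on its input (which is essentially what scikit-learn's input validation does) silently pulls the whole array into local memory. A quick standalone illustration, not from the original notebook:

import numpy as np
import dask.array as da

x = da.random.random((1000, 3), chunks=(250, 3))  # lazy, chunked array

# np.asarray triggers dask's __array__ protocol, computing every chunk
# and concatenating the result into a single in-memory NumPy array.
materialized = np.asarray(x)
print(type(materialized))  # <class 'numpy.ndarray'>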
Second, some things are just fundamentally hard to do on large datasets. For example, naively dummy-encoding a dataset requires a full scan of the data to determine the set of unique values per categorical column. When your dataset fits in memory, this isn't a huge deal. But when it's scattered across a cluster, this could become a bottleneck.
If you know the set of possible values ahead of time, you can do much better.
You can encode the categorical columns as pandas Categoricals, and then convert with get_dummies, without having to do an expensive full scan just to compute the set of unique values. We'll do that on the VendorID and payment_type columns.
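As a small standalone example (not from the notebook) of why this helps: once the categories are declared up front, get_dummies can produce the full set of dummy columns from any chunk of the data, even one that happens to be missing some of the values.

import pandas as pd

s = pd.Series([1, 1, 2])

# Declare the full set of possible values; no scan of the data is needed.
cat = s.astype('category').cat.set_categories([1, 2, 3, 4, 5])

# One dummy column per declared category, including 3, 4, and 5,
# which never appear in this particular chunk of data.
pd.get_dummies(cat)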
In [13]:
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
First let's write a little transformer to convert columns to Categoricals. If you aren't familiar with scikit-learn transformers, the basic idea is that a transformer must implement two methods: .fit and .transform.
.fit is called during training. It learns something about the data and records it on self. Then .transform uses what was learned during .fit to transform the feature matrix somehow. A Pipeline is simply a chain of transformers: each one is fit on some data and passes the output of its .transform on to the next step; the final step is an estimator, like LogisticRegression.
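To see the mechanics, here's a rough sketch of what a two-step Pipeline does under the hood, written out by hand with plain scikit-learn objects and synthetic data (only an illustration, not part of the original notebook; the imports are aliased to avoid clashing with the dask versions used in this post):

import numpy as np
from sklearn.preprocessing import StandardScaler as SkScaler
from sklearn.linear_model import LogisticRegression as SkLogistic

X = np.random.random((100, 3))
y = np.random.randint(0, 2, size=100)

scaler = SkScaler()
clf = SkLogistic()

# Roughly what Pipeline([('scale', scaler), ('clf', clf)]).fit(X, y) does:
Xt = scaler.fit(X, y).transform(X)  # fit the transformer, then transform
clf.fit(Xt, y)                      # fit the final estimator on the output

# ...and roughly what pipe.predict(X_new) does at prediction time:
X_new = np.random.random((5, 3))
clf.predict(scaler.transform(X_new))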
In [14]:
class CategoricalEncoder(BaseEstimator, TransformerMixin):
    """Encode `categories` as pandas `Categorical`.

    Parameters
    ----------
    categories : Dict[str, list]
        Mapping from column name to list of possible values
    """
    def __init__(self, categories):
        self.categories = categories

    def fit(self, X, y=None):
        # "stateless" transformer. Don't have anything to learn here
        return self

    def transform(self, X, y=None):
        X = X.copy()
        for column, categories in self.categories.items():
            X[column] = X[column].astype('category').cat.set_categories(categories)
        return X
We'll also want a daskified version of scikit-learn's StandardScaler, one that won't eagerly convert a dask array to a NumPy array (N.B. the scikit-learn version has more features and error handling, but this will work for now).
In [15]:
class StandardScaler(BaseEstimator, TransformerMixin):
    def __init__(self, columns=None, with_mean=True, with_std=True):
        self.columns = columns
        self.with_mean = with_mean
        self.with_std = with_std

    def fit(self, X, y=None):
        if self.columns is None:
            self.columns_ = X.columns
        else:
            self.columns_ = self.columns
        if self.with_mean:
            self.mean_ = X[self.columns_].mean(0)
        if self.with_std:
            self.scale_ = X[self.columns_].std(0)
        return self

    def transform(self, X, y=None):
        X = X.copy()
        if self.with_mean:
            X[self.columns_] = X[self.columns_] - self.mean_
        if self.with_std:
            X[self.columns_] = X[self.columns_] / self.scale_
        return X.values
Finally, I've written a dummy encoder transformer that converts categoricals to dummy-encoded integer columns. The full implementation is a bit long for a blog post, but you can see it here.
In [16]:
from dummy_encoder import DummyEncoder
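For reference, a much-simplified sketch of what such a transformer could look like is below. This is my own stripped-down version for illustration, not the DummyEncoder imported above, which handles more details.

import dask.dataframe as dd
from sklearn.base import BaseEstimator, TransformerMixin

class SimpleDummyEncoder(BaseEstimator, TransformerMixin):
    """Dummy-encode categorical columns with dask's get_dummies.

    A simplified sketch; the real DummyEncoder imported above does more.
    """
    def fit(self, X, y=None):
        # Nothing to learn: the categorical columns already carry their
        # full set of categories from the CategoricalEncoder step.
        return self

    def transform(self, X, y=None):
        # With known categories, no extra pass over the data is needed
        # to find the unique values; numeric columns pass through as-is.
        return dd.get_dummies(X)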
In [17]:
pipe = make_pipeline(
    CategoricalEncoder({"VendorID": [1, 2],
                        "payment_type": [1, 2, 3, 4, 5]}),
    DummyEncoder(),
    StandardScaler(columns=['passenger_count', 'trip_distance', 'fare_amount']),
    LogisticRegression(fit_intercept=False)
)
So that's our pipeline. We can go ahead and fit it just like before, passing in the raw data.
In [18]:
%%time
pipe.fit(X_train, y_train.values)
Out[18]:
And we can score it as well. The Pipeline ensures that all of the necessary transformations take place before calling the estimator's score method.
In [19]:
pipe.score(X_train, y_train.values).compute()
Out[19]:
In [20]:
pipe.score(X_test, y_test.values).compute()
Out[20]:
As explained earlier, Pipelines and grid search go hand-in-hand. Let's run a quick example with dask-searchcv.
In [21]:
from sklearn.model_selection import GridSearchCV
import dask_searchcv as dcv
We'll search over two hyperparameters: with_std in our StandardScaler (whether to scale by the standard deviation) and lamduh in LogisticRegression (the regularization strength).
This involves fitting many models, one for each combination of parameters. dask-searchcv is smart enough to know that early stages in the pipeline (like the categorical and dummy encoding) are shared among all the combinations, and so only fits them once.
In [22]:
param_grid = {
    'standardscaler__with_std': [True, False],
    'logisticregression__lamduh': [.001, .01, .1, 1],
}

pipe = make_pipeline(
    CategoricalEncoder({"VendorID": [1, 2],
                        "payment_type": [1, 2, 3, 4, 5]}),
    DummyEncoder(),
    StandardScaler(columns=['passenger_count', 'trip_distance', 'fare_amount']),
    LogisticRegression(fit_intercept=False)
)

gs = dcv.GridSearchCV(pipe, param_grid)
In [23]:
%%time
gs.fit(X_train, y_train.values)
Out[23]:
Now we have access to the usual attributes, like cv_results_, learned by the grid search object:
In [31]:
pd.DataFrame(gs.cv_results_)
Out[31]:
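dask-searchcv mirrors the rest of the scikit-learn grid search interface too, so (assuming the fit above finished) attributes like best_params_ and best_score_ should also be available for pulling out the winning combination:

gs.best_params_   # the chosen with_std / lamduh combination
gs.best_score_    # mean cross-validated score for that combination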
And we can do our usual checks on model fit for the training set:
In [25]:
gs.score(X_train, y_train.values).compute()
Out[25]:
And the test set:
In [26]:
gs.score(X_test, y_test.values).compute()
Out[26]:
Hopefully your reaction to everything here is somewhere between a nodding head and a yawn. If you're familiar with scikit-learn, everything here should look pretty routine. It's the same API you know and love, scaled out to larger datasets thanks to dask-glm.