Using dstoolbox pipeline

Table of contents

- Imports
- PipelineY
- SliceMixin
- DictFeatureUnion
- DataFrameFeatureUnion
- TimedPipeline

Imports


In [1]:
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.model_selection import GridSearchCV
from sklearn.naive_bayes import BernoulliNB
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import FeatureUnion
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import PolynomialFeatures
from sklearn.preprocessing import FunctionTransformer
from sklearn.datasets import make_classification

In [2]:
from dstoolbox.pipeline import PipelineY
from dstoolbox.pipeline import SliceMixin
from dstoolbox.pipeline import DictFeatureUnion
from dstoolbox.pipeline import DataFrameFeatureUnion
from dstoolbox.pipeline import TimedPipeline
from dstoolbox.transformers import ItemSelector

PipelineY

A variant of the sklearn Pipeline that additionally applies a transformer to the target values (y).

A simple example


In [3]:
X = np.array(['Alice', 'Bob', 'Charles', 'Dora', 'Eve'])
y = np.array(['F', 'M', 'M', 'F', 'F'])

In [4]:
pipeline = PipelineY(
    steps=[('count', CountVectorizer(analyzer='char')),
           ('clf', BernoulliNB())],
    y_transformer=LabelEncoder(),
)

In [5]:
pipeline.fit(X, y)


Out[5]:
PipelineY(steps=[('count', CountVectorizer(analyzer='char', binary=False, decode_error='strict',
        dtype=<class 'numpy.int64'>, encoding='utf-8', input='content',
        lowercase=True, max_df=1.0, max_features=None, min_df=1,
        ngram_range=(1, 1), preprocessor=None, stop_words=None,
        strip_accents=None, token_pattern='(?u)\\b\\w\\w+\\b',
        tokenizer=None, vocabulary=None)), ('clf', BernoulliNB(alpha=1.0, binarize=0.0, class_prior=None, fit_prior=True))],
     y_transformer=LabelEncoder())

The classes_ attribute contains the encoded targets (here, 'F' is encoded as 0 and 'M' as 1):


In [6]:
pipeline.classes_


Out[6]:
array([0, 1])

We can apply the y_transform method to our labels:


In [7]:
pipeline.y_transform(y)


Out[7]:
array([0, 1, 1, 0, 0])

If the transformer supports it, we may also use the y_inverse_transform method:


In [8]:
pipeline.y_inverse_transform(pipeline.y_transform(y)) == y


Out[8]:
array([ True,  True,  True,  True,  True], dtype=bool)

The predict method has an optional inverse keyword to directly invert the predictions:


In [9]:
pipeline.predict(X)


Out[9]:
array([0, 1, 1, 0, 0])

In [10]:
pipeline.predict(X, inverse=True)


Out[10]:
array(['F', 'M', 'M', 'F', 'F'], 
      dtype='<U1')

PipelineY supports grid search and other facilities that rely on get_params and set_params; the parameters of the y_transformer are accessible under the y_transformer__ prefix:


In [11]:
y = np.random.random(5)

In [12]:
pipeline = PipelineY(
    steps=[('count', CountVectorizer(analyzer='char')),
           ('clf', LinearRegression())],
    y_transformer=StandardScaler(with_mean=True),
)

In [13]:
gs_params = {
    'count__max_features': [5, 10],
    'clf__fit_intercept': [True, False],
    'y_transformer__with_mean': [True, False]
}
gs = GridSearchCV(pipeline, gs_params, cv=2)
gs.fit(X, y.reshape(-1, 1))


Out[13]:
GridSearchCV(cv=2, error_score='raise',
       estimator=PipelineY(steps=[('count', CountVectorizer(analyzer='char', binary=False, decode_error='strict',
        dtype=<class 'numpy.int64'>, encoding='utf-8', input='content',
        lowercase=True, max_df=1.0, max_features=None, min_df=1,
        ngram_range=(1, 1), preprocessor=None, stop_word...=1, normalize=False))],
     y_transformer=StandardScaler(copy=True, with_mean=True, with_std=True)),
       fit_params={}, iid=True, n_jobs=1,
       param_grid={'y_transformer__with_mean': [True, False], 'count__max_features': [5, 10], 'clf__fit_intercept': [True, False]},
       pre_dispatch='2*n_jobs', refit=True, return_train_score=True,
       scoring=None, verbose=0)

In [14]:
gs.best_params_


Out[14]:
{'clf__fit_intercept': True,
 'count__max_features': 10,
 'y_transformer__with_mean': False}

SliceMixin

When working extensively with Pipelines or FeatureUnions, accessing steps and transformer_list can become cumbersome and error-prone. For instance, if another transformer is added to the transformer_list, every existing access by index may silently point at the wrong step.

SliceMixin is a helper mixin class that extends Pipelines and FeatureUnions with more comfortable slicing and indexing.
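For comparison, here is a minimal sketch (using the imports above) of the access patterns a plain sklearn Pipeline offers, which the mixin is meant to improve on:

# Plain sklearn: `steps` is a list of (name, estimator) tuples, so
# index-based access breaks as soon as a step is inserted or reordered.
plain = Pipeline([
    ('count', CountVectorizer(analyzer='char')),
    ('clf', BernoulliNB()),
])
clf_by_index = plain.steps[-1][1]       # fragile: depends on position
clf_by_name = plain.named_steps['clf']  # verbose for nested structures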

Create new classes that inherit from the mixin:


In [15]:
class SlicePipeline(Pipeline, SliceMixin):
    pass

class SliceFeatureUnion(FeatureUnion, SliceMixin):
    pass

In [16]:
pipeline = SlicePipeline([
    ('counts', SliceFeatureUnion([
        ('char', CountVectorizer(analyzer='char')),
        ('word', CountVectorizer(analyzer='word')),
    ])),
    ('tfidf', TfidfTransformer()),
    ('clf', BernoulliNB()),
])

In [17]:
X = np.array(['Alice', 'Bob', 'Charles', 'Dora', 'Eve'])
y = np.array([1, 0, 0, 1, 1])

In [18]:
pipeline.fit(X, y)


Out[18]:
SlicePipeline(steps=[('counts', SliceFeatureUnion(n_jobs=1,
         transformer_list=[('char', CountVectorizer(analyzer='char', binary=False, decode_error='strict',
        dtype=<class 'numpy.int64'>, encoding='utf-8', input='content',
        lowercase=True, max_df=1.0, max_features=None, min_df=1,
        ngr...se, use_idf=True)), ('clf', BernoulliNB(alpha=1.0, binarize=0.0, class_prior=None, fit_prior=True))])

Accessing steps by name


In [19]:
pipeline['clf']


Out[19]:
BernoulliNB(alpha=1.0, binarize=0.0, class_prior=None, fit_prior=True)

In [20]:
pipeline['counts']['char']


Out[20]:
CountVectorizer(analyzer='char', binary=False, decode_error='strict',
        dtype=<class 'numpy.int64'>, encoding='utf-8', input='content',
        lowercase=True, max_df=1.0, max_features=None, min_df=1,
        ngram_range=(1, 1), preprocessor=None, stop_words=None,
        strip_accents=None, token_pattern='(?u)\\b\\w\\w+\\b',
        tokenizer=None, vocabulary=None)

Accessing steps by index


In [21]:
pipeline[1]


Out[21]:
TfidfTransformer(norm='l2', smooth_idf=True, sublinear_tf=False, use_idf=True)

In [22]:
pipeline[-3][1]


Out[22]:
CountVectorizer(analyzer='word', binary=False, decode_error='strict',
        dtype=<class 'numpy.int64'>, encoding='utf-8', input='content',
        lowercase=True, max_df=1.0, max_features=None, min_df=1,
        ngram_range=(1, 1), preprocessor=None, stop_words=None,
        strip_accents=None, token_pattern='(?u)\\b\\w\\w+\\b',
        tokenizer=None, vocabulary=None)

Accessing steps by slice


In [23]:
pipeline[1:]


Out[23]:
[('tfidf',
  TfidfTransformer(norm='l2', smooth_idf=True, sublinear_tf=False, use_idf=True)),
 ('clf',
  BernoulliNB(alpha=1.0, binarize=0.0, class_prior=None, fit_prior=True))]

Since slicing creates shallow copies, the estimators returned by a slice are still fitted. This also allows you to quickly create new pipelines:


In [24]:
try:
    pipeline.transform(X)
except AttributeError:
    print("The classifier does not support `transform`, therefore this does not work.")


The classifier does not support `transform`, therefore this does not work.

In [25]:
only_transforms = Pipeline(pipeline[:-1])
only_transforms.transform(X)


Out[25]:
<5x17 sparse matrix of type '<class 'numpy.float64'>'
	with 25 stored elements in Compressed Sparse Row format>

DictFeatureUnion

Instead of concatenating the results of a FeatureUnion into an array, DictFeatureUnion puts them into a dictionary. This is useful in several situations. For instance, if all values are 1d arrays, the dictionary can be used to instantiate a pandas DataFrame (a sketch of this follows the example below). Furthermore, some libraries such as tensorflow or theano support feeding data through dictionaries.

Simple example


In [26]:
X = np.array([
    [0, 10],
    [2, 20],
]).astype(float)

In [27]:
transformer_list = [
    ('scaler', StandardScaler()),
    ('polynomialfeatures', PolynomialFeatures()),
]

In [28]:
union = DictFeatureUnion(transformer_list)

In [29]:
Xt = union.fit_transform(X)

The keys of the transformed data, 'polynomialfeatures' and 'scaler', correspond to the names given in the transformer_list.


In [30]:
Xt


Out[30]:
{'polynomialfeatures': array([[   1.,    0.,   10.,    0.,    0.,  100.],
        [   1.,    2.,   20.,    4.,   40.,  400.]]),
 'scaler': array([[-1., -1.],
        [ 1.,  1.]])}
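
To illustrate the DataFrame use case mentioned above, here is a minimal sketch (not part of the original example, using hypothetical one-value-per-sample transformers) in which each entry of the resulting dictionary becomes a DataFrame column:

# Sketch: each transformer returns a 1d array with one value per sample,
# so the dict produced by DictFeatureUnion maps directly to columns.
to_columns = DictFeatureUnion([
    ('total', FunctionTransformer(lambda X: X.sum(axis=1), validate=False)),
    ('spread', FunctionTransformer(lambda X: X.max(axis=1) - X.min(axis=1),
                                   validate=False)),
])
df = pd.DataFrame(to_columns.fit_transform(X))
# df now has the columns 'total' and 'spread', one row per sample of X.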

Nested DictFeatureUnion


In [31]:
union = DictFeatureUnion([
    ('nested', DictFeatureUnion(transformer_list)),
    ('another_scaler', StandardScaler()),
])

In [32]:
Xt = union.fit_transform(X)

Now we have 3 keys: 'another_scaler' from the outer DictFeatureUnion, and 'polynomialfeatures' and 'scaler' from the inner one. Note that the keys of the nested union are merged directly into the outer dictionary; the name 'nested' itself does not appear.


In [33]:
Xt.keys()


Out[33]:
dict_keys(['polynomialfeatures', 'another_scaler', 'scaler'])

DataFrameFeatureUnion

This class extends FeatureUnion to output a pandas DataFrame, provided that each transformer outputs a DataFrame or Series.

The index of all outputs must be the same for this to work. Use the parameter ignore_index to reset the index of each DataFrame before concatenating.


In [34]:
X = pd.DataFrame(data={
    'names': ['Alice', 'Bob', 'Charles', 'Dora', 'Eve'],
    'surnames': ['Carroll', 'Meister', 'Darwin', 'Explorer', 'Wally'],
    'age': [14., 30., 55., 7., 25.]}
)

In [35]:
feature_union = DataFrameFeatureUnion(
    transformer_list=[
        ('get-first-dataframe', Pipeline([
                ('select', ItemSelector('surnames')),
            ])
         ),
        ('get-second-dataframe', Pipeline([
                ('select', ItemSelector(['age', 'names'])),
            ])
         ),
    ], ignore_index=True, copy=False)

In [36]:
feature_union.fit_transform(X)


Out[36]:
   surnames   age    names
0   Carroll  14.0    Alice
1   Meister  30.0      Bob
2    Darwin  55.0  Charles
3  Explorer   7.0     Dora
4     Wally  25.0      Eve

TimedPipeline

TimedPipeline is a modified Pipeline that helps to quickly check how long each step of a pipeline takes; getting this information with tools such as cProfile or line_profiler is often tedious. Additionally, it shows the output shape after each step.


In [37]:
X, y = make_classification()

Simple example


In [38]:
import time

def add1(x):
    time.sleep(0.123)
    return x + 1

In [39]:
pipe = TimedPipeline([
    ('scale', StandardScaler()),
    ('poly', PolynomialFeatures()),
    ('pca', PCA(n_components=42)),
    ('plus1', FunctionTransformer(add1)),
    ('clf', LogisticRegression()),
])

In [40]:
pipe.fit(X, y)


{"name": "scale"                       , "method": "fit"               , "duration":        0.000, "shape": "-"}
{"name": "scale"                       , "method": "transform"         , "duration":        0.000, "shape": "100x20"}
{"name": "scale"                       , "method": "fit_transform"     , "duration":        0.001, "shape": "100x20"}
{"name": "poly"                        , "method": "fit"               , "duration":        0.000, "shape": "-"}
{"name": "poly"                        , "method": "transform"         , "duration":        0.003, "shape": "100x231"}
{"name": "poly"                        , "method": "fit_transform"     , "duration":        0.003, "shape": "100x231"}
{"name": "pca"                         , "method": "fit_transform"     , "duration":        0.068, "shape": "100x42"}
{"name": "plus1"                       , "method": "fit"               , "duration":        0.000, "shape": "-"}
{"name": "plus1"                       , "method": "transform"         , "duration":        0.123, "shape": "100x42"}
{"name": "plus1"                       , "method": "fit_transform"     , "duration":        0.124, "shape": "100x42"}
{"name": "clf"                         , "method": "fit"               , "duration":        0.002, "shape": "-"}
Out[40]:
TimedPipeline(sink=<built-in function print>,
       steps=[('scale', StandardScaler(copy=True, with_mean=True, with_std=True)), ('poly', PolynomialFeatures(degree=2, include_bias=True, interaction_only=False)), ('pca', PCA(copy=True, iterated_power='auto', n_components=42, random_state=None,
  svd_solver='auto', tol=0.0, whiten=False)), ('plus1', Fun...ty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False))])

In [41]:
_ = pipe.predict(X)


{"name": "scale"                       , "method": "transform"         , "duration":        0.000, "shape": "100x20"}
{"name": "poly"                        , "method": "transform"         , "duration":        0.004, "shape": "100x231"}
{"name": "pca"                         , "method": "transform"         , "duration":        0.000, "shape": "100x42"}
{"name": "plus1"                       , "method": "transform"         , "duration":        0.123, "shape": "100x42"}
{"name": "clf"                         , "method": "predict"           , "duration":        0.001, "shape": "100"}

Disable messages

It is possible to disable messages by calling the shed_timing method.


In [42]:
pipe.shed_timing()

In [43]:
_ = pipe.predict(X)

To add messages again, call add_timing.


In [44]:
pipe.add_timing()

In [45]:
_ = pipe.predict(X)


{"name": "scale"                       , "method": "transform"         , "duration":        0.000, "shape": "100x20"}
{"name": "poly"                        , "method": "transform"         , "duration":        0.005, "shape": "100x231"}
{"name": "pca"                         , "method": "transform"         , "duration":        0.000, "shape": "100x42"}
{"name": "plus1"                       , "method": "transform"         , "duration":        0.123, "shape": "100x42"}
{"name": "clf"                         , "method": "predict"           , "duration":        0.000, "shape": "100"}

Change sink

TimedPipeline has an additional argument, sink, that allows you to change where the messages are sent. The default is print, but you could, for instance, pass a logger method if you want to log the messages (a sketch of this follows at the end of this section).


In [46]:
from functools import partial

In [47]:
pipe = TimedPipeline([
    ('scale', StandardScaler()),
    ('poly', PolynomialFeatures()),
    ('pca', PCA(n_components=42)),
    ('plus1', FunctionTransformer(add1)),
    ('clf', LogisticRegression()),
], sink=partial(print, end=' $\n\n'))

In [48]:
pipe.fit(X, y)


{"name": "scale"                       , "method": "fit"               , "duration":        0.001, "shape": "-"} $

{"name": "scale"                       , "method": "transform"         , "duration":        0.000, "shape": "100x20"} $

{"name": "scale"                       , "method": "fit_transform"     , "duration":        0.001, "shape": "100x20"} $

{"name": "poly"                        , "method": "fit"               , "duration":        0.000, "shape": "-"} $

{"name": "poly"                        , "method": "transform"         , "duration":        0.003, "shape": "100x231"} $

{"name": "poly"                        , "method": "fit_transform"     , "duration":        0.004, "shape": "100x231"} $

{"name": "pca"                         , "method": "fit_transform"     , "duration":        0.006, "shape": "100x42"} $

{"name": "plus1"                       , "method": "fit"               , "duration":        0.000, "shape": "-"} $

{"name": "plus1"                       , "method": "transform"         , "duration":        0.123, "shape": "100x42"} $

{"name": "plus1"                       , "method": "fit_transform"     , "duration":        0.124, "shape": "100x42"} $

{"name": "clf"                         , "method": "fit"               , "duration":        0.002, "shape": "-"} $

Out[48]:
TimedPipeline(sink=functools.partial(<built-in function print>, end=' $\n\n'),
       steps=[('scale', StandardScaler(copy=True, with_mean=True, with_std=True)), ('poly', PolynomialFeatures(degree=2, include_bias=True, interaction_only=False)), ('pca', PCA(copy=True, iterated_power='auto', n_components=42, random_state=None,
  svd_solver='auto', tol=0.0, whiten=False)), ('plus1', Fun...ty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False))])
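
As a sketch of the logging use case (assuming only that sink is called with the finished message string, as the print examples above suggest), the messages can be routed to a standard library logger:

# Minimal sketch: route timing messages to a logging.Logger instead of print.
import logging

logging.basicConfig(level=logging.INFO)
timing_logger = logging.getLogger('pipeline.timings')

logged_pipe = TimedPipeline([
    ('scale', StandardScaler()),
    ('clf', LogisticRegression()),
], sink=timing_logger.info)
logged_pipe.fit(X, y)  # the timing messages now appear in the log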
