In [1]:
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.model_selection import GridSearchCV
from sklearn.naive_bayes import BernoulliNB
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import FeatureUnion
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import PolynomialFeatures
from sklearn.preprocessing import FunctionTransformer
from sklearn.datasets import make_classification
In [2]:
from dstoolbox.pipeline import PipelineY
from dstoolbox.pipeline import SliceMixin
from dstoolbox.pipeline import DictFeatureUnion
from dstoolbox.pipeline import DataFrameFeatureUnion
from dstoolbox.pipeline import TimedPipeline
from dstoolbox.transformers import ItemSelector
PipelineY is a variant of the sklearn Pipeline that additionally applies a transform to the target values.
In [3]:
X = np.array(['Alice', 'Bob', 'Charles', 'Dora', 'Eve'])
y = np.array(['F', 'M', 'M', 'F', 'F'])
In [4]:
pipeline = PipelineY(
    steps=[('count', CountVectorizer(analyzer='char')),
           ('clf', BernoulliNB())],
    y_transformer=LabelEncoder(),
)
In [5]:
pipeline.fit(X, y)
Out[5]:
The fitted pipeline's classes_ are the encoded targets:
In [6]:
pipeline.classes_
Out[6]:
In [7]:
pipeline.y_transform(y)
Out[7]:
In [8]:
pipeline.y_inverse_transform(pipeline.y_transform(y)) == y
Out[8]:
In [9]:
pipeline.predict(X)
Out[9]:
In [10]:
pipeline.predict(X, inverse=True)
Out[10]:
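If predict(X, inverse=True) simply applies y_inverse_transform to the raw predictions (an assumption about the implementation, not stated above), the following check should return True:
# assumption: inverse=True is equivalent to manually inverse-transforming
np.array_equal(
    pipeline.predict(X, inverse=True),
    pipeline.y_inverse_transform(pipeline.predict(X)),
)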
PipelineY also works with regression targets and inside a GridSearchCV; the parameters of the y_transformer can be searched like those of any other step:
In [11]:
y = np.random.random(5)
In [12]:
pipeline = PipelineY(
    steps=[('count', CountVectorizer(analyzer='char')),
           ('clf', LinearRegression())],
    y_transformer=StandardScaler(with_mean=True),
)
In [13]:
gs_params = {
    'count__max_features': [5, 10],
    'clf__fit_intercept': [True, False],
    'y_transformer__with_mean': [True, False],
}
gs = GridSearchCV(pipeline, gs_params, cv=2)
gs.fit(X, y.reshape(-1, 1))
Out[13]:
In [14]:
gs.best_params_
Out[14]:
When working extensively with Pipelines or FeatureUnions, accessing steps and transformer_list can become cumbersome and error prone. For instance, if another transformer is added to the transformer_list, one has to make sure that accessing the other transformers by index still works. SliceMixin is a helper mixin class that extends Pipelines and FeatureUnions to allow for more comfortable slicing and indexing.
In [15]:
class SlicePipeline(Pipeline, SliceMixin):
    pass

class SliceFeatureUnion(FeatureUnion, SliceMixin):
    pass
In [16]:
pipeline = SlicePipeline([
    ('counts', SliceFeatureUnion([
        ('char', CountVectorizer(analyzer='char')),
        ('word', CountVectorizer(analyzer='word')),
    ])),
    ('tfidf', TfidfTransformer()),
    ('clf', BernoulliNB()),
])
In [17]:
X = np.array(['Alice', 'Bob', 'Charles', 'Dora', 'Eve'])
y = np.array([1, 0, 0, 1, 1])
In [18]:
pipeline.fit(X, y)
Out[18]:
In [19]:
pipeline['clf']
Out[19]:
In [20]:
pipeline['counts']['char']
Out[20]:
In [21]:
pipeline[1]
Out[21]:
In [22]:
pipeline[-3][1]
Out[22]:
In [23]:
pipeline[1:]
Out[23]:
Since slicing creates shallow copies, the returned estimators are still fitted. This also allows you to quickly create new pipelines:
In [24]:
try:
    pipeline.transform(X)
except AttributeError:
    print("The classifier does not support `transform`, therefore this does not work.")
In [25]:
only_transforms = Pipeline(pipeline[:-1])
only_transforms.transform(X)
Out[25]:
Instead of concatenating the results of a FeatureUnion into an array, DictFeatureUnion puts them into a dictionary. This can be useful for several applications. For instance, if all values are 1d arrays, the dictionary can be used to instantiate a pandas DataFrame. Furthermore, some libraries such as tensorflow or theano support feeding data through dictionaries.
In [26]:
X = np.array([
    [0, 10],
    [2, 20],
]).astype(float)
In [27]:
transformer_list = [
    ('scaler', StandardScaler()),
    ('polynomialfeatures', PolynomialFeatures()),
]
In [28]:
union = DictFeatureUnion(transformer_list)
In [29]:
Xt = union.fit_transform(X)
The keys of the transformed data, polynomialfeatures and scaler, correspond to the names given in the transformer_list.
In [30]:
Xt
Out[30]:
In [31]:
union = DictFeatureUnion([
    ('nested', DictFeatureUnion(transformer_list)),
    ('another_scaler', StandardScaler()),
])
In [32]:
Xt = union.fit_transform(X)
Now we have 3 keys: 'another_scaler' from the outer DictFeatureUnion, plus 'polynomialfeatures' and 'scaler' from the inner one.
In [33]:
Xt.keys()
Out[33]:
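As mentioned above, when every transformer returns a 1d array, the resulting dictionary can be passed straight to the pandas DataFrame constructor. A minimal sketch, assuming the dictionary values are passed through unchanged; the FunctionTransformer column selectors here are illustrative helpers, not part of dstoolbox:
union_1d = DictFeatureUnion([
    # each transformer returns a 1d array, i.e. one DataFrame column
    ('first', FunctionTransformer(lambda X: X[:, 0])),
    ('second', FunctionTransformer(lambda X: X[:, 1])),
])
pd.DataFrame(union_1d.fit_transform(X))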
DataFrameFeatureUnion extends the FeatureUnion class to output a pandas DataFrame if each transformer outputs a DataFrame or Series. The index of all outputs must be the same for this to work; use the parameter ignore_index to reset the index of each DataFrame before concatenating.
In [34]:
X = pd.DataFrame(data={
    'names': ['Alice', 'Bob', 'Charles', 'Dora', 'Eve'],
    'surnames': ['Carroll', 'Meister', 'Darwin', 'Explorer', 'Wally'],
    'age': [14., 30., 55., 7., 25.],
})
In [35]:
feature_union = DataFrameFeatureUnion(
    transformer_list=[
        ('get-first-dataframe', Pipeline([
            ('select', ItemSelector('surnames')),
        ])),
        ('get-second-dataframe', Pipeline([
            ('select', ItemSelector(['age', 'names'])),
        ])),
    ],
    ignore_index=True,
    copy=False,
)
In [36]:
feature_union.fit_transform(X)
Out[36]:
TimedPipeline is a modified Pipeline that helps you quickly check how long each step of a pipeline takes. Getting this information with tools such as cProfile or line_profiler is often tedious; TimedPipeline gives you quick feedback about the timing of each step. Additionally, it shows the output shape after each step.
In [37]:
X, y = make_classification()
In [38]:
import time

def add1(x):
    # sleep so that the timing of this step is clearly visible
    time.sleep(0.123)
    return x + 1
In [39]:
pipe = TimedPipeline([
    ('scale', StandardScaler()),
    ('poly', PolynomialFeatures()),
    ('pca', PCA(n_components=42)),
    ('plus1', FunctionTransformer(add1)),
    ('clf', LogisticRegression()),
])
In [40]:
pipe.fit(X, y)
Out[40]:
In [41]:
_ = pipe.predict(X)
It is possible to disable the messages by calling the shed_timing method.
In [42]:
pipe.shed_timing()
In [43]:
_ = pipe.predict(X)
To add the messages again, call add_timing.
In [44]:
pipe.add_timing()
In [45]:
_ = pipe.predict(X)
TimedPipeline has an additional argument, sink, that allows you to change the target of the messages. The default is print, but you could, for instance, pass your logger if you want to log the messages.
In [46]:
from functools import partial
In [47]:
pipe = TimedPipeline([
    ('scale', StandardScaler()),
    ('poly', PolynomialFeatures()),
    ('pca', PCA(n_components=42)),
    ('plus1', FunctionTransformer(add1)),
    ('clf', LogisticRegression()),
], sink=partial(print, end=' $\n\n'))
In [48]:
pipe.fit(X, y)
Out[48]:
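To route the messages through the standard logging module instead, a minimal sketch (the logger name and level are arbitrary choices, not dstoolbox conventions):
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('timed-pipeline')

# any callable accepting the message string works as a sink
pipe = TimedPipeline([
    ('scale', StandardScaler()),
    ('clf', LogisticRegression()),
], sink=logger.info)
pipe.fit(X, y)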