This notebook demonstrates what streaming pipelines will look like, structured as:
DataSelector(
FeatureUnion(
[('base', BasePipeline),
('stream1', StreamPipeline),
('stream2', StreamPipeline)
]
)
)
In [1]:
from sklearn.datasets import make_regression, make_classification
from sklearn.pipeline import Pipeline, make_pipeline, FeatureUnion
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.decomposition import PCA
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_selection.base import SelectorMixin
from sklearn.utils.validation import check_is_fitted
from sklearn.linear_model import SGDRegressor
import itertools
In [2]:
# 100x100 frame of standard-normal noise to stand in for real data.
data = np.random.normal(size=(100, 100))
pdf = pd.DataFrame(data)
In [3]:
# Name the columns c0..c99 so they can be selected by label later.
pdf.columns = [f'c{i}' for i in range(100)]
In [4]:
# Random regression target: one standard-normal value per row.
y = np.random.normal(loc=0.0, scale=1.0, size=100)
In [5]:
pdf.head()  # preview the first five rows of the random frame
Out[5]:
In [6]:
class DataSelector(BaseEstimator, TransformerMixin):
    """Pass-through transformer that restricts a DataFrame to a column subset.

    Parameters
    ----------
    columns : list of column labels, optional
        Columns to keep. When None, the input is passed through unchanged
        (as a copy).
    """
    def __init__(self, columns=None):
        self.columns = columns

    def fit(self, x, y=None):
        # Stateless: nothing to learn.
        return self

    def transform(self, X):
        # Guard clause: no column list means pass everything through.
        if self.columns is None:
            return X.copy()
        return X[self.columns]
In [7]:
# Selector that keeps just the first two columns.
keep_cols = ['c0', 'c1']
testSel = DataSelector(columns=keep_cols)
In [8]:
# Selection is stateless, so fit_transform just slices the frame.
selected = testSel.fit_transform(pdf)
selected.head()
Out[8]:
In [9]:
# Simulate what streaming features might look like: the first 20 columns
# arrive up front, the rest in two later batches of 10.
base_df = pdf[[f'c{i}' for i in range(20)]]
stream1 = pdf[[f'c{i}' for i in range(20, 30)]]
stream2 = pdf[[f'c{i}' for i in range(30, 40)]]
In [10]:
class SGDRegressorTransformer(SGDRegressor, TransformerMixin):
    """SGDRegressor that can sit mid-pipeline: transform() emits predictions."""
    def __init__(self, penalty='l1'):
        super().__init__(penalty=penalty)

    def transform(self, X):
        # Expose the predictions as a single-column feature matrix so the
        # output can feed a downstream FeatureUnion / estimator.
        return self.predict(X).reshape(-1, 1)
In [11]:
def streaming_pipeline(columns):
    """Build a two-step pipeline: select `columns`, then fit an SGD regressor."""
    selector = DataSelector(columns=columns)
    learner = SGDRegressorTransformer()
    return make_pipeline(selector, learner)
In [12]:
# One sub-pipeline per feature batch.
base_pipeline = streaming_pipeline([f'c{i}' for i in range(20)])
stream1_pipeline = streaming_pipeline([f'c{i}' for i in range(20, 30)])
stream2_pipeline = streaming_pipeline([f'c{i}' for i in range(30, 40)])
In [13]:
# Combine all the pipelines together using stacking --
# this will be the boosting variant of grafting.
full_pipeline = make_pipeline(
    FeatureUnion([
        ('base', base_pipeline),
        ('stream1', stream1_pipeline),
        ('stream2', stream2_pipeline),
    ]),
    SGDRegressorTransformer(penalty='elasticnet'),
)
In [14]:
# Fit the stacked pipeline, then score the training frame
# (Pipeline.fit returns self, so the calls chain).
full_pipeline.fit(pdf, y).predict(pdf)
Out[14]:
In [15]:
class GraftingRegressor(BaseEstimator, TransformerMixin):
    """Stacked "grafting" regressor for streaming features.

    Each call to fit/partial_fit detects columns that have not been seen
    before, wraps them in their own selector+SGD sub-pipeline (a "stream"),
    and stacks all streams under a final elastic-net SGD regressor via a
    FeatureUnion.

    Parameters
    ----------
    lambda_ : float, default 0.05
        Regularizer penalty. This is used to select which columns are kept
        in the model.
    """
    def __init__(self, lambda_=0.05):
        self.grafting_columns = []   # one list of column labels per stream
        self.stream_pipeline = []    # (name, pipeline) tuples for FeatureUnion
        self.full_pipeline = []      # replaced by a real Pipeline on first _fit
        self.lambda_ = lambda_

    def streaming_pipeline(self, columns):
        """Selector + SGD sub-pipeline restricted to `columns`."""
        return make_pipeline(
            DataSelector(columns=columns),
            SGDRegressorTransformer()
        )

    def _fit(self, X, y=None):
        # isinstance (rather than an exact type check) accepts DataFrame
        # subclasses as well.
        if not isinstance(X, pd.DataFrame):
            raise Exception("GraftingRegressor can only accept dataframes!")
        # Columns already claimed by an existing stream.
        flat_columns = list(itertools.chain(*self.grafting_columns))
        new_feats = [c for c in X.columns if c not in flat_columns]
        if new_feats:
            # Record the new columns so repeated fits do not create
            # duplicate (or empty) streams over the same features.
            self.grafting_columns.append(new_feats)
            idx = len(self.stream_pipeline)
            self.stream_pipeline.append(
                ('stream{}'.format(idx), self.streaming_pipeline(new_feats)))
        self.full_pipeline = make_pipeline(
            FeatureUnion(self.stream_pipeline[:]),
            SGDRegressorTransformer('elasticnet'))

    def fit(self, X, y=None):
        """Add a stream for any unseen columns, then fit the full stack."""
        self._fit(X, y)
        self.full_pipeline.fit(X, y)
        return self

    def partial_fit(self, X, y=None):
        # NOTE(review): sklearn's Pipeline does not implement partial_fit;
        # this call will raise AttributeError at runtime -- confirm intent.
        self._fit(X, y)
        self.full_pipeline.partial_fit(X, y)
        return self

    def predict(self, X):
        return self.full_pipeline.predict(X)

    def transform(self, X):
        # Predictions exposed as a feature, enabling further stacking.
        return self.predict(X)
In [16]:
# Grafting regressor with the default regularizer penalty made explicit.
gcc = GraftingRegressor(lambda_=0.05)
In [17]:
gcc.fit(pdf, y)  # fit on the full frame; returns the estimator (displayed below)
Out[17]:
In [18]:
gcc.transform(pdf)  # transform delegates to predict, so this shows predictions
Out[18]:
In [19]:
gcc.predict(pdf)  # predictions from the stacked full_pipeline
Out[19]: