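The purpose of this notebook is to demonstrate what streaming-feature pipelines will look like: a base pipeline and one pipeline per feature stream are combined with a `FeatureUnion`, and their stacked predictions feed a final regressor. The sketch below shows the rough shape; in the code, each branch of the union starts with its own `DataSelector`.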

DataSelector(
    FeatureUnion(
        [('base', BasePipeline), 
         ('stream1', StreamPipeline),
         ('stream2', StreamPipeline)
        ]
    )
)

In [1]:
from sklearn.datasets import make_regression, make_classification

from sklearn.pipeline import Pipeline, make_pipeline, FeatureUnion
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.decomposition import PCA

import numpy as np
import pandas as pd

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_selection.base import SelectorMixin
from sklearn.utils.validation import check_is_fitted

from sklearn.linear_model import SGDRegressor

import itertools

In [2]:
# Seed NumPy's global RNG so the features, the target below and the SGD models
# (built with their default random_state=None, which falls back to NumPy's
# global RandomState) are reproducible under Restart & Run All.
np.random.seed(42)
pdf = pd.DataFrame(np.random.normal(size=(100, 100)))

In [3]:
# Name the columns c0, c1, ... using the frame's actual width, so this stays in
# sync with however many features were generated above.
pdf.columns = ['c{}'.format(i) for i in range(pdf.shape[1])]

In [4]:
# Random regression target with exactly one value per row of pdf.
y = np.random.normal(size=len(pdf))

In [5]:
pdf.head(n=5)


Out[5]:
c0 c1 c2 c3 c4 c5 c6 c7 c8 c9 ... c90 c91 c92 c93 c94 c95 c96 c97 c98 c99
0 -0.081009 0.446641 1.753018 1.573603 0.271781 1.173958 0.860765 -1.126751 -1.797735 -0.688237 ... -0.226435 1.106320 -0.152478 1.674564 -1.004144 -0.375211 0.541072 -0.514438 -0.845416 -0.003805
1 -0.867454 -0.567044 -2.172709 0.549284 -1.044569 -0.085124 0.218162 -1.201013 -0.039023 0.724099 ... 2.152225 -0.316049 0.516507 0.015346 -0.681834 -0.572671 -0.598991 -0.304668 1.754217 -1.029740
2 -0.137167 -0.570255 -0.191417 0.445955 1.166362 -1.054386 -0.007278 0.460846 -2.738251 1.299032 ... -0.637977 0.388052 2.619541 -0.891907 -0.242087 0.276049 -0.925730 -0.067757 -0.235863 1.636044
3 -0.657558 1.800762 -1.042025 0.324521 -0.454903 0.371031 -1.107835 -2.119823 -0.137015 0.274050 ... -0.536900 -1.168740 1.275998 -0.509346 -0.264405 1.863733 -0.451234 0.450466 0.237420 -0.163271
4 -0.853415 1.366949 0.885289 -0.540979 -0.569812 0.115609 -0.223273 -1.169540 -1.379653 0.237232 ... 1.195078 0.118953 0.268569 -0.205080 -1.276802 0.123475 -1.339757 -1.699879 1.694811 0.798126

5 rows × 100 columns


In [6]:
class DataSelector(BaseEstimator, TransformerMixin):
    """Stateless transformer that picks a subset of DataFrame columns.

    columns: list of column labels to keep, or None to keep every column
             (in which case a copy of the input is returned).
    """

    def __init__(self, columns=None):
        self.columns = columns

    def fit(self, x, y=None):
        """Nothing to learn; return self so calls can be chained."""
        return self

    def transform(self, X):
        """Return X restricted to ``self.columns``, or a full copy when it is None."""
        if self.columns is None:
            return X.copy()
        return X[self.columns]

In [7]:
# Selector that keeps only the first two columns, used as a quick sanity check.
testSel = DataSelector(['c0', 'c1'])

In [8]:
testSel.fit(pdf).transform(pdf).head()


Out[8]:
c0 c1
0 -0.081009 0.446641
1 -0.867454 -0.567044
2 -0.137167 -0.570255
3 -0.657558 1.800762
4 -0.853415 1.366949

In [9]:
# Simulate streaming features: a base block of 20 columns, followed by two
# later-arriving "streams" of 10 columns each.
base_df, stream1, stream2 = (
    pdf[['c{}'.format(i) for i in range(start, stop)]]
    for start, stop in ((0, 20), (20, 30), (30, 40))
)

In [10]:
class SGDRegressorTransformer(SGDRegressor, TransformerMixin):
    """SGDRegressor whose ``transform`` emits its predictions as one column.

    Returning an (n_samples, 1) array lets several of these be stacked
    column-wise by a FeatureUnion. Only ``penalty`` is exposed; every other
    SGDRegressor setting keeps its default.
    """

    def __init__(self, penalty='l1'):
        super().__init__(penalty=penalty)

    def transform(self, X):
        """Predict on X and return the predictions reshaped to a single column."""
        predictions = self.predict(X)
        return predictions.reshape(-1, 1)

In [11]:
def streaming_pipeline(columns):
    """Build one stream's model: select ``columns``, then fit an SGD regressor on them.

    Step names are the ones ``make_pipeline`` would generate for these estimators.
    """
    steps = [
        ('dataselector', DataSelector(columns=columns)),
        ('sgdregressortransformer', SGDRegressorTransformer()),
    ]
    return Pipeline(steps)

In [12]:
# One stream pipeline per column block (same column ranges as the simulated streams).
base_pipeline, stream1_pipeline, stream2_pipeline = [
    streaming_pipeline(['c{}'.format(i) for i in range(start, stop)])
    for start, stop in ((0, 20), (20, 30), (30, 40))
]

In [13]:
# Combine all the stream pipelines: each stream's predictions become one
# feature for a final elastic-net regressor (stacking). The author describes
# this as the boosting variant of grafting.
# NOTE(review): the final regressor is trained on in-sample predictions of the
# stream models, so its fit may be optimistic; consider out-of-fold predictions.
full_pipeline = Pipeline([
    ('featureunion', FeatureUnion([
        ('base', base_pipeline),
        ('stream1', stream1_pipeline),
        ('stream2', stream2_pipeline),
    ])),
    ('sgdregressortransformer', SGDRegressorTransformer(penalty='elasticnet')),
])

In [14]:
# Fit the stacked model, then predict back on the training frame.
full_pipeline.fit(pdf, y).predict(pdf)


Out[14]:
array([-0.10700842, -0.03062437, -0.02090286, -0.11336955, -0.15977816,
       -0.12579174, -0.08741142, -0.12134747, -0.04462746, -0.08296429,
       -0.03304037, -0.11740999, -0.10431545, -0.00037746, -0.14182735,
       -0.05118834, -0.08439879, -0.0629687 , -0.10683876, -0.05377854,
        0.00510214, -0.05260897, -0.06858216, -0.07814564, -0.09182128,
       -0.12353198, -0.12371988, -0.03552614, -0.04843085, -0.05347862,
       -0.09092167, -0.13127768, -0.13939877, -0.09217763, -0.09870898,
       -0.05749069, -0.14396359, -0.03037639, -0.10313411, -0.05340791,
       -0.08215666, -0.05038099, -0.10502642, -0.04993068, -0.11425534,
       -0.05122696, -0.18006223, -0.09279828, -0.0775214 , -0.19063154,
       -0.03115275, -0.10557639, -0.06163757, -0.07014581, -0.05115169,
       -0.06922402, -0.11620319, -0.13705242, -0.09855643, -0.13782169,
       -0.13481644, -0.01276539, -0.01060413, -0.1534875 , -0.0790885 ,
       -0.03677558, -0.11595928, -0.10471266, -0.03096234, -0.09928369,
       -0.10694513, -0.10810034, -0.07622746, -0.11870449, -0.07252212,
       -0.0661332 , -0.07701265, -0.0367088 , -0.12791096, -0.00382798,
       -0.11613093, -0.12541921, -0.0347415 , -0.01630019, -0.17278762,
       -0.10297765, -0.10904487, -0.09807611, -0.07464294, -0.0345069 ,
       -0.09359759, -0.07271633, -0.11673103, -0.11174294, -0.08528878,
       -0.14424038, -0.08232457, -0.05912558, -0.09210196, -0.10475424])

In [15]:
class GraftingRegressor(BaseEstimator, TransformerMixin):
    """Regressor that grows one stacked sub-model per batch of newly seen columns.

    Each call to ``fit``/``partial_fit`` looks for DataFrame columns that no
    existing stream covers yet. Those columns get their own stream pipeline
    (DataSelector -> SGDRegressorTransformer). The per-stream predictions are
    combined by a FeatureUnion and fed to a final elastic-net
    SGDRegressorTransformer.
    """

    def __init__(self, lambda_=0.05):
        """
        lambda_: intended regularizer penalty for deciding which column streams are
                 kept in the model. NOTE: it is stored (so it round-trips through
                 get_params/clone) but is not yet passed to any estimator.
        """
        # One list of column names per stream, in the order the streams were added.
        self.grafting_columns = []
        # (name, pipeline) pairs, one per stream, used to build the FeatureUnion.
        self.stream_pipeline = []
        # FeatureUnion over all streams followed by the final regressor.
        self.full_pipeline = []
        self.lambda_ = lambda_

    def streaming_pipeline(self, columns):
        """Build one stream's model: select ``columns``, then an SGD regressor on them."""
        return make_pipeline(
            DataSelector(columns = columns),
            SGDRegressorTransformer()
        )

    def _fit(self, X, y=None):
        """Add a stream for any columns of X that no existing stream covers.

        The full pipeline is rebuilt only when a stream is added, so repeated
        ``partial_fit`` calls on the same columns keep the learned state.

        Raises
        ------
        TypeError
            If X is not a pandas DataFrame.
        ValueError
            If no stream exists yet and X has no columns to build one from.
        """
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Grafting Regressor can only accept dataframes!")

        seen = set(itertools.chain.from_iterable(self.grafting_columns))
        new_feats = [col for col in X.columns if col not in seen]

        if not new_feats:
            if not self.stream_pipeline:
                raise ValueError("Grafting Regressor needs at least one column to fit on.")
            # Nothing new to graft: keep the existing pipeline (and its state).
            return

        idx = len(self.stream_pipeline)
        # Remember these columns so later calls only graft genuinely new ones.
        self.grafting_columns.append(new_feats)
        self.stream_pipeline.append(('stream{}'.format(idx), self.streaming_pipeline(new_feats)))
        # Pass a copy so the FeatureUnion does not alias our bookkeeping list.
        self.full_pipeline = make_pipeline(
            FeatureUnion(self.stream_pipeline[:]),
            SGDRegressorTransformer('elasticnet'))

    def fit(self, X, y=None):
        """Graft streams for any new columns of X, then (re)fit the whole pipeline."""
        self._fit(X, y)
        self.full_pipeline.fit(X, y)
        return self

    def partial_fit(self, X, y=None):
        """Incrementally update every stream and the final regressor on one batch.

        sklearn's Pipeline has no ``partial_fit``, so the update is done by hand.
        Each stream's regressor is ``partial_fit`` on that stream's columns. The
        final regressor is then ``partial_fit`` on the stream predictions stacked
        column-wise, mirroring how the FeatureUnion stacks them at predict time.
        """
        self._fit(X, y)
        union = self.full_pipeline.steps[0][1]
        final_regressor = self.full_pipeline.steps[-1][1]

        stream_outputs = []
        for _, stream in union.transformer_list:
            X_stream = X
            # The steps before the regressor (the DataSelector) have a no-op fit,
            # so transforming through them is enough.
            for _, step in stream.steps[:-1]:
                X_stream = step.transform(X_stream)
            stream_regressor = stream.steps[-1][1]
            stream_regressor.partial_fit(X_stream, y)
            stream_outputs.append(stream_regressor.transform(X_stream))

        final_regressor.partial_fit(np.hstack(stream_outputs), y)
        return self

    def predict(self, X):
        """Predict with the stacked pipeline; X must contain every grafted column."""
        return self.full_pipeline.predict(X)

    def transform(self, X):
        """Return the stacked predictions (same 1-D output as ``predict``)."""
        return self.predict(X)

In [16]:
# Grafting model with its default penalty spelled out.
gcc = GraftingRegressor(lambda_=0.05)

In [17]:
gcc.fit(X=pdf, y=y)


Out[17]:
GraftingRegressor(lambda_=0.05)

In [18]:
gcc.transform(X=pdf)


Out[18]:
array([-0.39380592,  0.23180973,  0.49707636,  0.14014472,  0.01280641,
       -0.31580541, -0.05606834, -0.47831911,  0.10068441, -0.25821049,
        0.05498184, -0.38126252, -0.16214793, -0.08358622, -0.54967117,
        0.15199634, -0.51491051,  0.06813817, -0.33611079, -0.01782047,
        0.1293091 ,  0.01802911, -0.35382979,  0.08904158, -0.00257713,
       -0.20318506,  0.08491686, -0.03881853,  0.43664507, -0.11114208,
       -0.4792771 ,  0.00387224, -0.29778018, -0.50782561,  0.12219898,
       -0.15904784,  0.12917408,  0.19068617, -0.14757517, -0.01550723,
       -0.32793189,  0.1438364 ,  0.13095491,  0.22965644,  0.08485319,
        0.04804499, -0.55323421, -0.14434505, -0.0039075 , -0.58844475,
        0.22705779, -0.25241946,  0.12399556,  0.25459338,  0.0884384 ,
       -0.17107297,  0.0454575 , -0.56554495, -0.38401559, -0.46473955,
       -0.31219383,  0.6066835 ,  0.22040682, -0.62362999, -0.33836777,
       -0.08132287, -0.17515207, -0.07986534,  0.12086997, -0.55432681,
       -0.24731357, -0.12879931, -0.06098167, -0.02401296, -0.19831804,
        0.34540792, -0.07710855,  0.16854416, -0.41983162,  0.42431118,
        0.02274323, -0.13554789,  0.37031156,  0.42805729, -0.34694716,
       -0.33771769,  0.03705967, -0.24202825,  0.0198698 ,  0.47909002,
        0.22378048,  0.1396452 , -0.3496178 , -0.18495287,  0.07043831,
       -0.52957455, -0.28206289, -0.5591581 , -0.47264485, -0.27816834])

In [19]:
gcc.predict(X=pdf)


Out[19]:
array([-0.39380592,  0.23180973,  0.49707636,  0.14014472,  0.01280641,
       -0.31580541, -0.05606834, -0.47831911,  0.10068441, -0.25821049,
        0.05498184, -0.38126252, -0.16214793, -0.08358622, -0.54967117,
        0.15199634, -0.51491051,  0.06813817, -0.33611079, -0.01782047,
        0.1293091 ,  0.01802911, -0.35382979,  0.08904158, -0.00257713,
       -0.20318506,  0.08491686, -0.03881853,  0.43664507, -0.11114208,
       -0.4792771 ,  0.00387224, -0.29778018, -0.50782561,  0.12219898,
       -0.15904784,  0.12917408,  0.19068617, -0.14757517, -0.01550723,
       -0.32793189,  0.1438364 ,  0.13095491,  0.22965644,  0.08485319,
        0.04804499, -0.55323421, -0.14434505, -0.0039075 , -0.58844475,
        0.22705779, -0.25241946,  0.12399556,  0.25459338,  0.0884384 ,
       -0.17107297,  0.0454575 , -0.56554495, -0.38401559, -0.46473955,
       -0.31219383,  0.6066835 ,  0.22040682, -0.62362999, -0.33836777,
       -0.08132287, -0.17515207, -0.07986534,  0.12086997, -0.55432681,
       -0.24731357, -0.12879931, -0.06098167, -0.02401296, -0.19831804,
        0.34540792, -0.07710855,  0.16854416, -0.41983162,  0.42431118,
        0.02274323, -0.13554789,  0.37031156,  0.42805729, -0.34694716,
       -0.33771769,  0.03705967, -0.24202825,  0.0198698 ,  0.47909002,
        0.22378048,  0.1396452 , -0.3496178 , -0.18495287,  0.07043831,
       -0.52957455, -0.28206289, -0.5591581 , -0.47264485, -0.27816834])