In this implementation of alpha-investing we will use
mse and log loss as measures of likelihood for likelihood-ratio test
In [2]:
import sklearn
In [4]:
from sklearn.datasets import make_regression, make_classification
from sklearn.linear_model import SGDRegressor, SGDClassifier
from sklearn.metrics import log_loss
from scipy.stats import chi2, norm
import pandas as pd
import numpy as np
In [12]:
X, y = make_classification(n_features=100)
pdf = pd.DataFrame(X)
pdf.columns = ['c{}'.format(x) for x in range(X.shape[1])]
In [63]:
X1 = pdf[['c{}'.format(x) for x in range(50, 100)]]
X2 = pdf[['c{}'.format(x) for x in range(50)]]
In [219]:
def get_p_val(X, col, y):
# use log loss to calculate change in likelihood
col = np.array(col)
X = np.array(X)
new_X = np.hstack([X, col.reshape(-1, 1)])
mod = SGDClassifier(max_iter=1000)
nul_y = mod.fit(X, y).predict(X)
alt_y = mod.fit(new_X, y).predict(new_X)
# calculate log loss
nul_ll = log_loss(y, nul_y)
alt_ll = log_loss(y, alt_y)
return chi2.cdf(2*(nul_ll-alt_ll), 1)
In [228]:
import pandas
class AlphaInvestClassifier(SGDClassifier):
def __init__(self, loss="log", penalty='l2', alpha=0.0001, l1_ratio=0.15,
fit_intercept=True, max_iter=None, tol=None, shuffle=True,
verbose=0, epsilon=0.1, n_jobs=1,
random_state=None, learning_rate="optimal", eta0=0.0,
power_t=0.5, class_weight=None, warm_start=False,
average=False, n_iter=None,
wealth=0.5, delta_alpha=0.5):
super(AlphaInvestClassifier, self).__init__(
loss=loss, penalty=penalty, alpha=alpha, l1_ratio=l1_ratio,
fit_intercept=fit_intercept, max_iter=max_iter, tol=tol,
shuffle=shuffle, verbose=verbose, epsilon=epsilon, n_jobs=n_jobs,
random_state=random_state, learning_rate=learning_rate, eta0=eta0,
power_t=power_t, class_weight=class_weight, warm_start=warm_start,
average=average, n_iter=n_iter)
"""
intragroup_alpha : the alpha level of t-test used to determine significance
intergroup_thres : the threshold for lasso to remove redundancy
"""
self.coef_info = {'cols': [], 'coef':[], 'excluded_cols': []}
self.seen_cols = []
self.base_shape = None
self.delta_alpha = delta_alpha
self.wealth = wealth
self.temp_X = None
def add_column_exclusion(self, cols):
self.coef_info['excluded_cols'] = self.coef_info['excluded_cols'] + cols
def _alpha_sel(self, X_, y):
# need to continually refit...
new_X = X_[X_.columns.difference(self.coef_info['cols'])]
col_list = self.coef_info['cols'][:]
for col in list(new_X.columns):
# perform test with and without dataset...
if col in self.coef_info['cols']:
continue
temp_X = X_[col_list]
# evaluate this column
pval = get_p_val(temp_X, new_X[[col]], y)
alpha = self.wealth/(2*temp_X.shape[1])
if pval < alpha:
col_list.append(col)
self.wealth = self.wealth + self.delta_alpha - alpha
else:
self.wealth = self.wealth - alpha
# add and update cols...
sel_cols = list(self.coef_info['cols']) + list(col_list)
self.coef_info['excluded_cols'] = [col for col in self.seen_cols if col not in sel_cols]
def _fit_columns(self, X_, return_x=True, transform_only=False):
"""
Method filter through "unselected" columns. The goal of this
method is to filter any uninformative columns.
This will be selected based on index only?
If return_x is false, it will only return the boolean mask.
"""
X = X_[X_.columns.difference(self.coef_info['excluded_cols'])]
# order the columns correctly...
col_order = self.coef_info['cols'] + list([x for x in X.columns if x not in self.coef_info['cols']])
X = X[col_order]
return X
def _reg_penalty(self, X):
col_coef = [(col, coef) for col, coef in zip(X.columns.tolist(), self.coef_.flatten()) if np.abs(coef) >= -0.1]
self.coef_info['cols'] = [x for x, _ in col_coef]
self.coef_info['coef'] = [x for _, x in col_coef]
self.coef_info['excluded_cols'] = [x for x in self.seen_cols if x not in self.coef_info['cols']]
self.coef_ = np.array(self.coef_info['coef']).reshape(1, -1)
def fit(self, X, y, coef_init=None, intercept_init=None,
sample_weight=None):
X_ = X.copy()
self.seen_cols = list(set(self.seen_cols + X.columns.tolist()))
super(AlphaInvestClassifier, self).fit(X, y, coef_init=coef_init, intercept_init=intercept_init,
sample_weight=sample_weight)
# update params in self...
self._reg_penalty(X)
return self
def partial_fit(self, X, y, sample_weight=None):
X_ = X.copy()
self.seen_cols = list(set(self.seen_cols + X.columns.tolist()))
# TODO: alpha investing
self._alpha_sel(X, y)
X = self._fit_columns(X)
# now update coefficients
n_samples, n_features = X.shape
coef_list = np.zeros(n_features, dtype=np.float64, order="C")
coef_list[:len(self.coef_info['coef'])] = self.coef_info['coef']
self.coef_ = np.array(coef_list).reshape(1, -1)
super(AlphaInvestClassifier, self).partial_fit(X, y, sample_weight=None)
self._reg_penalty(X)
return self
def predict(self, X):
X = self._fit_columns(X, transform_only=True)
print(X.shape)
return super(AlphaInvestClassifier, self).predict(X)
In [229]:
model = AlphaInvestClassifier(max_iter=1000)
model.fit(X1, y)
Out[229]:
In [230]:
model.coef_.shape
Out[230]:
In [231]:
X.shape
Out[231]:
In [232]:
model.partial_fit(pdf, y)
Out[232]:
In [233]:
model.coef_.shape
Out[233]:
In [234]:
model.coef_
Out[234]:
In [236]:
model.predict(pdf)
Out[236]: