Implementation of the Online Group Feature Selection (OGFS) algorithm.
OGFS uses Lasso for its inter-group filtering, so we default to an L1 penalty with a low tolerance.
Note: the output of the algorithm is not a model, but rather the selected subset (group) of features.
In [1]:
import sklearn
In [2]:
from sklearn.datasets import make_regression, make_classification
from sklearn.linear_model import SGDRegressor
import pandas as pd
import numpy as np
import SPEC  # local module providing SPEC (spectral feature selection) helpers
from scipy import stats
from sklearn.metrics.pairwise import rbf_kernel
In [3]:
def similarity_within_class(X, y):
    return SPEC.similarity_classification(X, y)

def similarity_between_class(X, y):
    """
    Calculates the between-class affinity matrix.
    X (data), y (labels); note that only the labels are used.
    """
    y_series = pd.Series(y)
    y_val = y_series.value_counts(normalize=True)
    n_inv = 1.0 / len(set(y))
    y_size = len(y)
    sim_matrix = np.zeros((y_size, y_size))
    for s_i in range(y_size):
        for s_j in range(y_size):
            sim_matrix[s_i, s_j] = n_inv - y_val[y[s_i]] if y[s_i] == y[s_j] else n_inv
    return sim_matrix
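As a quick sanity check (toy labels; note that X is unused by this function), same-class pairs get 1/c minus the class frequency, while different-class pairs get 1/c:
In [ ]:
# three samples, two classes: same-class entries are 1/2 - n_l/n,
# different-class entries are 1/2
similarity_between_class(None, np.array([0, 0, 1]))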
In [4]:
def convert_to_deciles(y, n=10):
    """
    By default converts to deciles; can be changed based on choice of n.
    """
    return np.array(pd.cut(y, n, labels=range(n)))
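For example, ten evenly spaced values cut into five equal-width bins:
In [ ]:
convert_to_deciles(np.linspace(0, 1, 10), n=5)
# → [0, 0, 1, 1, 2, 2, 3, 3, 4, 4]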
In [5]:
X, y = make_regression(n_features=100)
pdf = pd.DataFrame(X)
pdf.columns = ['c{}'.format(x) for x in range(X.shape[1])]
In [6]:
X.shape
Out[6]:
(100, 100)
In [7]:
# two feature groups: X1 arrives first; the remaining columns (X2) arrive
# later as part of the full frame, appended on the right
X1 = pdf[['c{}'.format(x) for x in range(50)]]
X2 = pdf[['c{}'.format(x) for x in range(50, 100)]]
In [8]:
def spec_supervised(X, y, is_classification=True):
    if not is_classification:
        y = convert_to_deciles(y)
    W_w = similarity_within_class(X, y)
    W_b = similarity_between_class(X, y)
    # raw SPEC scores against the within-class and between-class affinities
    s_w = SPEC.spec(**{'X': X, 'y': y, 'style': 0, 'mode': 'raw', 'W': W_w})
    s_b = SPEC.spec(**{'X': X, 'y': y, 'style': 0, 'mode': 'raw', 'W': W_b})
    return s_b, s_w
In [9]:
def evaluate_feats1(s_b, s_w):
    """
    Incremental selection: accept a feature when it improves the
    ratio of cumulative between-class to within-class scores.
    """
    curr_u1 = []
    curr_u2 = []
    my_feats = []
    prev_score = None
    for idx, x1, x2 in zip(range(len(s_b)), s_b, s_w):
        if prev_score is None:
            curr_u1.append(x1)
            curr_u2.append(x2)
            my_feats.append(idx)
        else:
            test_u1 = curr_u1[:]
            test_u2 = curr_u2[:]
            test_u1.append(x1)
            test_u2.append(x2)
            score = (np.sum(test_u1) / np.sum(test_u2)) - prev_score
            if score > 0.001:
                my_feats.append(idx)
                curr_u1.append(x1)
                curr_u2.append(x2)
        prev_score = np.sum(curr_u1) / np.sum(curr_u2)
    # the first feature was accepted unconditionally, so re-test it
    # against the rest of the selection
    curr_u1.pop(0)
    curr_u2.pop(0)
    my_feats.pop(0)
    if not curr_u1:
        # nothing left to compare against; keep the first feature
        my_feats.append(0)
        return my_feats
    test_u1 = curr_u1[:]
    test_u2 = curr_u2[:]
    test_u1.append(s_b[0])
    test_u2.append(s_w[0])
    prev_score = np.sum(curr_u1) / np.sum(curr_u2)
    score = (np.sum(test_u1) / np.sum(test_u2)) - prev_score
    if score > 0.001:
        my_feats.append(0)
    return my_feats
def evaluate_feats2(X, alpha=0.05, highest_best=True):
    """
    X is the vector of raw scores; alpha is the level of significance.
    This version uses a t-test: features are accepted in score order
    until one is significantly below the already-selected scores.
    Returns: list of indices indicating selected features.
    """
    eval_order = np.argsort(X)
    if highest_best:
        eval_order = eval_order[::-1]
    selected_feats = []
    selected_idx = []
    for idx in eval_order:
        if len(selected_feats) == 0:
            selected_feats.append(X[idx])
            selected_idx.append(idx)
            continue
        # now continue on and decide what to do
        mu = np.mean(selected_feats)
        sigma = np.std(selected_feats)
        U = len(selected_feats)
        if sigma == 0.0 and U > 1:
            return selected_idx
        elif sigma == 0.0:
            selected_feats.append(X[idx])
            selected_idx.append(idx)
            continue
        # one-sided one-sample t-test: stop at the first candidate that
        # scores significantly below the mean of the selected features
        t_stat = (mu - X[idx]) / (sigma / np.sqrt(U))
        p_val = stats.t.sf(t_stat, U - 1)
        if p_val > alpha:
            selected_feats.append(X[idx])
            selected_idx.append(idx)
        else:
            return selected_idx
    return selected_idx
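Two hand-picked traces of these selection rules: for evaluate_feats1, features 1 and 2 each improve the cumulative between/within ratio while feature 0 fails its closing re-test; for evaluate_feats2, the first two scores cluster together, so selection stops at the clear outlier.
In [ ]:
print(evaluate_feats1([1.0, 2.0, 3.0], [1.0, 1.0, 1.0]))  # [1, 2]
print(evaluate_feats2(np.array([1.0, 0.98, 0.2, 0.1])))   # [0, 1]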
In [10]:
def evaluate_feats(s_b, s_w, alpha=0.05):
    set1 = evaluate_feats1(s_b, s_w)
    set2 = evaluate_feats2(s_b / s_w, alpha)
    return list(set(set1 + set2))
In [11]:
s_b, s_w = spec_supervised(X, y, False)
evaluate_feats(s_b, s_w)
Out[11]:
In [12]:
class OGFSRegressor(SGDRegressor):
    """
    SGDRegressor with online group feature selection.

    intragroup_alpha : the alpha level of the t-test used to determine significance
    intergroup_thres : the threshold for lasso to remove redundancy
    """
    def __init__(self, loss="squared_loss", penalty="l1", alpha=0.0001,
                 l1_ratio=0.15, fit_intercept=True, max_iter=None, tol=None,
                 shuffle=True, verbose=0, epsilon=0.1,
                 random_state=None, learning_rate="invscaling", eta0=0.01,
                 power_t=0.25, warm_start=False, average=False, n_iter=None,
                 intragroup_alpha=0.05, intergroup_thres=None):
        super(OGFSRegressor, self).__init__(loss=loss, penalty=penalty,
                                            alpha=alpha, l1_ratio=l1_ratio,
                                            fit_intercept=fit_intercept,
                                            max_iter=max_iter, tol=tol,
                                            shuffle=shuffle, verbose=verbose,
                                            epsilon=epsilon,
                                            random_state=random_state,
                                            learning_rate=learning_rate,
                                            eta0=eta0, power_t=power_t,
                                            warm_start=warm_start,
                                            average=average, n_iter=n_iter)
        self.filter_cols = []
        self.base_shape = None
        self.intragroup_alpha = intragroup_alpha
        self.intergroup_thres = intergroup_thres if intergroup_thres is not None else epsilon

    def _X_unseen(self, X):
        """
        Selects only the features that have not been seen before.
        """
        bool_mask = np.zeros((X.shape[1],), dtype=bool)
        bool_mask[self.coef_.shape[0]:] = True
        return self._fit_X_mask(X, bool_mask, True)

    def _fit_X_mask(self, X, bool_mask, return_x=True):
        if not return_x:
            return bool_mask
        if isinstance(X, pd.DataFrame):
            return X[X.columns[bool_mask]]
        else:
            return X[:, bool_mask]

    def _fit_columns(self, X, return_x=True):
        """
        Filters out "unselected" columns. The goal of this method is to
        drop any uninformative columns; columns are selected by index only.
        If return_x is False, only the boolean mask is returned.
        """
        bool_mask = np.ones((X.shape[1],), dtype=bool)
        if len(self.filter_cols) == 0:
            if return_x:
                return X
            else:
                return bool_mask
        # otherwise mask out the filtered columns...
        bool_mask[self.filter_cols] = False
        return self._fit_X_mask(X, bool_mask, return_x)

    def _reg_penalty(self, tot_new_feats, base_size):
        # drop the new coefficients that lasso shrank below the threshold
        remove_cols = np.argwhere(np.abs(self.coef_[-tot_new_feats:]) < self.intergroup_thres)
        add_cols = np.argwhere(np.abs(self.coef_[-tot_new_feats:]) >= self.intergroup_thres)
        base_coef = self.coef_[:-tot_new_feats].tolist()
        # keep only the surviving new coefs
        base_coef = base_coef + self.coef_[-tot_new_feats:][add_cols].flatten().tolist()
        self.coef_ = np.array(base_coef)
        remove_cols_offset = [base_size + x for x in remove_cols.flatten().tolist()]
        self.filter_cols = self.filter_cols + remove_cols_offset

    def _partial_ogfs_fit(self, X_, y):
        """
        Partial-fit step of the online group feature selection method:
        performs spectral analysis on the incoming feature group and
        expands the coefficient listing accordingly.
        """
        # we need the base size to locate the newly arrived columns
        base_size = len(self.filter_cols) + self.coef_.flatten().shape[0]
        X = self._fit_columns(X_)
        n_samples, n_features = X.shape
        # extract only the new features that have arrived
        new_X = self._X_unseen(X)
        s_b, s_w = spec_supervised(np.array(new_X), y, False)
        sel_feats = evaluate_feats(s_b, s_w, self.intragroup_alpha)
        # new features that failed the intra-group test are filtered out
        unsel_feats = [x + base_size for x in range(new_X.shape[1]) if x not in sel_feats]
        self.filter_cols = self.filter_cols + unsel_feats
        # expand coef_ with zeros for the newly selected features
        coef_list = np.zeros(self.coef_.shape[0] + len(sel_feats))
        coef_list[:self.coef_.shape[0]] = self.coef_.copy()
        self.coef_ = coef_list.copy()

    def partial_fit(self, X, y, sample_weight=None):
        base_size = len(self.filter_cols) + self.coef_.shape[0]
        tot_new_feats = X.shape[1] - base_size
        self._partial_ogfs_fit(X, y)
        X_ = self._fit_columns(X)
        super(OGFSRegressor, self).partial_fit(X_, y, sample_weight=sample_weight)
        # remove redundant features based on the weight of the regularizer penalty
        self._reg_penalty(tot_new_feats, base_size)
        return self

    def predict(self, X):
        X = self._fit_columns(X)
        return super(OGFSRegressor, self).predict(X)
In [13]:
model = OGFSRegressor(max_iter=1000)
model.fit(X1, y)
Out[13]:
In [14]:
len(model.coef_)
Out[14]:
In [15]:
model.partial_fit(pdf, y)
Out[15]:
In [16]:
len(model.coef_)
Out[16]:
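Per the note at the top, the real output of OGFS is the selected feature subset. A minimal way to read it off the fitted model, given that filter_cols holds the indices of dropped columns in arrival order:
In [ ]:
selected = [c for i, c in enumerate(pdf.columns) if i not in model.filter_cols]
# compare the kept-column count with the coefficient count
len(selected), len(model.coef_)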
In [18]:
model.predict(pdf)
Out[18]:
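The streaming contract is that each partial_fit call receives every column seen so far plus the newly arrived group appended on the right. A sketch with a hypothetical third group of 20 noise features (Z and X_stream are illustrative names, not part of the original data):
In [ ]:
Z = np.random.randn(pdf.shape[0], 20)  # hypothetical new feature group
X_stream = np.hstack([pdf.values, Z])  # all previous columns + new group
model.partial_fit(X_stream, y)
len(model.coef_)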
In [ ]: