In [1]:
from imports import *
import import_data
from sklearn.linear_model import SGDClassifier
%matplotlib inline
In [2]:
tickers = [filename[:-4] for filename in os.listdir('quandl_data')]
binarize = True
stock_df, prediction_df = import_data.import_data(tickers, binarize=binarize)
#print stock_df.shape
y = stock_df['label'].values
y = y.reshape(y.shape[0], 1)
stock_df = stock_df.drop('label', axis=1)
X = stock_df.values
print X.shape, y.shape
stock_df.tail()
Out[2]:
In [3]:
plt.title('y values')
plt.hist(y, bins=100)
plt.show()
In [4]:
vectorize_label = False
if vectorize_label:
    new_y = np.zeros((y.shape[0], 2))
    positives = []
    for i in xrange(y.shape[0]):
        if y[i] == 0:
            new_y[i] = np.array([[1, 0]])
        elif y[i] == 1:
            new_y[i] = np.array([[0, 1]])
            positives.append(i)
    y = new_y
    print y.shape
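The loop above one-hot encodes the 0/1 labels element by element; a vectorized equivalent (a sketch, assuming y holds 0/1 integers in a column vector) would be:

one_hot = np.eye(2)[y.ravel().astype(int)]  # row i is [1, 0] for label 0, [0, 1] for label 1
assert one_hot.shape == (y.shape[0], 2)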
In [5]:
get_subset = False
if get_subset:
    # note: np.random.choice samples with replacement by default,
    # so the subset may contain duplicate rows
    indices = np.random.choice(X.shape[0], 10000)
    X = X[indices, :]
    y = y[indices, :]
    print X.shape, y.shape
In [6]:
class NN(object):
    def __init__(self, Lambda=0):
        # hyperparameters
        self.input_layer_size = 10
        self.hidden_layer_size = 9
        #self.hidden_layer1_size = 10
        #self.hidden_layer2_size = 2
        self.output_layer_size = 1
        # weights
        self.W1 = np.random.randn(self.input_layer_size, self.hidden_layer_size)
        self.W2 = np.random.randn(self.hidden_layer_size, self.output_layer_size)
        #self.W1 = np.random.randn(self.input_layer_size, self.hidden_layer1_size)
        #self.W2 = np.random.randn(self.hidden_layer1_size, self.hidden_layer2_size)
        #self.W3 = np.random.randn(self.hidden_layer2_size, self.output_layer_size)
        # regularization
        self.Lambda = Lambda

    def get_weights(self):
        # flatten all weight matrices into a single parameter vector
        return np.concatenate((self.W1.ravel(), self.W2.ravel()))
        #return np.concatenate((self.W1.ravel(), self.W2.ravel(), self.W3.ravel()))

    def set_weights(self, weights):
        # unpack a flat parameter vector back into the weight matrices
        W1_start = 0
        W1_end = (self.hidden_layer_size * self.input_layer_size)
        self.W1 = np.reshape(weights[W1_start:W1_end], (self.input_layer_size, self.hidden_layer_size))
        #W1_end = (self.hidden_layer1_size * self.input_layer_size)
        #self.W1 = np.reshape(weights[W1_start:W1_end], (self.input_layer_size,self.hidden_layer1_size))
        W2_end = W1_end + (self.hidden_layer_size * self.output_layer_size)
        self.W2 = np.reshape(weights[W1_end:W2_end], (self.hidden_layer_size, self.output_layer_size))
        #W2_end = W1_end + (self.hidden_layer2_size * self.hidden_layer1_size)
        #self.W2 = np.reshape(weights[W1_end:W2_end], (self.hidden_layer1_size,self.hidden_layer2_size))
        #W3_end = W2_end + (self.hidden_layer2_size * self.output_layer_size)
        #self.W3 = np.reshape(weights[W2_end:W3_end], (self.hidden_layer2_size,self.output_layer_size))

    def forward_propagate(self, X):
        self.z2 = np.dot(X, self.W1)
        self.a2 = self.activation(self.z2)
        self.z3 = np.dot(self.a2, self.W2)
        y_hat = self.activation(self.z3)
        #self.a3 = self.activation(self.z3)
        #self.z4 = np.dot(self.a3, self.W3)
        #y_hat = self.activation(self.z4)
        return y_hat

    def activation(self, z):
        #return np.true_divide(1, (1 + np.exp(-z))) # sigmoid
        return np.log(1 + np.exp(z)) # softplus

    def visualize_activation(self):
        inputs = np.arange(-6, 6, 0.01)
        plt.plot(inputs, self.activation(inputs))
        plt.show()

    def activation_prime(self, z):
        #return np.true_divide(np.exp(-z), ((1 + np.exp(-z))**2)) # sigmoid derivative
        return np.true_divide(1, (1 + np.exp(-z))) # softplus derivative (i.e., sigmoid!)

    def visualize_activation_prime(self):
        inputs = np.arange(-6, 6, 0.01)
        plt.plot(inputs, self.activation(inputs))
        plt.plot(inputs, self.activation_prime(inputs))
        plt.show()

    def cost(self, X, y):
        # mean squared error plus L2 weight penalty
        self.y_hat = self.forward_propagate(X)
        left = 0.5 * np.sum((y - self.y_hat)**2) / X.shape[0]
        right = (self.Lambda / 2.0) * (np.sum(self.W1**2) + np.sum(self.W2**2))
        #right = (self.Lambda/2.0)*(np.sum(self.W1**2) + np.sum(self.W2**2) + np.sum(self.W3**2))
        return left + right

    def cost_prime(self, X, y):
        # backpropagation: gradients of the cost w.r.t. W1 and W2
        self.y_hat = self.forward_propagate(X)
        #delta4 = np.multiply(-(y - self.y_hat), self.activation_prime(self.z4))
        #dJdW3 = np.dot(self.a3.T, delta4)/X.shape[0] + (self.Lambda*self.W3)
        #delta3 = np.dot(delta4, self.W3.T) * self.activation_prime(self.z3)
        delta3 = np.multiply(-(y - self.y_hat), self.activation_prime(self.z3))
        dJdW2 = np.dot(self.a2.T, delta3) / X.shape[0] + (self.Lambda * self.W2)
        delta2 = np.dot(delta3, self.W2.T) * self.activation_prime(self.z2)
        dJdW1 = np.dot(X.T, delta2) / X.shape[0] + (self.Lambda * self.W1)
        #return dJdW1, dJdW2, dJdW3
        return dJdW1, dJdW2

    def compute_gradient(self, X, y):
        #dJdW1, dJdW2, dJdW3 = self.cost_prime(X, y)
        dJdW1, dJdW2 = self.cost_prime(X, y)
        #return np.concatenate((dJdW1.ravel(), dJdW2.ravel(), dJdW3.ravel()))
        return np.concatenate((dJdW1.ravel(), dJdW2.ravel()))
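A quick numerical check, not part of the original cell, that activation_prime really is the derivative of the softplus activation (namely the sigmoid):

nn_check = NN()
z = np.linspace(-5, 5, 11)
eps = 1e-6
# central difference of the activation vs. the closed-form derivative
numeric = (nn_check.activation(z + eps) - nn_check.activation(z - eps)) / (2 * eps)
print np.max(np.abs(numeric - nn_check.activation_prime(z)))  # should be ~1e-10 or smaller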
In [7]:
def test_activation():
    nn = NN()
    nn.visualize_activation_prime()

#test_activation()
In [8]:
def testNN():
    nn = NN()
    y_hat = nn.forward_propagate(X)
    print y_hat, "\n"
    print y, "\n"
    cost1 = nn.cost(X, y)
    print cost1, "\n"
    dJdW1, dJdW2 = nn.cost_prime(X, y)
    print dJdW1, "\n"
    print dJdW2

#testNN()
In [9]:
def estimate_gradient(nn_test, nn_test_X, nn_test_y):
    # numerical gradient check: central difference on each weight in turn
    weights = nn_test.get_weights()
    estimated_gradient = np.zeros(weights.shape)
    perturb = np.zeros(weights.shape)
    epsilon = 1e-4
    for i in xrange(len(weights)):
        perturb[i] = epsilon
        nn_test.set_weights(weights + perturb)
        loss2 = nn_test.cost(nn_test_X, nn_test_y)
        nn_test.set_weights(weights - perturb)
        loss1 = nn_test.cost(nn_test_X, nn_test_y)
        estimated_gradient[i] = (loss2 - loss1) / (2 * epsilon)
        perturb[i] = 0
    nn_test.set_weights(weights)
    return estimated_gradient
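estimate_gradient uses the central difference (J(w + eps) - J(w - eps)) / (2 * eps), whose error shrinks as O(eps^2) rather than the O(eps) of a one-sided difference. A scalar illustration (sketch only, f is a toy function):

f = lambda w: w**3          # true derivative at w0 = 2 is 3 * w0**2 = 12
w0, eps = 2.0, 1e-4
central = (f(w0 + eps) - f(w0 - eps)) / (2 * eps)  # ~12.000000000
forward = (f(w0 + eps) - f(w0)) / eps              # ~12.0006
print central, forward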
In [10]:
def test_gradient_estimation(nn_test, nn_test_X, nn_test_y):
    nn_test_X = StandardScaler().fit_transform(nn_test_X)
    nn_test_X = np.hstack((np.ones((nn_test_X.shape[0], 1)), nn_test_X))
    estimated_gradient = estimate_gradient(nn_test, nn_test_X, nn_test_y)
    gradient = nn_test.compute_gradient(nn_test_X, nn_test_y)
    print "\n\t--- Gradient Checking ---"
    print estimated_gradient[:3]
    print gradient[:3], "\n"
    print "\tdiff:", np.linalg.norm(gradient - estimated_gradient) / np.linalg.norm(gradient + estimated_gradient)
    print "\t-------------------------"

test_grad_est = False
if test_grad_est:
    test_gradient_estimation(NN(), X, y)  # nn_test was undefined here; pass a fresh network
In [11]:
class trainer(object):
    def __init__(self, N):
        self.N = N

    def callback(self, weights):
        # record train/test cost after each optimizer iteration
        self.N.set_weights(weights)
        self.J.append(self.N.cost(self.X_train, self.y_train))
        self.test_J.append(self.N.cost(self.X_test, self.y_test))

    def cost_wrapper(self, weights, X, y):
        # scipy.optimize wants cost and gradient as a function of a flat weight vector
        self.N.set_weights(weights)
        c = self.N.cost(X, y)
        g = self.N.compute_gradient(X, y)
        return c, g

    def set_scale(self, X):
        self.scaler = StandardScaler().fit(X)

    def get_scale(self):
        return self.scaler

    def add_bias(self, X):
        return np.hstack((np.ones((X.shape[0], 1)), X))
    def trainBFGS(self, X, y):
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        self.set_scale(X_train)
        scaler = self.get_scale()
        X_train = scaler.transform(X_train)
        self.X_train = self.add_bias(X_train)
        self.y_train = y_train # StandardScaler().fit_transform(y_train)
        X_test = scaler.transform(X_test)  # scale with the scaler fit on the training set
        self.X_test = self.add_bias(X_test)
        self.y_test = y_test # StandardScaler().fit_transform(y_test)
        self.J = []
        self.test_J = []
        weights0 = self.N.get_weights()
        options = {'maxiter': 500, 'disp': True}
        _res = optimize.minimize(self.cost_wrapper, weights0, jac=True, method='BFGS', args=(self.X_train, self.y_train), options=options, callback=self.callback)
        self.N.set_weights(_res.x)
        self.optimization_results = _res
    def trainSGD(self, X, y):
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        self.set_scale(X_train)
        scaler = self.get_scale()
        X_train = scaler.transform(X_train)
        self.X_train = self.add_bias(X_train)
        self.y_train = y_train # StandardScaler().fit_transform(y_train)
        X_test = scaler.transform(X_test)
        self.X_test = self.add_bias(X_test)
        self.y_test = y_test # StandardScaler().fit_transform(y_test)
        self.J = []
        self.test_J = []
        clf = SGDClassifier()
        clf.fit(X_train, y_train.ravel())
        # a linear model's coef_ cannot be reshaped into the network's two
        # weight matrices, so keep the fitted classifier as a baseline instead
        self.clf = clf
In [12]:
time0 = time()
nn = NN(Lambda=0.0001) # 0.0001
Trainer = trainer(nn)
Trainer.trainBFGS(X, y)
#Trainer.trainSGD(X,y)
test_grad_est = False
if test_grad_est:
    test_gradient_estimation(nn, X, y)
print "\ntime for training:", np.round((time() - time0), 2), "seconds"
plt.plot(Trainer.J)
plt.plot(Trainer.test_J)
plt.legend(['J', 'test_J'])
plt.show()
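The loss curves show fit, not classification quality. Since binarize=True, a held-out accuracy check is one cell away (a sketch; the 0.5 cutoff is an arbitrary choice, as the softplus output is nonnegative and unbounded rather than a probability):

y_hat_test = nn.forward_propagate(Trainer.X_test)
preds = (y_hat_test > 0.5).astype(int)  # hypothetical threshold
print "test accuracy:", np.round(np.mean(preds == Trainer.y_test), 4)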
In [13]:
pred_df = prediction_df[prediction_df['label'].apply(np.isnan)]
pred_tickers = pred_df['ticker'].unique()
# pred_X was never defined in the original cell; this reconstruction assumes
# the non-label, non-ticker columns are the nine model features (hypothetical)
pred_X = pred_df.drop(['label', 'ticker'], axis=1).values
scaler = Trainer.get_scale()
for i in xrange(pred_X.shape[0]):
    x = scaler.transform(pred_X[i:i+1, 0:9])
    x = Trainer.add_bias(x)
    y_hat = nn.forward_propagate(x)
    print str(i).rjust(2), str(pred_tickers[i]).rjust(4), np.round(y_hat, 2)
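Both the scaler and the forward pass are vectorized, so the per-row loop can be done in one shot (sketch, under the same pred_X assumption as above):

X_pred = Trainer.add_bias(scaler.transform(pred_X[:, 0:9]))
y_hat_all = nn.forward_propagate(X_pred)
for ticker, pred in zip(pred_tickers, np.round(y_hat_all.ravel(), 2)):
    print str(ticker).rjust(4), pred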