In [1]:
import tensorflow as tf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
In [2]:
class RBM(object):
    def __init__(self, D, M, an_id):
        self.D = D
        self.M = M
        self.id = an_id
        self.build(D, M)

    def set_session(self, session):
        self.session = session

    def build(self, D, M):
        # params
        self.W = tf.Variable(tf.random_normal(shape=(D, M)) * np.sqrt(2.0 / M))
        # note: without limiting variance, you get numerical stability issues
        self.c = tf.Variable(np.zeros(M).astype(np.float32))
        self.b = tf.Variable(np.zeros(D).astype(np.float32))

        # data
        self.X_in = tf.placeholder(tf.float32, shape=(None, D))

        # conditional probabilities
        # NOTE: tf.contrib.distributions.Bernoulli API has changed in Tensorflow v1.2
        V = self.X_in
        p_h_given_v = tf.nn.sigmoid(tf.matmul(V, self.W) + self.c)
        self.p_h_given_v = p_h_given_v  # save for later
        r = tf.random_uniform(shape=tf.shape(p_h_given_v))
        H = tf.to_float(r < p_h_given_v)

        p_v_given_h = tf.nn.sigmoid(tf.matmul(H, tf.transpose(self.W)) + self.b)
        r = tf.random_uniform(shape=tf.shape(p_v_given_h))
        X_sample = tf.to_float(r < p_v_given_h)

        # build the objective
        objective = tf.reduce_mean(self.free_energy(self.X_in)) - tf.reduce_mean(self.free_energy(X_sample))
        self.train_op = tf.train.AdamOptimizer(1e-2).minimize(objective)

        # build the cost
        # we won't use this to optimize the model parameters,
        # just to observe what happens during training
        logits = self.forward_logits(self.X_in)
        self.cost = tf.reduce_mean(
            tf.nn.sigmoid_cross_entropy_with_logits(
                labels=self.X_in,
                logits=logits,
            )
        )
    def fit(self, X, epochs=1, batch_sz=100, show_fig=False):
        N, D = X.shape
        n_batches = N // batch_sz

        costs = []
        for i in range(epochs):
            print("Epoch:", i)
            X = shuffle(X)
            for j in range(n_batches):
                batch = X[j * batch_sz : (j * batch_sz + batch_sz)]
                _, c = self.session.run(
                    (self.train_op, self.cost),
                    feed_dict={self.X_in: batch})
                if j % 10 == 0:
                    print("j / n_batches:", j, "/", n_batches, "cost:", c)
                costs.append(c)
        if show_fig:
            plt.figure(figsize=(14, 14))
            plt.plot(costs)
            plt.show()
    def free_energy(self, V):
        b = tf.reshape(self.b, (self.D, 1))
        first_term = -tf.matmul(V, b)
        first_term = tf.reshape(first_term, (-1,))

        second_term = -tf.reduce_sum(
            # tf.log(1 + tf.exp(tf.matmul(V, self.W) + self.c)),
            tf.nn.softplus(tf.matmul(V, self.W) + self.c),
            axis=1
        )
        return first_term + second_term

    def forward_hidden(self, X):
        return tf.nn.sigmoid(tf.matmul(X, self.W) + self.c)

    def forward_logits(self, X):
        Z = self.forward_hidden(X)
        return tf.matmul(Z, tf.transpose(self.W)) + self.b

    def forward_output(self, X):
        return tf.nn.sigmoid(self.forward_logits(X))

    def transform(self, X):
        # accepts and returns a real numpy array,
        # unlike forward_hidden and forward_output,
        # which deal with TensorFlow variables
        return self.session.run(self.p_h_given_v,
                                feed_dict={self.X_in: X})
In [3]:
def getKaggleMNIST():
    # Kaggle MNIST train.csv: column 0 is the label, columns 1-784 are pixel intensities
    train = pd.read_csv('./data/Section 2/train.csv').values.astype(np.float32)
    train = shuffle(train)

    Xtrain = train[:-1000, 1:] / 255
    Ytrain = train[:-1000, 0].astype(np.int32)
    Xtest = train[-1000:, 1:] / 255
    Ytest = train[-1000:, 0].astype(np.int32)
    return Xtrain, Ytrain, Xtest, Ytest
In [4]:
Xtrain, Ytrain, Xtest, Ytest = getKaggleMNIST()
Xtrain = Xtrain.astype(np.float32)
Xtest = Xtest.astype(np.float32)
_, D = Xtrain.shape
K = len(set(Ytrain))
In [5]:
rbm = RBM(D, 300, 0)
init_op = tf.global_variables_initializer()
with tf.Session() as session:
    session.run(init_op)
    rbm.set_session(session)
    rbm.fit(Xtrain,
            epochs=10,
            show_fig=True)
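A minimal follow-up sketch (not part of the original run) showing how the fitted RBM could be inspected; it assumes these lines are placed inside the `with tf.Session() as session:` block above, before the session closes, since `transform` and `forward_output` need a live session:

    # still inside the `with tf.Session() as session:` block, after rbm.fit(...)
    Z = rbm.transform(Xtest)  # hidden-unit probabilities, shape (1000, 300)
    print("hidden representation shape:", Z.shape)

    # reconstruct one test digit and compare it to the original
    X_hat = session.run(rbm.forward_output(rbm.X_in),
                        feed_dict={rbm.X_in: Xtest[:1]})
    plt.subplot(1, 2, 1)
    plt.imshow(Xtest[0].reshape(28, 28), cmap='gray')
    plt.title('original')
    plt.subplot(1, 2, 2)
    plt.imshow(X_hat[0].reshape(28, 28), cmap='gray')
    plt.title('reconstruction')
    plt.show()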