In [1]:
from __future__ import division, print_function
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.preprocessing import OneHotEncoder
import matplotlib.pyplot as plt
import numpy as np
import os
import torch
from torch.autograd import Variable
%matplotlib inline
In [2]:
DATA_DIR = "../../data"
TRAIN_FILE = os.path.join(DATA_DIR, "mnist_train.csv")
TEST_FILE = os.path.join(DATA_DIR, "mnist_test.csv")
MODEL_FILE = os.path.join(DATA_DIR, "torch-mnist-rnn-{:d}.model")
LEARNING_RATE = 1e-3
ROWS, COLS = 28, 28
BATCH_SIZE = 128
NUM_CLASSES = 10
NUM_EPOCHS = 5
Torch LSTMs expect their data as 3D tensors of shape (SEQUENCE_LENGTH, BATCH_SIZE, EMBEDDING_SIZE), according to this page. We do the data extraction below so our data is parsed out the same way.
In [3]:
def parse_file(filename):
xdata, ydata = [], []
fin = open(filename, "rb")
i = 0
for line in fin:
if i % 10000 == 0:
print("{:s}: {:d} lines read".format(
os.path.basename(filename), i))
cols = line.strip().split(",")
ydata.append(int(cols[0]))
xdata.append(np.reshape(
np.array([float(x) / 255. for x in cols[1:]]),
(COLS, ROWS)))
i += 1
fin.close()
print("{:s}: {:d} lines read".format(os.path.basename(filename), i))
X = np.transpose(np.array(xdata), (1, 0, 2))
y = np.array(ydata)
return X, y
Xtrain, ytrain = parse_file(TRAIN_FILE)
Xtest, ytest = parse_file(TEST_FILE)
print(Xtrain.shape, ytrain.shape, Xtest.shape, ytest.shape)
In [4]:
def datagen(X, y, batch_size=BATCH_SIZE, num_classes=NUM_CLASSES):
ohe = OneHotEncoder(n_values=num_classes)
while True:
shuffled_indices = np.random.permutation(np.arange(len(y)))
num_batches = len(y) // batch_size
for bid in range(num_batches):
batch_indices = shuffled_indices[bid*batch_size:(bid+1)*batch_size]
Xbatch = np.zeros((X.shape[0], batch_size, X.shape[2]))
Ybatch = np.zeros((batch_size, num_classes))
for i in range(batch_size):
Xbatch[:, i, :] = X[:, batch_indices[i], :]
Ybatch[i] = ohe.fit_transform(y[batch_indices[i]]).todense()
yield Xbatch, Ybatch
self_test_gen = datagen(Xtrain, ytrain)
Xbatch, Ybatch = self_test_gen.next()
print(Xbatch.shape, Ybatch.shape)
In [5]:
class MNISTClassifier(torch.nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim,
seqlen, batch_size):
super(MNISTClassifier, self).__init__()
self.input_dim = input_dim
self.hidden_dim = hidden_dim
self.seqlen = seqlen
self.batch_size = batch_size
self.output_dim = output_dim
# define layers
self.lstm = torch.nn.LSTM(input_dim, hidden_dim)
self.fcn = torch.nn.Linear(hidden_dim, output_dim)
# define weights for LSTM
self.hidden = self.init_hidden()
def init_hidden(self):
return (
Variable(torch.randn(self.input_dim, self.batch_size,
self.hidden_dim)),
Variable(torch.randn(self.input_dim, self.batch_size,
self.hidden_dim)))
def forward(self, x):
# LSTM
lstm_out, self.hidden = self.lstm(x.view(self.input_dim,
self.batch_size,
self.seqlen),
self.hidden)
# get the context vector (last element of lstm_out)
lstm_out = lstm_out[-1]
# dropout
lstm_dropout = torch.nn.Dropout(p=0.2)(lstm_out)
# FCN
fcn_out = self.fcn(lstm_dropout.view(self.batch_size,
self.hidden_dim))
return torch.nn.functional.log_softmax(fcn_out)
In [6]:
model = MNISTClassifier(ROWS, 512, NUM_CLASSES, COLS, BATCH_SIZE)
# loss_fn = torch.nn.NLLLoss()
# loss_fn = torch.nn.CrossEntropyLoss()
loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
In [7]:
def compute_accuracy(pred_var, true_var):
ypred = np.argmax(pred_var.data.numpy(), axis=1)
ytrue = np.argmax(true_var.data.numpy(), axis=1)
return accuracy_score(ypred, ytrue)
history = []
train_gen = datagen(Xtrain, ytrain, BATCH_SIZE)
for epoch in range(NUM_EPOCHS):
num_batches = Xtrain.shape[1] // BATCH_SIZE
total_loss, total_acc = 0., 0.
for _ in range(num_batches):
# PyTorch accumulates gradients, so they need to be cleared
# Hidden state needs to be cleared as well before each sequence
model.zero_grad()
model.hidden = model.init_hidden()
# Get next batch, convert to Torch datatype
Xbatch, Ybatch = train_gen.next()
Xbatch = Variable(torch.from_numpy(Xbatch).float())
Ybatch = Variable(torch.from_numpy(Ybatch).float())
# forward
Ybatch_ = model(Xbatch)
# compute loss
loss = loss_fn(Ybatch_, Ybatch)
# zero parameter gradients
loss.backward()
total_loss += loss.data[0]
total_acc += compute_accuracy(Ybatch_, Ybatch)
optimizer.step()
total_loss /= num_batches
total_acc /= num_batches
torch.save(model, MODEL_FILE.format(epoch+1))
print("Epoch {:d}/{:d}: loss={:.3f}, accuracy={:.3f}".format(
(epoch+1), NUM_EPOCHS, total_loss, total_acc))
history.append((total_loss, total_acc))
In [8]:
losses = [x[0] for x in history]
accs = [x[1] for x in history]
plt.subplot(211)
plt.title("Accuracy")
plt.plot(accs)
plt.subplot(212)
plt.title("Loss")
plt.plot(losses)
plt.tight_layout()
plt.show()
In [18]:
BEST_MODEL_FILE = os.path.join(DATA_DIR, "torch-mnist-rnn-5.model")
model = torch.load(BEST_MODEL_FILE)
ys_, ys = [], []
test_gen = datagen(Xtest, ytest, BATCH_SIZE)
num_batches = Xtest.shape[1] // BATCH_SIZE
for _ in range(num_batches):
Xbatch, Ybatch = test_gen.next()
Xbatch = Variable(torch.from_numpy(Xbatch).float())
Ybatch = Variable(torch.from_numpy(Ybatch).float())
Ybatch_ = model(Xbatch)
ys_.extend(np.argmax(Ybatch_.data.numpy(), axis=1))
ys.extend(np.argmax(Ybatch.data.numpy(), axis=1))
acc = accuracy_score(ys_, ys)
cm = confusion_matrix(ys_, ys)
print("Accuracy: {:.4f}".format(acc))
print("Confusion Matrix")
print(cm)
In [ ]: