Theano 패키지는 GPU를 지원하는 선형 대수 심볼 컴파일러(Symbolic Linear Algebra Compiler)이다.
심볼 컴파일러란 수치적인 미분, 선형 대수 계산 뿐 아니라 symbolic expression을 통해 정의된 수식(주로 목적 함수)을 사람처럼 미분하거나 재정리하여 전체 계산에 대한 최적의 계산 경로를 찾아내는 소프트웨어를 말한다. 수치 미분을 사용한 연산에 비해 정확도나 속도가 빠르기 때문에 대용량 선형 대수 계산이나 몬테카를로 시뮬레이션, 또는 딥 러닝에 사용된다.
Theano 패키지의 또다른 특징은 GPU와 CPU에 대해 동일한 파이썬 코드를 사용할 수 있다는 점이다. 아래는 GPU를 사용한 Theano 연산 결과를 비교한 것이다.
ALU는 연산유닛. 코어가 4개면 유닛이 4개. GPU는 코어가 CPU에 비해서 많기 때문에 동시에 많은 연산이 가능하다.
GPU는 기본 구조가 일반적인 CPU와 다르기 때문에 저수준 명령어 체계가 다르므로 별도의 플랫폼과 라이브러리가 필요하다. 현재 많이 사용되는 GPU 연산용 플랫폼에는 nvidia 계열의 CUDA와 Apple, AMD, Intel 계열의 OpenCL 이 있다.
파이썬에서는 pyCUDA 패키지와 pyOpenCL 패키지를 사용할 수 있다.
이미 CUDA가 앞서 있어서 많이 쓴다.
뒤에서 c코드로 만들어서 빌드해서 쓸 수 있게끔 변경해준다. CPU, GPU에서 모두 쓰일 수 있게
Theano를 사용하기 위해서는 다음과 같은 과정을 거쳐야 한다.
Theano의 모든 변수는 심볼 변수이므로 수치 변수와 혼동이 되지 않게 별도로 정의해야 한다.
스칼라, 벡터, 행렬을 정의하기 위해 theano.tensor.T
서브패키지의 dscalar
, dvector
, dmatrix
명령을 사용하거나 이미 심볼로 정의된 변수의 연산을 통해 자동으로 정의된다.
In [1]:
import theano
import theano.tensor as T #벡터에서 차원 늘어나면 metrix. 거기서 차원 늘어나면 tensor라고 한다.
from theano import function
In [3]:
x = T.dscalar('x') #d는 double. float 타입
y = T.dscalar('y')
In [4]:
type(x), type(y)
Out[4]:
이미 만들어진 심볼 변수에 일반 사칙연산이나 Theano 수학 함수를 사용하여 종속 변수 역할을 하는 심볼 변수를 만들 수 있다.
In [5]:
z = x + y
In [6]:
type(z)
Out[6]:
In [7]:
u = T.exp(z)
In [8]:
type(u)
Out[8]:
Theano의 심볼 변수의 내용을 살펴보기 위해서는 theano.printing.pprint
명령 또는 theano.printing.pydotprint
을 사용한다.
In [10]:
theano.printing.pprint(x)
Out[10]:
In [11]:
theano.printing.pprint(y)
Out[11]:
In [12]:
theano.printing.pprint(z)
Out[12]:
In [13]:
theano.printing.pprint(u)
Out[13]:
In [14]:
from IPython.display import SVG
In [15]:
SVG(theano.printing.pydotprint(z, return_image=True, format='svg'))
Out[15]:
In [17]:
SVG(theano.printing.pydotprint(u, return_image=True, format='svg'))
Out[17]:
심볼 함수는 theano.function
명령으로 정의하며 입력 심볼 변수와 출력 심볼 변수를 지정한다. 출력 심볼 변수는 입력 심볼 변수의 연산으로 정의되어 있어야 한다.
처음 심볼 함수를 정의할 때는 내부적으로 컴파일을 하기 때문에 시간이 다소 걸릴 수 있다.
In [18]:
%time f = theano.function(inputs=[x, y], outputs=z)
함수의 값을 계산하려면 일반 함수와 같이 사용하면 된다.
In [19]:
f(2, 3)
Out[19]:
벡터와 행렬도 마찬가지 방법으로 사용한다.
In [20]:
x2 = T.dvector('x2')
y2 = T.dvector('y2')
z2 = x2 + y2
f2 = function([x2, y2], z2)
f2([1, 2], [3, 4])
Out[20]:
In [21]:
x3 = T.dmatrix('x3')
y3= T.dmatrix('y3')
z3 = x3 + y3
f3 = function([x3, y3], z3)
f3([[1], [2]], [[3], [4]])
Out[21]:
로지스틱 함수나 난수를 사용하는 함수는 다음과 같이 정의한다.
In [22]:
s = 1 / (1 + T.exp(-x))
logistic = function([x], s)
logistic(1)
Out[22]:
In [23]:
s2 = 1 / (1 + T.exp(-x2))
logistic2 = function([x2], s2)
logistic2([0, 1])
Out[23]:
함수에서 디폴트 인수는 다음과 같이 In
명령을 사용해서 정의한다.
In [24]:
from theano import In
x, y = T.dscalars('x', 'y')
z = x + y
f = theano.function([x, In(y, value=3)], z)
f(1)
Out[24]:
난수 발생도 theano 의 RandomStreams
명령을 사용해야 한다.
In [26]:
from theano.tensor.shared_randomstreams import RandomStreams
srng = RandomStreams()
rv_u = srng.uniform((1,2))
rv_n = srng.normal((1,2))
f = function([], rv_u)
g = function([], rv_n, no_default_updates=True) #Not updating rv_n.rng
In [27]:
f(), f(), f(), f()
Out[27]:
In [28]:
g(), g(), g(), g()
Out[28]:
상태값을 저장할 때는 shared memory 를 사용할 수 있다. shared memory는 GPU 내부 메모리를 사용하므로 연산 속도가 향상된다.
상태값을 변경하기 위해서는 함수에 updates
라는 콜백(callback)을 정의해야 한다. givens
라는 콜백을 정의하면 함수내에서 출력까지 한번에 정의할 수도 있다.
shared memory 에 저장된 값은 get_value
메서드로 읽는다.
위에 CPU, GPU 설명하는 그림에서 GPU 칩안에 메모리가 들어있는 부분. vmemory. 바깥으로 CPU로 왔다갔다 안해도 되기 때문에 속도가 빠르다. v메모리 안에 보유하고 있기 때문에.
In [29]:
state = theano.shared(0) #0으로 한 것은 초기화
inc = T.iscalar('inc')
out = T.iscalar('out')
accumulator = function(inputs=[inc], outputs=out, givens={out: inc * 2}, updates={state: state + inc})
In [30]:
accumulator(1)
Out[30]:
In [31]:
state.get_value()
Out[31]:
In [32]:
accumulator(2)
Out[32]:
In [33]:
state.get_value()
Out[33]:
In [34]:
accumulator(5)
Out[34]:
In [35]:
state.get_value()
Out[35]:
Theano는 함수 계산을 위한 최적화를 지원한다. 예를 들어 다음과 같은 함수는 제곱 연산을 사용하여 최적화 할 수 있다.
In [36]:
x = T.vector('x')
y = x ** 10
f = theano.function([x], y)
In [37]:
SVG(theano.printing.pydotprint(f, return_image=True, format='svg'))
Out[37]:
함수 실행 속도를 비교해 보면 다음과 같다.
In [38]:
x = np.ones(10000000)
In [39]:
%timeit x ** 10
In [40]:
%timeit f(x)
심볼릭 연산의 가장 큰 장점은 빠르고 정확하게 미분값(gradient, Hessian 등)을 계산할 수 있다는 점이다.
In [41]:
x = T.dscalar('x')
y = x ** 2
gy = T.grad(y, x)
fy = theano.function([x], y)
fgy = theano.function([x], gy)
In [42]:
SVG(theano.printing.pydotprint(fy.maker.fgraph.outputs[0], return_image=True, format='svg'))
Out[42]:
In [43]:
SVG(theano.printing.pydotprint(fgy.maker.fgraph.outputs[0], return_image=True, format='svg'))
Out[43]:
In [44]:
x = T.dscalar('x') #시그모이드의 사례. 자주 사용하기 때문에 룰이 들어가 있다.
s = 1 / (1 + T.exp(-x))
logistic = theano.function([x], s)
gs = T.grad(s, x)
dlogistic = theano.function([x], gs)
In [45]:
SVG(theano.printing.pydotprint(logistic, return_image=True, format='svg'))
Out[45]:
In [46]:
SVG(theano.printing.pydotprint(dlogistic.maker.fgraph.outputs[0], return_image=True, format='svg'))
Out[46]:
In [47]:
xx = np.linspace(-5, 5, 100)
y1 = np.hstack([logistic(xi) for xi in xx])
y2 = np.hstack([dlogistic(xi) for xi in xx])
plt.plot(xx, y1, label="logistic")
plt.plot(xx, y2, label="derivative of logistic")
plt.legend(loc=0)
plt.show()
Theano를 사용하면 다음과 같이 퍼셉트론을 구현할 수 있다.
입력, 가중치, 상수항을 각각 x
, w
, b
로 정의하고 출력 함수를 f
로 정의한다.
In [48]:
x = T.dvector('x')
w = theano.shared(np.zeros(2))
b = theano.shared(0.0)
z = T.dot(w, x) + b
a = 1/(1 + T.exp(-z))
f = function([x], a)
출력을 y
, 목적 함수를 cost
, 목적 함수의 미분(그레디언트)을 gradient
로 정의한다.
In [49]:
y = T.dscalar('y')
cost = T.sum((y - a)**2) #목적함수 정의. 에러펑션을 줄이기 위해서
gw, gb = T.grad(cost, [w, b])
gradient = function([x, y], [gw, gb])
초기값에서 그레디언트 값을 계산하고 이를 이용하여 가중치를 갱신한다.
In [50]:
eta = 0.1
xi = [1, -1]
yi = 1
gradient = theano.function([x, y], updates=[(w, w - eta * gw), (b, b - eta * gb)])
gradient(xi, yi)
w.get_value(), b.get_value()
Out[50]:
그 전에는 바이너리. 1아니면 0이었는데. 0,1,2,3,4,5,6 이렇게 여러개 멀티클래스 분류 문제로 넘어간다.
Theano를 사용하여 로지스틱 회귀를 구현하면 다음과 같다. 다음 웹사이트를 참조하였다.
이 모형은 Softmax 함수를 사용하여 멀티 클래스 출력을 구현하였다.
$$ \begin{eqnarray} P(Y=i \mid x, W,b) &= \text{softmax}_i(W x + b) = \frac {e^{W_i x + b_i}} {\sum_j e^{W_j x + b_j}} \end{eqnarray} $$$$ \begin{eqnarray} y_{pred} = {\rm argmax}_i P(Y=i \mid x,W,b) \end{eqnarray} $$Teano에서의 핵심 코드는 def negative_log_likelihood(self, y): 에서 return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y]) 이것. 이거 점수가 낮아야 좋은 것이기 때문이다. 리니어리그레션은 y값이 명시되어야 하지만 여기는 명시 되지 않아도 된다.
In [52]:
class LogisticRegression(object):
"""Multi-class Logistic Regression Class
The logistic regression is fully described by a weight matrix :math:`W`
and bias vector :math:`b`. Classification is done by projecting data
points onto a set of hyperplanes, the distance to which is used to
determine a class membership probability.
"""
def __init__(self, input, n_in, n_out):
""" Initialize the parameters of the logistic regression
:type input: theano.tensor.TensorType
:param input: symbolic variable that describes the input of the architecture (one minibatch)
:type n_in: int
:param n_in: number of input units, the dimension of the space in which the datapoints lie
:type n_out: int
:param n_out: number of output units, the dimension of the space in which the labels lie
"""
# initialize with 0 the weights W as a matrix of shape (n_in, n_out)
self.W = theano.shared(value=np.zeros((n_in, n_out), dtype=theano.config.floatX), name='W', borrow=True)
# initialize the biases b as a vector of n_out 0s
self.b = theano.shared(value=np.zeros((n_out,), dtype=theano.config.floatX), name='b', borrow=True)
# symbolic expression for computing the matrix of class-membership probabilities Where:
# W is a matrix where column-k represent the separation hyperplane for class-k
# x is a matrix where row-j represents input training sample-j
# b is a vector where element-k represent the free parameter of hyperplane-k
self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b) #노트 필기 참고
# symbolic description of how to compute prediction as class whose probability is maximal
self.y_pred = T.argmax(self.p_y_given_x, axis=1)
# parameters of the model
self.params = [self.W, self.b]
# keep track of model input
self.input = input
def negative_log_likelihood(self, y):
"""Return the mean of the negative log-likelihood of the prediction of this model under a given target distribution.
:type y: theano.tensor.TensorType
:param y: corresponds to a vector that gives for each example the correct label
Note: we use the mean instead of the sum so that the learning rate is less dependent on the batch size
"""
# y.shape[0] is (symbolically) the number of rows in y, i.e., number of examples (call it n) in the minibatch
# T.arange(y.shape[0]) is a symbolic vector which will contain [0,1,2,... n-1]
# T.log(self.p_y_given_x) is a matrix of Log-Probabilities (call it LP) with one row per example and one column per class
# LP[T.arange(y.shape[0]),y] is a vector v containing [LP[0,y[0]], LP[1,y[1]], LP[2,y[2]], ...,
# LP[n-1,y[n-1]]] and T.mean(LP[T.arange(y.shape[0]),y]) is the mean (across minibatch examples) of the elements in v,
# i.e., the mean log-likelihood across the minibatch.
return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y]) #y는 원핫인코딩 되어 들어가서 0100, 1000 이런식으로 들어간다.
#metrix로 들어간 것이 아니라 y가 3이라면 3번째 것만 뽑겠다.
#이 구현에서는 샘플을 모두 다 넣는다. 미니 배치 사이즈. 28X28= 500. 10개만 나오게 된다. summation을 해주어야 한다.
def errors(self, y):
"""Return a float representing the number of errors in the minibatch
over the total number of examples of the minibatch ; zero one loss over the size of the minibatch
:type y: theano.tensor.TensorType
:param y: corresponds to a vector that gives for each example the correct label
"""
# check if y has same dimension of y_pred
if y.ndim != self.y_pred.ndim:
raise TypeError('y should have the same shape as self.y_pred', ('y', y.type, 'y_pred', self.y_pred.type))
# check if y is of the correct datatype
if y.dtype.startswith('int'):
# the T.neq operator returns a vector of 0s and 1s, where 1 represents a mistake in prediction
return T.mean(T.neq(self.y_pred, y))
else:
raise NotImplementedError()
이 모형의 가중치를 찾기 위한 SGD 알고리즘은 다음과 같이 구현한다.
In [53]:
import timeit
def sgd_optimization_mnist(learning_rate=0.13, n_epochs=1000, dataset='mnist.pkl.gz', batch_size=600):
"""
Demonstrate stochastic gradient descent optimization of a log-linear model
This is demonstrated on MNIST.
:type learning_rate: float
:param learning_rate: learning rate used (factor for the stochastic gradient)
:type n_epochs: int
:param n_epochs: maximal number of epochs to run the optimizer
:type dataset: string
:param dataset: the path of the MNIST dataset file from http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz
"""
datasets = load_data(dataset)
train_set_x, train_set_y = datasets[0]
valid_set_x, valid_set_y = datasets[1]
test_set_x, test_set_y = datasets[2]
# compute number of minibatches for training, validation and testing
n_train_batches = train_set_x.get_value(borrow=True).shape[0] // batch_size
n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] // batch_size
n_test_batches = test_set_x.get_value(borrow=True).shape[0] // batch_size
######################
# BUILD ACTUAL MODEL #
######################
print('... building the model')
# allocate symbolic variables for the data
index = T.lscalar() # index to a [mini]batch
# generate symbolic variables for input (x and y represent a minibatch)
x = T.matrix('x') # data, presented as rasterized images
y = T.ivector('y') # labels, presented as 1D vector of [int] labels
# construct the logistic regression class
# Each MNIST image has size 28*28
classifier = LogisticRegression(input=x, n_in=28 * 28, n_out=10)
# the cost we minimize during training is the negative log likelihood of the model in symbolic format
cost = classifier.negative_log_likelihood(y)
# compiling a Theano function that computes the mistakes that are made by the model on a minibatch
test_model = theano.function(
inputs=[index],
outputs=classifier.errors(y),
givens={
x: test_set_x[index * batch_size: (index + 1) * batch_size],
y: test_set_y[index * batch_size: (index + 1) * batch_size]
}
)
validate_model = theano.function(
inputs=[index],
outputs=classifier.errors(y),
givens={
x: valid_set_x[index * batch_size: (index + 1) * batch_size],
y: valid_set_y[index * batch_size: (index + 1) * batch_size]
}
)
# compute the gradient of cost with respect to theta = (W,b)
g_W = T.grad(cost=cost, wrt=classifier.W)
g_b = T.grad(cost=cost, wrt=classifier.b)
# start-snippet-3
# specify how to update the parameters of the model as a list of
# (variable, update expression) pairs.
updates = [(classifier.W, classifier.W - learning_rate * g_W),
(classifier.b, classifier.b - learning_rate * g_b)]
# compiling a Theano function `train_model` that returns the cost, but in
# the same time updates the parameter of the model based on the rules defined in `updates`
train_model = theano.function(
inputs=[index],
outputs=cost,
updates=updates,
givens={
x: train_set_x[index * batch_size: (index + 1) * batch_size],
y: train_set_y[index * batch_size: (index + 1) * batch_size]
}
)
# end-snippet-3
###############
# TRAIN MODEL #
###############
print('... training the model')
# early-stopping parameters
patience = 5000 # look as this many examples regardless
patience_increase = 2 # wait this much longer when a new best is found
improvement_threshold = 0.995 # a relative improvement of this much is considered significant #멈추게 하는 값
validation_frequency = min(n_train_batches, patience // 2)
# go through this many minibatche before checking the network
# on the validation set; in this case we check every epoch
best_validation_loss = np.inf
test_score = 0.
start_time = timeit.default_timer()
done_looping = False
epoch = 0
while (epoch < n_epochs) and (not done_looping):
epoch = epoch + 1
for minibatch_index in range(n_train_batches):
minibatch_avg_cost = train_model(minibatch_index)
# iteration number
iter = (epoch - 1) * n_train_batches + minibatch_index
if (iter + 1) % validation_frequency == 0:
# compute zero-one loss on validation set
validation_losses = [validate_model(i) for i in range(n_valid_batches)]
this_validation_loss = np.mean(validation_losses)
print(
'epoch %2i, minibatch %i/%i, validation error %12.4f %%' %
(
epoch,
minibatch_index + 1,
n_train_batches,
this_validation_loss * 100.
)
)
# if we got the best validation score until now
if this_validation_loss < best_validation_loss:
#improve patience if loss improvement is good enough
if this_validation_loss < best_validation_loss * improvement_threshold:
patience = max(patience, iter * patience_increase)
best_validation_loss = this_validation_loss
# test it on the test set
test_losses = [test_model(i) for i in range(n_test_batches)]
test_score = np.mean(test_losses)
# save the best model
with open('best_model.pkl', 'wb') as f:
pickle.dump(classifier, f)
if patience <= iter:
done_looping = True
break
end_time = timeit.default_timer()
print(
(
'Optimization complete with best validation score of %f %%,'
'with test performance %f %%'
)
% (best_validation_loss * 100., test_score * 100.)
)
print('The code run for %d epochs, with %f epochs/sec' % (epoch, 1. * epoch / (end_time - start_time)))
In [54]:
import six.moves.cPickle as pickle
import gzip
import os
import sys
def load_data(dataset):
''' Loads the dataset
:type dataset: string
:param dataset: the path to the dataset (here MNIST)
'''
#############
# LOAD DATA #
#############
# Download the MNIST dataset if it is not present
data_dir, data_file = os.path.split(dataset)
if (not os.path.isfile(dataset)) and data_file == 'mnist.pkl.gz':
from six.moves import urllib
origin = (
'http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz'
)
print('Downloading data from %s' % origin)
urllib.request.urlretrieve(origin, dataset)
print('... loading data')
# Load the dataset
with gzip.open(dataset, 'rb') as f:
try:
train_set, valid_set, test_set = pickle.load(f, encoding='latin1')
except:
train_set, valid_set, test_set = pickle.load(f)
# train_set, valid_set, test_set format: tuple(input, target)
# input is a np.ndarray of 2 dimensions (a matrix)
# where each row corresponds to an example. target is a
# np.ndarray of 1 dimension (vector) that has the same length as
# the number of rows in the input. It should give the target
# to the example with the same index in the input.
def shared_dataset(data_xy, borrow=True):
""" Function that loads the dataset into shared variables
The reason we store our dataset in shared variables is to allow
Theano to copy it into the GPU memory (when code is run on GPU).
Since copying data into the GPU is slow, copying a minibatch everytime
is needed (the default behaviour if the data is not in a shared
variable) would lead to a large decrease in performance.
"""
data_x, data_y = data_xy
shared_x = theano.shared(np.asarray(data_x, dtype=theano.config.floatX), borrow=borrow)
shared_y = theano.shared(np.asarray(data_y, dtype=theano.config.floatX), borrow=borrow)
# When storing data on the GPU it has to be stored as floats
# therefore we will store the labels as ``floatX`` as well
# (``shared_y`` does exactly that). But during our computations
# we need them as ints (we use labels as index, and if they are
# floats it doesn't make sense) therefore instead of returning
# ``shared_y`` we will have to cast it to int. This little hack
# lets ous get around this issue
return shared_x, T.cast(shared_y, 'int32')
test_set_x, test_set_y = shared_dataset(test_set)
valid_set_x, valid_set_y = shared_dataset(valid_set)
train_set_x, train_set_y = shared_dataset(train_set)
rval = [(train_set_x, train_set_y), (valid_set_x, valid_set_y), (test_set_x, test_set_y)]
return rval
In [64]:
%time sgd_optimization_mnist()