In [55]:
import os
import io
import numpy as np
import random
import re
In [56]:
def unique(arr, dic=None):
    if dic is None:
        dic = {}
    for el in arr:
        if isinstance(el, list):
            unique(el, dic)
        else:
            if el not in dic:
                dic[el] = 1
            else:
                dic[el] += 1
    return dic
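A quick sanity check of `unique` (the input list here is made up for illustration): counts are accumulated across nested lists, which is why it can later be called directly on the list-of-lists training data.

# hypothetical input: element counts are accumulated over nested lists
print unique([[0, 1], [2, 1], [2, 3]])  # expected counts: 0->1, 1->2, 2->2, 3->1 (dict ordering may vary)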
Classification will be performed according to the following formula: $$p(c\mid d,\lambda)=\frac {\exp\sum_i^{n \times k}{\lambda_i}f_i\left(c,d\right )} {\sum_{\tilde{c}\in C}{\exp\sum_i^{n \times k}{\lambda_i}f_i\left(\tilde{c},d\right )}}$$
In [57]:
def predict(x, weights):
    probas = np.dot(x, weights)
    # then normalize the outputs with a softmax
    probas = np.exp(probas, dtype=np.float32)
    return probas / probas.sum()
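A minimal sketch of calling `predict` with hypothetical weights (the matrix below is invented for illustration): the result is a softmax-normalized probability vector over the classes.

# hypothetical 3-feature, 2-class weight matrix
w_demo = np.array([[1.0, 0.0], [0.5, 0.5], [0.0, 1.0]], dtype=np.float32)
x_demo = np.array([1, 0, 1], dtype=np.int8)  # features 0 and 2 are active
print predict(x_demo, w_demo)  # two probabilities summing to 1 (here both 0.5)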
We will solve the problem by maximizing the log-likelihood $$\log p(C|D,\lambda) =\sum_{(c,d)\in(C,D)}\log p(c|d,\lambda) =\sum_{(c,d)\in(C,D)}\log\frac {\exp\sum_i^{n \times k}{\lambda_i}f_i\left(c,d\right )} {\sum_{\tilde{c}\in C}{\exp\sum_i^{n \times k}{\lambda_i}f_i\left(\tilde{c},d\right )}}$$
Accordingly, the gradient is given by the partial derivatives
$$\frac{\partial\log p(C|D,\lambda)}{\partial\lambda_i}= \sum_{(c,d)\in(C,D)}{f_i(c,d)}- \sum_{d\in D}{\sum_{c\in C}{p(c|d,\lambda)f_i(c,d)}}$$
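To connect this formula with the SGD update inside `fit` below, here is a hedged sketch of the per-example gradient (the helper `gradient_one_example` is hypothetical, not part of the original notebook): observed feature counts for the true class minus the counts expected under the current model. `fit` applies it with the opposite sign because it subtracts from the weights.

def gradient_one_example(x_vec, y_vec, probas, features):
    # expected feature counts under the model: p(c|d, lambda) * f_i(c, d)
    expected = np.outer(x_vec, probas) * features
    # observed feature counts for the true class: f_i(c, d)
    observed = np.outer(x_vec, y_vec)
    # ascent direction of the log-likelihood
    return observed - expected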
In [58]:
def fit(X, y, batch_size=64, nb_epoch=10, alpha=0.85, max_iter=100, C=0.1, random_state=None, verbose=1):
    """
    :param X: list of training samples, each a list of active feature indices
    :param y: list of class labels
    :param batch_size: number of samples drawn per SGD iteration
    :param nb_epoch: number of training epochs
    :param alpha: base of the learning-rate decay
    :param max_iter: number of SGD iterations per epoch
    :param C: L2 regularization coefficient
    :param random_state: seed for the random sampler
    :param verbose: print progress if nonzero
    """
    n_samples = len(X)
    feature_dic = unique(X)
    classes_dic = unique(y)
    n_features = len(feature_dic)
    n_classes = len(classes_dic)
    if batch_size is None:
        batch_size = n_samples
    if batch_size > n_samples:
        batch_size = n_samples
    if random_state is not None:
        random.seed(random_state)
    # weight matrix of the indicator features
    weights = np.zeros((n_features, n_classes), dtype=np.float32)
    features = np.zeros((n_features, n_classes), dtype=np.int8)
    # weight initialization
    for i in range(len(X)):
        for xi in X[i]:
            features[xi, y[i]] = 1
            weights[xi, y[i]] += 1
    idx = weights > 0
    weights[idx] = np.log(weights[idx])
    all_iter = 0
    u = 0.0
    for epoch in range(nb_epoch):
        if verbose:
            print 'Start epoch #%d\t' % epoch,
        # SGD, capped at max_iter iterations
        loss = 0.
        prev_logl = 0.
        for iter_num in range(max_iter):
            if verbose and (iter_num % (max_iter / 20) == 0):
                print '.',
            logl = 0.
            ncorrect = 0
            r = range(n_samples)
            r = random.sample(r, batch_size)
            iter_sample = 0
            for i in r:
                # feature (context) vector
                x_vec = np.zeros(n_features, dtype=np.int8)
                for j in X[i]:
                    x_vec[j] += 1
                iter_sample += 1
                y_vec = np.zeros(weights.shape[1], dtype=np.int8)
                y_vec[y[i]] = 1
                all_iter += 1.0
                eta = alpha ** (all_iter / n_samples)
                # predict class probabilities
                probas = predict(x_vec, weights)
                # check whether the prediction is correct; only needed for verbose output
                if np.argmax(probas) == y[i]:
                    ncorrect += 1
                # accumulate the log-likelihood
                logl += np.log(probas[y[i]])
                # update the weights
                # grad = np.outer(x_vec, probas)*features - np.outer(x_vec, y_vec)
                idx = x_vec != 0
                me = np.outer(x_vec[idx], probas) * features[idx]
                ee = np.outer(x_vec[idx], y_vec)
                weights[idx] -= (me - ee) * eta + weights[idx] * eta * C
            if (iter_num > 0):
                loss += (logl - prev_logl)
            prev_logl = logl
        if verbose:
            print '\tLoss: %.8f' % (loss / max_iter)
    return weights
In [59]:
# a small test
X = [[0, 1],
     [2, 1],
     [2, 3],
     [2, 1],
     [0, 1],
     [2, 1, 4],
     [2, 3, 4],
     [2, 1, 5],
     [0, 3, 5],
     [0, 1, 5]]
y = [0, 0, 1, 1, 0, 1, 1, 0, 0, 0]
weights = fit(X, y, random_state=241, C=0.00001, nb_epoch=1)
# print weights
# print patterns
x = np.zeros(6)
x[0] = 1
x[3] = 1
pred = predict(x, weights)
In [60]:
digits_regex = re.compile(ur'\d')
punc_regex = re.compile(ur'[%()\-/:;<>«»,]')
delim_regex = re.compile(ur'([.])\s+')
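A hedged example of what this normalization does to one line (the sentence is invented for illustration): digits collapse to 0, the listed punctuation is stripped, and a period followed by whitespace is split off as its own token.

sample = u'В 2015 году, например: тест. Ещё предложение.'
s = digits_regex.sub(u'0', sample.lower())
s = punc_regex.sub(u'', s)
s = delim_regex.sub(r' \1 ', s)
print s  # в 0000 году например тест . ещё предложение.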
In [61]:
def read_and_tokenize(foldername):
    '''
    reads the texts from the files in the given folder;
    a fairly simple tokenization is applied here
    '''
    word_counts = {}
    tokenized_text = []
    for path, subdirs, files in os.walk(foldername):
        for name in files:
            filename = os.path.join(path, name)
            with io.open(filename, 'r', encoding='utf-8') as data_file:
                for line in data_file:
                    if len(line) < 50:
                        continue
                    text = digits_regex.sub(u'0', line.lower())
                    text = punc_regex.sub(u'', text)
                    text = delim_regex.sub(r' \1 ', text)
                    for word in text.split():
                        if not word:
                            continue
                        if word not in word_counts:
                            word_counts[word] = 1
                        else:
                            word_counts[word] += 1
                        tokenized_text.append(word)
    # keep only words seen more than twice and build the index mappings
    word2index = {}
    index2word = []
    i = 0
    filtered_text = []
    for word in tokenized_text:
        if word_counts[word] > 2:
            if word not in word2index:
                word2index[word] = i
                index2word.append(word)
                i += 1
            filtered_text.append(word)
    return filtered_text, word2index, index2word
In [62]:
def generate_train(tokenized_text, word2index, context_len=4):
    '''
    generates the training data: for each target word, the indices of
    the previous context_len words are used as features
    '''
    X = []
    y = []
    for i, y_word in enumerate(tokenized_text):
        x = []
        for j in range(i - context_len, i):
            if j >= 0:
                x_word = tokenized_text[j]
                x.append(word2index[x_word])
        if len(x) > 0:
            X.append(x)
            y.append(word2index[y_word])
    return X, y
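A small illustration of the context windows this produces (the toy vocabulary and token list are invented): for every target word, the indices of up to `context_len` preceding words become the features.

toy_w2i = {'a': 0, 'b': 1, 'c': 2}
toy_text = ['a', 'b', 'c', 'a']
X_toy, y_toy = generate_train(toy_text, toy_w2i, context_len=2)
print X_toy  # [[0], [0, 1], [1, 2]]
print y_toy  # [1, 2, 0]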
In [63]:
tokenized_text, word2index, index2word = read_and_tokenize('data')
In [64]:
unique_words = len(index2word)
print 'all words:', len(tokenized_text)
print 'all unique words:', unique_words
In [65]:
context_len = 4
X, y = generate_train(tokenized_text, word2index, context_len=context_len)
In [66]:
weights = fit(X, y, random_state=241, verbose=1, batch_size=32, nb_epoch=3, C=0.0001)
In [67]:
# a small generation test: start from one word and greedily extend it
test = [word2index[word] for word in [u'путин']]
last_index = index = test[-1]
print 'GENERATED TEXT:'
for i in range(20):
    x = np.zeros(len(index2word))
    x[test] = 1
    pred = predict(x, weights)
    # take the most probable word that is not already in the current context
    indices = pred.argsort()[::-1][:20]
    for index in indices:
        if index in test:
            continue
        else:
            break
    last_index = int(index)
    print index2word[index],
    test.append(index)
    # keep at most context_len words in the context window
    if len(test) > context_len:
        del test[0]