In [3]:
import tensorflow as tf
import numpy as np
import scipy
from corpus import Corpus

# Let TensorFlow allocate GPU memory on demand instead of reserving all of it.
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
In [4]:
# Load the first 1000 entries of the Hungarian web corpus, using 10-character windows.
corp_path = '/home/velkey/corp/webkorpusz.wpl'
corp = Corpus(corpus_path=corp_path, language="Hun", size=1000, encoding_len=10)
In [5]:
# One-hot featurize at character level, then split 80/20 into train and test.
all_features = corp.featurize_data_charlevel_onehot(corp.hun_lower)
train = all_features[0:int(len(all_features) * 0.8)]
test = all_features[int(len(all_features) * 0.8):]
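A quick round-trip sanity check, assuming `defeaturize_data_charlevel_onehot` inverts the featurizer (it is used that way in `show_words` below):

In [ ]:
# Featurize/defeaturize round trip: should print the original (padded) word.
print(corp.defeaturize_data_charlevel_onehot([all_features[0]]))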
In [6]:
# Flatten each 10x36 one-hot matrix into a single vector.
x_train = train.reshape((len(train), np.prod(train.shape[1:])))
x_test = test.reshape((len(test), np.prod(test.shape[1:])))
print(x_train.shape)
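Each sample is a 10×36 one-hot matrix (a 10-character window over a 36-symbol alphabet), so the flattened vectors should be 360-dimensional; a minimal check, assuming those dimensions:

In [ ]:
# 10 character positions x 36 symbols = 360 input features per sample
assert x_train.shape[1] == 10 * 36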
In [7]:
import random
import matplotlib.pyplot as plt
class Experiment:

    def __init__(self, x_train, x_test, y_train, y_test, layer_intervals,
                 encoder_index, optimizer, lossmethod, step_size=0):
        self.layernum = len(layer_intervals)
        self.layer_intervals = layer_intervals
        self.encoder_index = encoder_index
        self.optimizer = optimizer
        self.lossmethod = lossmethod
        self.tried_list = []
        self.train_losses = []
        self.test_losses = []
        self.x_train = x_train
        self.y_train = y_train
        self.train_len = len(x_train)
        self.test_len = len(x_test)
        self.x_test = x_test
        self.y_test = y_test
        self.data_dim = x_train[0].shape[0] * x_train[0].shape[1]
    def gen_model(self, layer_data, type):
        """
        @layer_data: [[size, activation], [size, activation], ...], where the
        last layer should match the input dimension for reconstruction.
        (Not implemented yet.)
        """
        pass

    def run(self):
        """
        Run the experiment loop. (Not implemented yet.)
        """
        pass
    def show_words(self, predict_base, num=30):
        # Assumes trained `encoder` and `decoder` models are available in scope.
        encoded_text = encoder.predict(predict_base)
        decoded_text = decoder.predict(encoded_text)
        for i in range(num):
            x = random.randint(0, len(predict_base) - 1)
            print("original:\t",
                  corp.defeaturize_data_charlevel_onehot([predict_base[x].reshape(10, 36)]),
                  "\tdecoded:\t",
                  corp.defeaturize_data_charlevel_onehot([decoded_text[x].reshape(10, 36)]))
    def plot_words_as_img(self):
        # Assumes trained `encoder` and `decoder` models are available in scope.
        encoded_imgs = encoder.predict(self.x_test)
        decoded_imgs = decoder.predict(encoded_imgs)
        n = 6  # how many words to display
        plt.figure(figsize=(21, 4))
        for i in range(n):
            # display original
            ax = plt.subplot(2, n, i + 1)
            plt.imshow(self.x_test[i].reshape(10, 36))
            plt.gray()
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)
            # display reconstruction
            ax = plt.subplot(2, n, i + 1 + n)
            plt.imshow(decoded_imgs[i].reshape(10, 36))
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)
        plt.show()
In [8]:
def xavier_init(fan_in, fan_out, constant=1):
    """Xavier (Glorot) uniform initializer: samples from
    U(-sqrt(6/(fan_in+fan_out)), +sqrt(6/(fan_in+fan_out)))."""
    low = -constant * np.sqrt(6.0 / (fan_in + fan_out))
    high = constant * np.sqrt(6.0 / (fan_in + fan_out))
    return tf.random_uniform((fan_in, fan_out),
                             minval=low, maxval=high,
                             dtype=tf.float32)
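As a quick check of the initializer (not part of the original pipeline): for the 360-unit layers used below, fan_in + fan_out = 720, so weights are drawn uniformly from roughly ±sqrt(6/720) ≈ ±0.091.

In [ ]:
# Xavier bound for a 360 -> 360 layer
print(np.sqrt(6.0 / (360 + 360)))  # ~0.0913
w_example = tf.Variable(xavier_init(360, 360))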
In [11]:
class Autoencoder_ffnn():

    def __init__(self, featurelen, length, layerlist, encode_index,
                 optimizer=tf.train.AdamOptimizer()):
        """
        @featurelen, @length: dimensions of one sample (flattened to featurelen*length inputs)
        @layerlist: [[size, activation], ...] for every layer after the input
        @encode_index: index of the layer whose output is kept as the encoding
        """
        self.layerlist = layerlist
        self.layernum = len(layerlist)
        self.n_input = featurelen * length
        self.encode_index = encode_index
        self.weights = self._initialize_weights()
        self._create_layers()
        # squared-error reconstruction cost
        self.cost = 0.5 * tf.reduce_sum(tf.pow(tf.subtract(self.reconstruction, self.x), 2.0))
        self.optimizer = optimizer.minimize(self.cost)
        init = tf.global_variables_initializer()
        self.sess = tf.Session(config=config)
        self.sess.run(init)
    def _initialize_weights(self):
        # w1/b1 map the input to the first layer; wi/bi map layer i-1 to layer i.
        all_weights = dict()
        all_weights['w1'] = tf.Variable(xavier_init(self.n_input, self.layerlist[0][0]))
        all_weights['b1'] = tf.Variable(tf.random_normal([self.layerlist[0][0]], dtype=tf.float32))
        for i in range(1, self.layernum):
            all_weights['w' + str(i + 1)] = tf.Variable(xavier_init(self.layerlist[i - 1][0], self.layerlist[i][0]))
            all_weights['b' + str(i + 1)] = tf.Variable(tf.random_normal([self.layerlist[i][0]], dtype=tf.float32))
        return all_weights
    def _create_layers(self):
        """
        Build the feed-forward stack; the output of layer `encode_index`
        is kept as the encoding, the last layer is the reconstruction.
        """
        self.x = tf.placeholder(tf.float32, [None, self.n_input])
        layer = (self.layerlist[0][1])(tf.add(tf.matmul(self.x, self.weights['w1']), self.weights['b1']))
        if self.encode_index == 0:
            self.encoded = layer
        for i in range(1, self.layernum):
            layer = (self.layerlist[i][1])(tf.add(tf.matmul(layer, self.weights['w' + str(i + 1)]),
                                                  self.weights['b' + str(i + 1)]))
            if i == self.encode_index:
                self.encoded = layer
        self.reconstruction = layer
    def partial_fit(self, X):
        """Run one optimization step on a batch and return its cost."""
        cost, opt = self.sess.run((self.cost, self.optimizer), feed_dict={self.x: X})
        return cost

    def calc_total_cost(self, X):
        return self.sess.run(self.cost, feed_dict={self.x: X})

    def encode(self, X):
        return self.sess.run(self.encoded, feed_dict={self.x: X})

    def decode(self, encoded=None):
        if encoded is None:
            # sample a random code with the width of the encoding layer
            code_len = int(self.weights['b' + str(self.encode_index + 1)].get_shape()[0])
            encoded = np.random.normal(size=(1, code_len))
        return self.sess.run(self.reconstruction, feed_dict={self.encoded: encoded})

    def reconstruct(self, X):
        return self.sess.run(self.reconstruction, feed_dict={self.x: X})
    def train(self, X_train, X_test, batch_size, max_epochs, display_step=1):
        for epoch in range(max_epochs):
            avg_cost = 0.
            total_batch = int(len(X_train) / batch_size)
            # Loop over all batches, accumulating the average batch cost
            for i in range(total_batch):
                batch_xs = self.get_random_block_from_data(X_train, batch_size)
                cost = self.partial_fit(batch_xs)
                avg_cost += cost / total_batch
            # Display logs per epoch step
            if epoch % display_step == 0:
                print("Epoch:", '%04d' % (epoch + 1),
                      "train cost =", "{:.9f}".format(avg_cost),
                      "test cost =", "{:.9f}".format(self.calc_total_cost(X_test)))

    def get_random_block_from_data(self, data, batch_size):
        # Random contiguous mini-batch from the data
        start_index = np.random.randint(0, len(data) - batch_size)
        return data[start_index:(start_index + batch_size)]
In [12]:
def ekv(e):
    # identity activation for the linear reconstruction layer
    return e

# two 360-unit layers: a softplus encoder and a linear reconstruction
a = [[360, tf.nn.softplus], [360, ekv]]
autoencoder = Autoencoder_ffnn(10, 36,
                               layerlist=a,
                               encode_index=1,
                               optimizer=tf.train.AdamOptimizer(learning_rate=0.001))
autoencoder.train(x_train, x_test, batch_size=512, max_epochs=10)
print("Total cost: " + str(autoencoder.calc_total_cost(x_test)))