In [ ]:
import tensorflow as tf
from twenty_forty_eight_linux import TwentyFortyEight
from collections import deque
import numpy as np
In [ ]:
# Neural network hyperparameters
INPUT_DIM = 16
HIDDEN_LAYER_UNITS = 30
OUTPUT_DIM = 4
# RMSProp hyperparameters
LEARNING_RATE = 0.0003
DECAY_FACTOR = 0.9
# RL hyperparameters
GAMMA = 0.95
In [ ]:
# Game constants
POSSIBLE_ACTIONS = np.arange(1, 5)  # The four move codes (1-4) passed to TwentyFortyEight.move()
In [ ]:
sess = tf.InteractiveSession()
In [ ]:
state = tf.placeholder(tf.float32, shape=[None, INPUT_DIM], name="state_tensor")
direction = tf.placeholder(tf.float32, shape=[None, OUTPUT_DIM], name="direction_label")
disc_reward = tf.placeholder(tf.float32, shape=[None, 1], name="discounted_reward")
In [ ]:
#W1 = tf.get_variable("W1", shape=(INPUT_DIM, HIDDEN_LAYER_UNITS), initializer=tf.contrib.layers.xavier_initializer(False))
#W2 = tf.get_variable("W2", shape=(HIDDEN_LAYER_UNITS, OUTPUT_DIM), initializer=tf.contrib.layers.xavier_initializer(False))
# B1 = tf.get_variable("B1", shape=(HIDDEN_LAYER_UNITS,), initializer=tf.contrib.layers.xavier_initializer(False))
# B2 = tf.get_variable("B2", shape=(OUTPUT_DIM,), initializer=tf.contrib.layers.xavier_initializer(False))
In [ ]:
W1 = tf.Variable(tf.random_normal((INPUT_DIM, HIDDEN_LAYER_UNITS), stddev=0.001), name="W1")
W2 = tf.Variable(tf.random_normal((HIDDEN_LAYER_UNITS, OUTPUT_DIM), stddev=0.001), name="W2")
B1 = tf.Variable(tf.random_normal((1, HIDDEN_LAYER_UNITS), stddev=0.001), name="B1")
B2 = tf.Variable(tf.random_normal((1, OUTPUT_DIM), stddev=0.001), name="B2")
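In [ ]:
# A quick look at the model size (optional): with a single 30-unit hidden layer the
# policy network has only 16*30 + 30 + 30*4 + 4 = 634 trainable parameters.
sum(int(np.prod(v.get_shape().as_list())) for v in tf.trainable_variables())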
In [ ]:
h1 = tf.add(tf.matmul(state, W1), B1)                   # Hidden layer pre-activation
activation_hidden = tf.nn.relu(h1)                      # ReLU non-linearity
output = tf.add(tf.matmul(activation_hidden, W2), B2)   # Logits over the 4 moves
In [ ]:
output_softmax = tf.nn.softmax(output)
In [ ]:
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=output, labels=direction))
loss_function = tf.summary.scalar("loss_func", loss) # Summary op for TensorBoard
In [ ]:
train_opt = tf.train.RMSPropOptimizer(LEARNING_RATE, decay=DECAY_FACTOR)
In [ ]:
# Gradients of the negated loss, i.e. of the mean log-probability of the chosen move
train_grad = train_opt.compute_gradients(-loss, [W1, W2, B1, B2])
train_grad
In [ ]:
# Scale each gradient by the discounted reward of the step it came from
# (each g is a (gradient, variable) pair returned by compute_gradients)
modulated_grads = [tf.multiply(disc_reward, g[0]) for g in train_grad]
modulated_grads
In [ ]:
train_apply_grad = train_opt.apply_gradients([(mod_grad, grads_and_vars[1]) for mod_grad, grads_and_vars in
zip(modulated_grads, train_grad)])
In [ ]:
tf.global_variables_initializer().run()
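In [ ]:
# Optional sanity check: before any training the softmax policy should already
# produce a valid probability distribution over the four moves. The random input
# below is only a stand-in for a real board encoding.
dummy_state = np.random.rand(1, INPUT_DIM).astype(np.float32)
probs = sess.run(output_softmax, feed_dict={state: dummy_state})
print(probs, probs.sum())  # four probabilities that sum to ~1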
In [ ]:
summary = tf.summary.FileWriter("d:\\Coding\\Tensorflow_log\\2048", sess.graph)
merged = tf.summary.merge_all() # Merge all summary operations (In this case we only have loss_func)
In [ ]:
def initialize_game():
    return TwentyFortyEight(4, 4)

def game_state(g):
    # Flatten the 4x4 board into a (1, 16) float vector for the network input
    return np.asarray(g.table_as_array(), dtype=np.float32).reshape(1, 16)

def direction_vector(action):
    # One-hot encode a move code in {1, 2, 3, 4} as a (1, 4) vector
    return np.eye(4, dtype=np.float32)[action - 1].reshape(1, 4)

def discounted_rewards(r):
    # Discounted return for every step: R_t = sum_k GAMMA**k * r_{t+k}
    gamma_vector = (GAMMA ** np.arange(len(r)))[::-1]
    rewards = np.asarray(r, dtype=np.float32)
    discounted = np.zeros_like(rewards, dtype=np.float32)
    for i in range(len(r)):
        discounted[i] = np.sum(rewards[i:] * gamma_vector[i:][::-1])
    return discounted.reshape(len(r), 1)
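In [ ]:
# Small illustration of the helpers above (the reward values are made up):
# direction_vector one-hot encodes a move code, and discounted_rewards turns a
# per-step reward sequence into per-step discounted returns. With GAMMA = 0.95
# the first return below is 1 + 0.95*0 + 0.95**2 * 2 = 2.805.
print(direction_vector(2))                  # [[0. 1. 0. 0.]]
print(discounted_rewards([1.0, 0.0, 2.0]))  # [[2.805] [1.9] [2.0]] (approximately)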
In [ ]:
ep_number = 0
for _ in range(200):
    # Initialize game
    game = initialize_game()
    states_input_deque, actions_deque, rewards_deque = deque(), deque(), deque()
    is_ended = False
    no_of_steps = 0
    while not is_ended:
        # Append current game state
        current_state = game_state(game)
        states_input_deque.append(current_state)
        # Choose action from the network and append it to the actions_deque
        action_distribution = sess.run(output_softmax, feed_dict={state: current_state})
        action = np.random.choice(POSSIBLE_ACTIONS, 1, p=action_distribution.ravel())[0]
        actions_deque.append(action)
        # Make the move in the game
        game.move(action)
        no_of_steps += 1
        # Get next state, reward
        current_state, reward, is_ended = game_state(game), game.reward(), game.is_ended()
        # Append rewards
        rewards_deque.append(reward)
        if no_of_steps % 250 == 0:
            print(action_distribution)
    # Update the network once per episode, one step at a time, with reward-modulated gradients
    for s, a, r in zip(states_input_deque, actions_deque, discounted_rewards(rewards_deque)):
        # r_ = r.reshape(len(r), 1)
        r_ = r.reshape(1, 1)
        sess.run(train_apply_grad, feed_dict={state: s, direction: direction_vector(a), disc_reward: r_})
    ep_number += 1
    if ep_number % 10 == 0:
        print(ep_number)
# sess.run(train_grad, feed_dict={state: np.arange(16, dtype=np.float32).reshape(1, 16),
#                                 direction: np.eye(4, dtype=np.float32)[3].reshape(1, 4)})
In [ ]:
summary.flush()
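In [ ]:
# The graph written by the FileWriter above can be inspected with TensorBoard,
# e.g. from a shell:
#   tensorboard --logdir d:\Coding\Tensorflow_log\2048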