In [ ]:
import tensorflow as tf
from twenty_forty_eight_linux import TwentyFortyEight
from collections import deque
import numpy as np
In [ ]:
# Policy neural network hyperparameters
INPUT_DIM = 16
HIDDEN_LAYER_UNITS = 30
OUTPUT_DIM = 4
# Value function neural network hyperparameters
VF_HIDDEN_LAYER_UNITS = 20
VF_OUTPUT_DIM = 1
# RMSProp hyperparameters (Policy)
LEARNING_RATE = 0.0003
DECAY_FACTOR = 0.9
# RMSProp hyperparameters (Value function)
VF_LEARNING_RATE = 0.001
VF_DECAY_FACTOR = 0.9
# RL hyperparameters
GAMMA = 0.95
In [ ]:
# Game constants: the four move directions, encoded as integers 1..4 for TwentyFortyEight.move()
POSSIBLE_ACTIONS = np.arange(1, 5)
In [ ]:
# InteractiveSession registers itself as the default session, so .eval() and .run() below work without passing sess explicitly
sess = tf.InteractiveSession()
In [ ]:
# Policy network
state = tf.placeholder(tf.float32, shape=[None, INPUT_DIM], name="state_tensor")
direction = tf.placeholder(tf.float32, shape=[None, OUTPUT_DIM], name="direction_label")
advantage_value = tf.placeholder(tf.float32, shape=[None, 1], name="advantage_value")
# Value function network placeholders (the value net reuses the `state` placeholder above)
reward = tf.placeholder(tf.float32, shape=[None, 1], name="reward")
prev_state_val = tf.placeholder(tf.float32, shape=(), name="previous_state_value")
In [ ]:
# Policy weights and biases (xavier_initializer(False) means uniform=False, i.e. normally-distributed Xavier init)
W1 = tf.get_variable("W1", shape=(INPUT_DIM, HIDDEN_LAYER_UNITS), initializer=tf.contrib.layers.xavier_initializer(False))
W2 = tf.get_variable("W2", shape=(HIDDEN_LAYER_UNITS, OUTPUT_DIM), initializer=tf.contrib.layers.xavier_initializer(False))
B1 = tf.get_variable("B1", shape=(1, HIDDEN_LAYER_UNITS), initializer=tf.contrib.layers.xavier_initializer(False))
B2 = tf.get_variable("B2", shape=(1, OUTPUT_DIM), initializer=tf.contrib.layers.xavier_initializer(False))
# Value function weights and biases
VW1 = tf.get_variable("VW1", shape=(INPUT_DIM, VF_HIDDEN_LAYER_UNITS), initializer=tf.contrib.layers.xavier_initializer(False))
VW2 = tf.get_variable("VW2", shape=(VF_HIDDEN_LAYER_UNITS, VF_OUTPUT_DIM), initializer=tf.contrib.layers.xavier_initializer(False))
VB1 = tf.get_variable("VB1", shape=(1, VF_HIDDEN_LAYER_UNITS), initializer=tf.contrib.layers.xavier_initializer(False))
VB2 = tf.get_variable("VB2", shape=(1, VF_OUTPUT_DIM), initializer=tf.contrib.layers.xavier_initializer(False))
In [ ]:
# Policy network forward pass: one ReLU hidden layer, linear output logits
h1 = tf.add(tf.matmul(state, W1), B1)
activation_hidden = tf.nn.relu(h1)
output = tf.add(tf.matmul(activation_hidden, W2), B2)
In [ ]:
output_softmax = tf.nn.softmax(output)  # action probabilities pi(a|s)
In [ ]:
# Value function network forward pass (shares the state placeholder)
vf_h1 = tf.add(tf.matmul(state, VW1), VB1)
vf_activation_hidden = tf.nn.relu(vf_h1)
vf_output = tf.add(tf.matmul(vf_activation_hidden, VW2), VB2)
vf_output_unit = tf.reduce_sum(vf_output)  # collapse the (1, 1) output to a plain scalar state value
In [ ]:
# Policy-gradient surrogate loss: -log(pi(a|s)) * advantage. The log must be taken of the
# softmax probabilities (output_softmax), not of the raw logits, or tf.log can produce NaNs.
loss = -tf.reduce_sum(tf.log(tf.reduce_sum(output_softmax * direction)) * advantage_value)
loss_function = tf.summary.scalar("loss_func", loss)  # Summary op for TensorBoard
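For intuition, a small NumPy sketch of the same surrogate loss with made-up numbers (not part of the TensorFlow graph): if the chosen direction has probability 0.4 and the advantage estimate is +2.0, the per-step loss is -log(0.4) * 2.0 ≈ 1.83.
In [ ]:
# NumPy sketch of the surrogate loss above, using made-up values
probs = np.array([[0.1, 0.4, 0.3, 0.2]], dtype=np.float32)    # pretend softmax output
one_hot = np.array([[0.0, 1.0, 0.0, 0.0]], dtype=np.float32)  # chosen direction
adv = np.array([[2.0]], dtype=np.float32)                     # pretend advantage estimate
print(-np.sum(np.log(np.sum(probs * one_hot, axis=1, keepdims=True)) * adv))  # approx. 1.83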
In [ ]:
vf_error = tf.subtract(reward + GAMMA * vf_output_unit, prev_state_val)  # TD error: r + GAMMA * V(next state) - V(previous state)
In [ ]:
vf_loss = 0.5 * tf.square(vf_error)
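A worked example with the notebook's GAMMA = 0.95 and made-up values: if a move yields reward 4, the previous state was valued at 12 and the next state at 10, the TD error is 4 + 0.95 * 10 - 12 = 1.5 and the critic loss is 0.5 * 1.5^2 = 1.125. Note that prev_state_val is fed as a constant, so the gradient of vf_loss flows through the value of whatever is fed into `state` (the next state in the loop below) rather than through V(previous state), which differs from the textbook TD(0) semi-gradient update.
In [ ]:
# Worked TD-error example with made-up values (not part of the graph)
r_example, v_next, v_prev = 4.0, 10.0, 12.0
delta = r_example + GAMMA * v_next - v_prev  # 1.5
print(delta, 0.5 * delta ** 2)               # 1.5 1.125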
In [ ]:
train_opt = tf.train.RMSPropOptimizer(LEARNING_RATE, decay=DECAY_FACTOR)
vf_train_opt = tf.train.RMSPropOptimizer(VF_LEARNING_RATE, decay=VF_DECAY_FACTOR)
In [ ]:
# Policy network: `loss` is already the negated objective, so minimizing it is gradient ASCENT on expected return
train_apply_grad = train_opt.minimize(loss)
# Value function network
vf_apply_grad = vf_train_opt.minimize(vf_loss)
In [ ]:
tf.global_variables_initializer().run()
In [ ]:
summary = tf.summary.FileWriter("c:\\Work\\Coding\\Tensorflow_log\\2048", sess.graph)
merged = tf.summary.merge_all() # Merge all summary operations (In this case we only have loss_func)
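The merged summary op is created here but never evaluated in the training loop below, so the loss_func scalar would not actually show up in TensorBoard. A minimal sketch of how it could be recorded, using dummy feed values (a real call would use the tensors of an actual training step and the current step count):
In [ ]:
# Sketch only: write the merged summary once, with dummy feed values
dummy_state = np.zeros((1, INPUT_DIM), dtype=np.float32)
dummy_direction = np.array([[1.0, 0.0, 0.0, 0.0]], dtype=np.float32)
dummy_advantage = np.array([[0.0]], dtype=np.float32)
summ = sess.run(merged, feed_dict={state: dummy_state, direction: dummy_direction, advantage_value: dummy_advantage})
summary.add_summary(summ, 0)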
In [ ]:
def initialize_game():
    return TwentyFortyEight(4, 4)

def game_state(g):
    # Flatten the 4x4 board into a (1, 16) float32 row vector for the networks
    return np.asarray(g.table_as_array(), dtype=np.float32).reshape(1, 16)

def direction_vector(action):
    # One-hot encode an action in {1, 2, 3, 4} as a (1, 4) vector
    return np.eye(4, dtype=np.float32)[action - 1].reshape(1, 4)

def discounted_rewards(r):
    # Discounted return for each step: G_i = sum_k GAMMA**k * r_(i+k)
    gamma_vector = (GAMMA ** np.arange(len(r)))[::-1]
    rewards = np.asarray(r, dtype=np.float32)
    discounted = np.zeros_like(rewards)
    for i in range(len(r)):
        discounted[i] = np.sum(rewards[i:] * gamma_vector[i:][::-1])
    return discounted.reshape(len(r), 1)
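A quick sanity check of the helpers above with made-up inputs. Note that discounted_rewards is defined here but never used by the TD-style updates in the training loop below.
In [ ]:
# Sanity check of the helpers with made-up inputs (not part of training)
print(direction_vector(3))                  # -> [[0. 0. 1. 0.]]
print(discounted_rewards([1.0, 0.0, 2.0]))  # -> approx. [[2.805], [1.9], [2.0]] with GAMMA = 0.95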
In [ ]:
ep_number = 0
for _ in range(20):
    # Initialize game
    game = initialize_game()
    states_input_deque, actions_deque, rewards_deque = deque(), deque(), deque()
    is_ended = False
    no_of_steps = 0
    current_state = game_state(game)
    while not is_ended:
        # Append current game state
        states_input_deque.append(current_state)
        # Choose an action from the policy network and append it to actions_deque
        action_distribution = sess.run(output_softmax, feed_dict={state: current_state})
        action = np.random.choice(POSSIBLE_ACTIONS, 1, p=action_distribution.ravel())[0]
        actions_deque.append(action)
        # Make the move in the game
        game.move(action)
        no_of_steps += 1
        # Get next state, reward and terminal flag
        current_state, rew, is_ended = game_state(game), game.reward(), game.is_ended()
        np_rew = np.asarray([[rew]], dtype=np.float32)
        # Append reward
        rewards_deque.append(rew)
        # Value of the previous state (the state before the move)
        p_state_val = sess.run(vf_output_unit, feed_dict={state: states_input_deque[-1]})
        # "Advantage value": TD error r + GAMMA * V(current state) - V(previous state)
        adv_val = sess.run(vf_error, feed_dict={reward: np_rew, prev_state_val: p_state_val, state: current_state})
        # Policy network parameter update
        sess.run(train_apply_grad, feed_dict={state: states_input_deque[-1], direction: direction_vector(action), advantage_value: adv_val})
        # Value function network parameter update (only if the move actually changed the board)
        if not np.all(current_state == states_input_deque[-1]):
            sess.run(vf_apply_grad, feed_dict={reward: np_rew, prev_state_val: p_state_val, state: current_state})
        # Periodic progress printout
        if no_of_steps % 500 == 0:
            print("Step: " + str(no_of_steps))
            print("State: " + str(states_input_deque[-1]))
            print("Action distribution: " + str(action_distribution))
            print("Reward: " + str(rew))
            print("Previous state value: " + str(p_state_val))
            print("Advantage value: " + str(adv_val))
    ep_number += 1
    print("--------------Episode over!----------------")
In [ ]:
# Inspect the value-network hidden activations for the last stored state
sess.run(vf_activation_hidden, feed_dict={state: states_input_deque[-1]})
In [ ]:
VW2.eval()  # inspect the learned value-network output weights
In [ ]:
summary.flush()  # make sure pending TensorBoard events are written to disk