In [ ]:
import tensorflow as tf
from twenty_forty_eight_linux import TwentyFortyEight
from collections import deque
import numpy as np
In [ ]:
# Policy neural network hyperparameters
INPUT_DIM = 16
HIDDEN_LAYER_UNITS = 300
OUTPUT_DIM = 4
# Value function neural network hyperparameters
VF_HIDDEN_LAYER_UNITS = 200
VF_OUTPUT_DIM = 1
# RMSProp hyperparameters (Policy)
LEARNING_RATE = 0.001
DECAY_FACTOR = 0.9
# RMSProp hyperparameters (Value function)
VF_LEARNING_RATE = 0.001
VF_DECAY_FACTOR = 0.9
# RL hyperparameters
DISCOUNT_FACTOR = 0.95
# Loss hyperparameters
ENTROPY_REGULARIZATION_FACTOR = 0.01
In [ ]:
# Game constants
POSSIBLE_ACTIONS = np.arange(1, 5)
In [ ]:
def stack_arrays(*args):
    return np.vstack(args)
In [ ]:
sess = tf.InteractiveSession()
In [ ]:
# Xavier initialization is hardcoded for now.
def new_layer(input_tf_tensor, hidden_unit_number, scope_name, activation=tf.nn.relu, bias=True):
    input_dimension = input_tf_tensor.get_shape().as_list()[1]
    # Weight
    with tf.variable_scope(scope_name):
        W = tf.get_variable("W", shape=(input_dimension, hidden_unit_number),
                            initializer=tf.contrib.layers.xavier_initializer(False))
        # Matrix multiplication
        h = tf.matmul(input_tf_tensor, W)
        if bias:
            B = tf.get_variable("B", shape=(1, hidden_unit_number),
                                initializer=tf.contrib.layers.xavier_initializer(False))
            hb = h + B
        output = activation(hb) if bias else activation(h)
        return output

def regression_activation(variable):
    return tf.reduce_sum(variable, 1)
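In [ ]:
# Illustrative sketch (not part of the model): build a single layer with new_layer in a
# throwaway graph to show what it produces. The scope name "Example_Layer", the 8 hidden
# units and the all-zeros dummy batch are assumptions for demonstration only.
example_graph = tf.Graph()
with example_graph.as_default():
    example_input = tf.placeholder(tf.float32, shape=[None, INPUT_DIM])
    example_output = new_layer(example_input, 8, "Example_Layer")
    with tf.Session(graph=example_graph) as example_sess:
        example_sess.run(tf.global_variables_initializer())
        dummy_batch = np.zeros((2, INPUT_DIM), dtype=np.float32)
        # One ReLU activation vector per input row -> expected shape (2, 8)
        print(example_sess.run(example_output, feed_dict={example_input: dummy_batch}).shape)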
In [ ]:
# Policy network
state = tf.placeholder(tf.float32, shape=[None, INPUT_DIM], name="state_tensor")
direction = tf.placeholder(tf.float32, shape=[None, OUTPUT_DIM], name="direction_label")
advantage_value = tf.placeholder(tf.float32, shape=[], name="advantage_value")
# Value function network (+state)
new_state_val_with_prev_params = tf.placeholder(tf.float32, shape=(), name="new_state_val_with_prev_params")
In [ ]:
policy_h1 = new_layer(state, HIDDEN_LAYER_UNITS, "Policy_Hidden_1")
policy_h2 = new_layer(policy_h1, HIDDEN_LAYER_UNITS, "Policy_Hidden_2")
policy_output = new_layer(policy_h2, OUTPUT_DIM, "Policy_Output", tf.nn.softmax)
In [ ]:
vf_h1 = new_layer(state, VF_HIDDEN_LAYER_UNITS, "VF_Hidden_1")
# Regression output (new_layer should be called with bias=False)
vf_output = new_layer(vf_h1, VF_OUTPUT_DIM, "VF_Output", regression_activation)
In [ ]:
prev_vf_h1 = new_layer(state, VF_HIDDEN_LAYER_UNITS, "PREV_VF_Hidden_1")
# Regression output (new_layer should be called with bias=False)
prev_vf_output = new_layer(prev_vf_h1, VF_OUTPUT_DIM, "PREV_VF_Output", regression_activation)
In [ ]:
assignment_ops = []
for vf_scope, prev_vf_scope in zip(["VF_Hidden_1", "VF_Output"], ["PREV_VF_Hidden_1", "PREV_VF_Output"]):
    with tf.variable_scope(vf_scope, reuse=True):
        weight = tf.get_variable("W")
        bias = tf.get_variable("B")
    # Assign OPs
    with tf.variable_scope(prev_vf_scope, reuse=True):
        weight_assign_op = tf.get_variable("W").assign(weight)
        bias_assign_op = tf.get_variable("B").assign(bias)
    assignment_ops += [weight_assign_op, bias_assign_op]
In [ ]:
policy_loss = - tf.reduce_sum(tf.log(tf.reduce_sum(policy_output * direction)) * advantage_value)
policy_loss_function = tf.summary.scalar("loss_func", policy_loss) # Summary op for TensorBoard
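In [ ]:
# Hand check of the policy loss above (a sketch; the numbers are made up). For a single
# state with action probabilities [0.1, 0.2, 0.3, 0.4], chosen action 2 (one-hot
# [0, 1, 0, 0]) and advantage 1.5, the loss is -log(0.2) * 1.5 ~ 2.414.
example_probs = np.array([[0.1, 0.2, 0.3, 0.4]], dtype=np.float32)
example_direction = np.array([[0.0, 1.0, 0.0, 0.0]], dtype=np.float32)
print(-np.log(np.sum(example_probs * example_direction)) * 1.5)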
In [ ]:
entropy = - tf.reduce_sum(policy_output * tf.log(policy_output))
In [ ]:
vf_error = tf.subtract(new_state_val_with_prev_params, vf_output)
In [ ]:
vf_loss = 0.5 * tf.square(vf_error)
In [ ]:
total_loss = policy_loss - entropy * ENTROPY_REGULARIZATION_FACTOR
In [ ]:
train_opt = tf.train.RMSPropOptimizer(LEARNING_RATE, decay=DECAY_FACTOR)
vf_train_opt = tf.train.RMSPropOptimizer(VF_LEARNING_RATE, decay=VF_DECAY_FACTOR)
In [ ]:
# Policy (the loss is already negated, so minimizing it performs gradient ASCENT on the objective)
train_apply_grad = train_opt.minimize(total_loss)
# Value function Neural network
vf_apply_grad = vf_train_opt.minimize(vf_loss)
In [ ]:
tf.global_variables_initializer().run()
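In [ ]:
# Optional sanity check (a sketch, not part of the original flow): after running
# assignment_ops, the PREV_VF_* variables should hold the same values as their
# VF_* counterparts.
with tf.variable_scope("VF_Hidden_1", reuse=True):
    vf_w_check = tf.get_variable("W")
with tf.variable_scope("PREV_VF_Hidden_1", reuse=True):
    prev_vf_w_check = tf.get_variable("W")
sess.run(assignment_ops)
print(np.allclose(*sess.run([vf_w_check, prev_vf_w_check])))  # expected: True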
In [ ]:
summary = tf.summary.FileWriter("c:\\Work\\Coding\\Tensorflow_log\\2048", sess.graph)
merged = tf.summary.merge_all() # Merge all summary operations (In this case we only have loss_func)
In [ ]:
def initialize_game():
    return TwentyFortyEight(4, 4)

def game_state(g):
    return np.asarray(g.table_as_array(), dtype=np.float32).reshape(1, 16)

def direction_vector(action):
    return np.eye(4, dtype=np.float32)[action - 1].reshape(1, 4)

def discounted_rewards(r):
    gamma_vector = (DISCOUNT_FACTOR ** np.arange(len(r)))[::-1]
    rewards = np.asarray(r, dtype=np.float32)
    discounted = np.zeros_like(rewards, dtype=np.float32)
    for i in range(len(r)):
        discounted[i] = np.sum(rewards[i:] * gamma_vector[i:][::-1])
    return discounted.reshape(len(r), 1)
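In [ ]:
# Quick check of the helpers (illustrative only; the reward sequence [1, 0, 2] is made up).
# With DISCOUNT_FACTOR = 0.95 the discounted returns are
# [1 + 0.95 * 0 + 0.95**2 * 2, 0 + 0.95 * 2, 2] = [2.805, 1.9, 2.0].
print(direction_vector(3))                    # one-hot row vector for action 3
print(discounted_rewards([1.0, 0.0, 2.0]))    # ~[[2.805], [1.9], [2.0]]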
In [ ]:
ep_number = 0
for _ in range(1):
    # Initialize game
    game = initialize_game()
    states_input_deque, actions_deque, rewards_deque = deque(), deque(), deque()
    is_ended = False
    no_of_steps = 0
    current_state = game_state(game)
    while not is_ended:
        # for step in range(10):
        # Append current game state
        states_input_deque.append(current_state)
        # Choose action from the network and append it to the actions_deque
        action_distribution = sess.run(policy_output, feed_dict={state: current_state})
        action = np.random.choice(POSSIBLE_ACTIONS, 1, p=action_distribution.ravel())[0]
        actions_deque.append(action)
        # Make the move in the game
        game.move(action)
        no_of_steps += 1
        # Get next state, reward
        current_state, rew, is_ended = game_state(game), game.reward(), game.is_ended()
        # print("Reward: ", rew)
        # Append rewards
        rewards_deque.append(rew)
        # Previous and current state values with current network parameters
        p_state_val, c_state_val = sess.run(vf_output, feed_dict={state: stack_arrays(states_input_deque[-1], current_state)})
        # Advantage value calc with 1-step look-ahead
        adv_val = np.asscalar(rew + DISCOUNT_FACTOR * c_state_val - p_state_val)
        # print("Advantage value: ", adv_val)
        # print(sess.run([policy_output, entropy], feed_dict={state: states_input_deque[-1]}))
        # Policy network parameter update
        sess.run(train_apply_grad, feed_dict={state: states_input_deque[-1], direction: direction_vector(action), advantage_value: adv_val})
        # Value function network parameter update (if we are not stuck in a state)
        if not np.all(current_state == states_input_deque[-1]):
            # Get new state value with old parameters
            n_v_w_o_p = sess.run(prev_vf_output, feed_dict={state: current_state})
            # Copy current parameters into the PREV_VF_* scopes
            sess.run(assignment_ops)
            # Calculate current Q (the 1-step TD target)
            curr_q = np.asscalar(rew + DISCOUNT_FACTOR * n_v_w_o_p)
            # Update value function parameters
            sess.run(vf_apply_grad, feed_dict={new_state_val_with_prev_params: curr_q, state: states_input_deque[-1]})
        # Checks
        if no_of_steps % 500 == 0:
            print("Step: " + str(no_of_steps))
            print("State: " + str(states_input_deque[-1]))
            print("Action distribution: " + str(action_distribution))
            print("Reward: " + str(rew))
            print("Previous state value: " + str(p_state_val))
            print("Advantage value: " + str(adv_val))
    print("--------------Episode over!----------------")
In [ ]:
summary.flush()