In [ ]:
import tensorflow as tf
from twenty_forty_eight_linux import TwentyFortyEight
from collections import deque
import numpy as np

In [ ]:
# Neural network hyperparameters
INPUT_DIM = 16
HIDDEN_LAYER_UNITS = 30
OUTPUT_DIM = 4
# RMSProp hyperparameters
LEARNING_RATE = 0.0003
DECAY_FACTOR = 0.9
# RL hyperparameters
GAMMA = 0.95

In [ ]:
# Game constants
POSSIBLE_ACTIONS = np.arange(1, 5)  # the four move directions, encoded as 1..4

Session


In [ ]:
sess = tf.InteractiveSession()

Graph building

Input & direction placeholders


In [ ]:
state = tf.placeholder(tf.float32, shape=[None, INPUT_DIM], name="state_tensor")
direction = tf.placeholder(tf.float32, shape=[None, OUTPUT_DIM], name="direction_label")
disc_reward = tf.placeholder(tf.float32, shape=[None, 1], name="discounted_reward")

Weights and biases (Xavier)


In [ ]:
# W1 = tf.get_variable("W1", shape=(INPUT_DIM, HIDDEN_LAYER_UNITS), initializer=tf.contrib.layers.xavier_initializer(uniform=False))
# W2 = tf.get_variable("W2", shape=(HIDDEN_LAYER_UNITS, OUTPUT_DIM), initializer=tf.contrib.layers.xavier_initializer(uniform=False))
# B1 = tf.get_variable("B1", shape=(HIDDEN_LAYER_UNITS,), initializer=tf.contrib.layers.xavier_initializer(uniform=False))
# B2 = tf.get_variable("B2", shape=(OUTPUT_DIM,), initializer=tf.contrib.layers.xavier_initializer(uniform=False))

Weights and biases (Near zero random)


In [ ]:
W1 = tf.Variable(tf.random_normal((INPUT_DIM, HIDDEN_LAYER_UNITS), stddev=0.001), name="W1")
W2 = tf.Variable(tf.random_normal((HIDDEN_LAYER_UNITS, OUTPUT_DIM), stddev=0.001), name="W2")
B1 = tf.Variable(tf.random_normal((1, HIDDEN_LAYER_UNITS), stddev=0.001), name="B1")
B2 = tf.Variable(tf.random_normal((1, OUTPUT_DIM), stddev=0.001), name="B2")

Neural network operations


In [ ]:
h1 = tf.add(tf.matmul(state, W1), B1)                   # hidden pre-activation
activation_hidden = tf.nn.relu(h1)                      # ReLU hidden layer
output = tf.add(tf.matmul(activation_hidden, W2), B2)   # logits over the 4 moves

In [ ]:
output_softmax = tf.nn.softmax(output)
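
A quick sanity check of the wiring (assuming the cells above have been run): the static shapes should run from the flattened 4x4 board of 16 values, through the 30-unit hidden layer, to 4 move logits and probabilities.

In [ ]:
# Expected: (?, 16) (?, 30) (?, 4) (?, 4)
print(state.get_shape(), activation_hidden.get_shape(), output.get_shape(), output_softmax.get_shape())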

Loss calculation (Cross-entropy)


In [ ]:
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=output, labels=direction))
loss_function = tf.summary.scalar("loss_func", loss) # Summary op for TensorBoard

RMSPropOptimizer


In [ ]:
train_opt = tf.train.RMSPropOptimizer(LEARNING_RATE, decay=DECAY_FACTOR)

Gradient calculation


In [ ]:
# Gradients of the cross-entropy loss w.r.t. the network parameters; after being
# scaled by the discounted reward below, descending on them raises the
# log-probability of moves that led to high returns (REINFORCE).
train_grad = train_opt.compute_gradients(loss, [W1, W2, B1, B2])
train_grad

Gradient modulation


In [ ]:
# Scale every gradient by the discounted reward; disc_reward is fed as a (1, 1)
# value per step, so it broadcasts across each gradient's shape.
modulated_grads = [tf.multiply(disc_reward, grad) for grad, _ in train_grad]
modulated_grads

Apply modulated gradients


In [ ]:
train_apply_grad = train_opt.apply_gradients([(mod_grad, var) for mod_grad, (_, var) in
                                              zip(modulated_grads, train_grad)])
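
The modulation implements the standard REINFORCE idea: with a one-hot direction label, the softmax cross-entropy equals -log pi(a|s), so scaling its gradient by the discounted reward and descending increases the log-probability of rewarded moves. A small NumPy sketch of that identity, with made-up toy logits (uses the np import from the first cell):

In [ ]:
# Toy check: cross-entropy against a one-hot label equals -log(prob of that action)
logits = np.array([0.5, -1.0, 2.0, 0.1], dtype=np.float32)
probs = np.exp(logits) / np.sum(np.exp(logits))
one_hot = np.eye(4, dtype=np.float32)[2]        # pretend action 3 was taken
cross_entropy = -np.sum(one_hot * np.log(probs))
print(cross_entropy, -np.log(probs[2]))         # identical values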

Training

Initialization


In [ ]:
tf.global_variables_initializer().run()

FileWriter for TensorBoard


In [ ]:
summary = tf.summary.FileWriter("d:\\Coding\\Tensorflow_log\\2048", sess.graph)
merged = tf.summary.merge_all() # Merge all summary operations (In this case we only have loss_func)
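
Note that merged is only a graph op; for the loss curve to actually appear in TensorBoard it has to be evaluated and written with the FileWriter. A possible helper along those lines (not used by the training loop below; the name and arguments are illustrative):

In [ ]:
def write_loss_summary(step_state, step_direction, step_number):
    # Evaluate the merged summary for one batch and log it under the given step
    summary_str = sess.run(merged, feed_dict={state: step_state, direction: step_direction})
    summary.add_summary(summary_str, global_step=step_number)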

Game


In [ ]:
def initialize_game():
    return TwentyFortyEight(4, 4)

def game_state(g):
    return np.asarray(g.table_as_array(), dtype=np.float32).reshape(1, 16)

def direction_vector(action):
    return np.eye(4, dtype=np.float32)[action - 1].reshape(1, 4)

def discounted_rewards(r):
    # Discounted return for every step: G_i = r_i + GAMMA * r_{i+1} + GAMMA**2 * r_{i+2} + ...
    rewards = np.asarray(r, dtype=np.float32)
    discounted = np.zeros_like(rewards)
    running_sum = 0.0
    for i in reversed(range(len(rewards))):
        running_sum = rewards[i] + GAMMA * running_sum
        discounted[i] = running_sum
    return discounted.reshape(len(rewards), 1)
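
A quick check of discounted_rewards on a toy reward sequence (values assume GAMMA = 0.95 from above): for rewards [1, 0, 2] the returns are 1 + 0.95*0 + 0.95^2*2 = 2.805, 0 + 0.95*2 = 1.9 and 2.

In [ ]:
# Expected column vector: [[2.805], [1.9], [2.0]]
print(discounted_rewards([1.0, 0.0, 2.0]))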

Training steps


In [ ]:
ep_number = 0
for _ in range(200):
    # Initialize game
    game = initialize_game()
    states_input_deque, actions_deque, rewards_deque = deque(), deque(), deque()
    is_ended = False
    no_of_steps = 0
    
    while not is_ended:
        # Append current game state
        current_state = game_state(game)
        states_input_deque.append(current_state)

        # Choose action from the network and append it to the actions_deque
        action_distribution = sess.run(output_softmax, feed_dict={state: current_state})
        action = np.random.choice(POSSIBLE_ACTIONS, 1, p=action_distribution.ravel())[0]
        actions_deque.append(action)

        # Make the move in the game
        game.move(action)
        no_of_steps += 1

        # Get next state, reward
        current_state, reward, is_ended = game_state(game), game.reward(), game.is_ended()

        # Append rewards
        rewards_deque.append(reward)

        # Occasionally print the policy's action distribution to monitor training
        if no_of_steps % 250 == 0:
            print(action_distribution)
    
    # Replay the episode: one reward-weighted policy update per visited state
    for s, a, r in zip(states_input_deque, actions_deque, discounted_rewards(rewards_deque)):
        r_ = r.reshape(1, 1)
        sess.run(train_apply_grad, feed_dict={state: s, direction: direction_vector(a), disc_reward: r_})
    
    ep_number += 1
    if ep_number % 10 == 0:
        print(ep_number)
    
        

# sess.run(train_grad, feed_dict={state: np.arange(16, dtype=np.float32).reshape(1, 16),
#                                     direction: np.eye(4, dtype=np.float32)[3].reshape(1, 4)})

Flush out data to disk


In [ ]:
summary.flush()
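
Optionally, the writer can also be closed once training is finished; flush() only forces pending events to disk, while close() flushes and releases the file handle.

In [ ]:
summary.close()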