Pong-Playing TensorFlow Neural Network

Import the modules needed to train the neural network in the Pong environment


In [1]:
import gym
import numpy as np
import tensorflow as tf
from IPython import display
import matplotlib.pyplot as plt
import time

#TensorFlow session configuration; uncommenting allow_growth lets GPU memory be allocated as needed
config = tf.ConfigProto()
#config.gpu_options.allow_growth = True

%matplotlib inline

Investigate the environment and set up data preprocessing functions


In [2]:
env = gym.make("Pong-v0")
print(env.render(mode='rgb_array').shape) #the raw screen is a 210x160x3 RGB array
env.reset()
print(env.action_space)
print(env.unwrapped.get_action_meanings())

#crop boundaries of the playing field; downsampling by 2 gives an 82x66 input
top = 32
bottom = 195
left = 14
right = 146
downsampled_height = int(np.rint((bottom-top)/2))
downsampled_width = int(np.rint((right-left)/2))
input_dim = downsampled_height*downsampled_width

def preprocess(img, reshape=False):
    #crop, grab only one channel, and downsample by factor of 2
    img = img[top:bottom,left:right,0][::2,::2] 
    #get rid of background color, 109 in first frame, 144 otherwise
    img[np.isin(img,[144,109])] = 0 
    img[img!=0] = 1
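    #note: the two assignments above write through a view, so they also modify the array passed in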
    if not reshape:
        return img.astype(np.int).ravel()
    else:
        return img.astype(np.int)

def reshape(img):
    return img.reshape(downsampled_height,downsampled_width).astype(np.int)

#what color pixels are in this image?
#print(list(zip(*np.unique(env.render(mode='rgb_array')[top:bottom,left:right,0],return_counts=1))))
#print(list(zip(*np.unique(env.render(mode='rgb_array')[top:bottom,left:right,0][::2,::2],return_counts=1))))

plt.subplots(2,3, figsize=(12,10))

plt.subplot(2,3,1)
plt.title("The Atari Pong Game Screen")
plt.imshow(env.reset())

plt.subplot(2,3,2)
plt.title("Cropped, First Channel Only")
plt.imshow(env.render(mode='rgb_array')[top:bottom,left:right,0])

plt.subplot(2,3,3)
plt.title("Prior Plus Downsample")
plt.imshow(env.render(mode='rgb_array')[top:bottom,left:right,0][::2,::2])

plt.subplot(2,3,4)
plt.title("After a step, color scheme changes")
plt.imshow(env.step(2)[0][top:bottom,left:right,0][::2,::2])

plt.subplot(2,3,5)
plt.title("After Preprocessing Frame 1")
plt.imshow(reshape(preprocess(env.reset())))

plt.subplot(2,3,6)
plt.title("After Preprocessing Frame 2")
plt.imshow(reshape(preprocess(env.step(2)[0])))
plt.show()

#print(list(zip(*np.unique(env.render(mode='rgb_array')[top:bottom,left:right,0][::2,::2],return_counts=1))))
#print(list(zip(*np.unique(reshape(preprocess(env.reset())),return_counts=1))))
#print(list(zip(*np.unique(reshape(preprocess(env.render(mode='rgb_array'))),return_counts=1))))


[2017-11-15 18:11:43,569] Making new env: Pong-v0
Discrete(6)
['NOOP', 'FIRE', 'RIGHT', 'LEFT', 'RIGHTFIRE', 'LEFTFIRE']

Define and initialize the neural network


In [ ]:
sess = tf.InteractiveSession(config=config)

x = tf.placeholder(tf.float32, shape=[None, input_dim])
advantage = tf.placeholder(tf.float32, shape=[None])
action_is_down = tf.placeholder(tf.float32, shape=[None])

h1_dim = 200
l1 = tf.layers.dense(x, h1_dim, activation=tf.nn.relu)
#use tf.squeeze() to reshape this from [batch_size,1] to [batch_size]
logit = tf.squeeze(tf.layers.dense(l1, 1))
sampled_action_negative_log_prob = tf.nn.sigmoid_cross_entropy_with_logits(logits=logit, labels=action_is_down)
reward_weighted_neg_likelihood = advantage*sampled_action_negative_log_prob
optimizer = tf.train.AdamOptimizer(learning_rate=1e-4)
train = optimizer.minimize(tf.reduce_sum(reward_weighted_neg_likelihood))

saver = tf.train.Saver()
tf.global_variables_initializer().run()

Set up an agent class that plays Pong using actions chosen by the neural network in the active TensorFlow session


In [ ]:
class pong_agent:
        
    def clean_slate(self):    
        self.wins = 0
        self.games = 0    
        self.p_list = []
        self.actions = []
        self.frames = []
        self.frame_changes = []
        self.rewards = []

    def make_batch(self, n_sets):
        self.clean_slate()
        for _ in range(n_sets):
            self.play_set()
        self.normalize_rewards()
        return self.frame_changes, self.actions, self.rewards

    def play_set(self):
        env.reset()
        done = 0
        self.frames.append(preprocess(env.render(mode='rgb_array')))
        #the first frame change of a set is all zeros
        self.frame_changes.append(np.zeros_like(self.frames[-1]))
        while not done:
            done = self.play_point()

    def play_point(self):
        frames_played = 0
        discount = 0.99
        while True:
            prob, action, reward, new_frame, done = self.play_frame(self.frame_changes[-1])
            self.p_list.append(prob)
            self.actions.append(action)
            frames_played+= 1
            if not done:
                self.frames.append(new_frame)
                self.frame_changes.append(self.frames[-1] - self.frames[-2])
            if reward:
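                #a point just ended: give every frame of the rally the point's reward, discounted backwards in time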
                self.rewards+= [reward * discount**k for k in reversed(range(frames_played))]
                self.wins+= max(reward,0)
                self.games+= 1
                break
        return done

    def play_frame(self, frame_change):
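        #squash the network's logit for this frame change through a sigmoid to get P(action = down)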
        p_down = 1/(1+np.exp(-sess.run(logit, feed_dict={x:np.array([frame_change])})))
        #sample an action using p_down, 3=down, 2=up
        action = np.random.binomial(1, p_down) + 2 
        observation, reward, done = env.step(action)[:3]
        return p_down, action, reward, preprocess(observation), done

    def normalize_rewards(self):
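        #standardize the discounted rewards so they serve as advantages in the policy gradient update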
        mean = np.mean(self.rewards)
        std_dev = np.std(self.rewards)
        self.rewards = (np.array(self.rewards)-mean)/std_dev

Train the agent for 3000 updates to reach >50% win rate

This part takes a while. My setup processes the earlier batches at a rate of ~100 batches/hour. To monitor progress (the agent's win rate), I save a .PNG plot every 10 batches. There's a visible shift in the win rate from ~2% to ~4% by batch 300. Later batches take longer to process because the agent is playing more games.

To speed up the later stages of training, we could trim the number of games played per batch (each point takes more frames to complete as the agent gets better), and to improve quality we could decay the learning rate, as sketched below.
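
For reference, a decaying learning rate could be wired in by replacing the optimizer and train lines in the network-definition cell above with something like the sketch below, before tf.global_variables_initializer() runs. The schedule values here (halving every 500 updates) are arbitrary illustrations, not tuned settings:

#global_step is incremented by minimize() once per update
global_step = tf.Variable(0, trainable=False)
#halve the learning rate every 500 updates (illustrative values, not tuned)
decayed_lr = tf.train.exponential_decay(learning_rate=1e-4, global_step=global_step,
                                        decay_steps=500, decay_rate=0.5, staircase=True)
optimizer = tf.train.AdamOptimizer(learning_rate=decayed_lr)
train = optimizer.minimize(tf.reduce_sum(reward_weighted_neg_likelihood), global_step=global_step)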


In [ ]:
start = time.time()
ratios = []
matches_per_batch = 10
epochs = 3001
agent = pong_agent()

for i in range(epochs):
    #play Pong with the network, save frames and associated rewards
    frame_changes, actions, rewards = agent.make_batch(matches_per_batch)
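    #label is 1 where the sampled action was "down" (3) and 0 where it was "up" (2)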
    train.run(feed_dict={x:frame_changes, action_is_down:np.array(actions)==3, advantage:rewards})
    ratios.append(agent.wins/agent.games*100)
    if i%10==0:
        print("{}: batch {} finished after {} hours".format(time.strftime('%X %x '), 
                                                    i, round((time.time()-start)/3600,2)))
        plt.title("Agent Quality over Time")
        plt.plot(range(1,i+2), ratios)
        plt.xlabel("Number of Updates")
        plt.ylabel("Percent of Games Won")
        plt.savefig("./pong_agent_quality")
        plt.clf() #start a fresh figure next time so lines don't pile up
        if i%100==0:
            #save out the neural network's weights here
            saver.save(sess, "./pong_agent.ckpt")

Restore the saved variables and let the agent play Pong!


In [ ]:
sess.close()
tf.reset_default_graph()

agent = pong_agent()

sess = tf.InteractiveSession(config=config)

x = tf.placeholder(tf.float32, shape=[None, input_dim])
advantage = tf.placeholder(tf.float32, shape=[None])
action_is_down = tf.placeholder(tf.float32, shape=[None])

h1_dim = 200
l1 = tf.layers.dense(x, h1_dim, activation=tf.nn.relu)
#use tf.squeeze() to reshape this from [batch_size,1] to [batch_size]
logit = tf.squeeze(tf.layers.dense(l1, 1))
sampled_action_negative_log_prob = tf.nn.sigmoid_cross_entropy_with_logits(logits=logit, labels=action_is_down)
reward_weighted_neg_likelihood = advantage*sampled_action_negative_log_prob
optimizer = tf.train.AdamOptimizer(learning_rate=1e-4)
train = optimizer.minimize(tf.reduce_sum(reward_weighted_neg_likelihood))

saver = tf.train.Saver()
saver.restore(sess, "./pong_agent.ckpt")

number_of_frames_to_play = 400
frame = env.reset()
new_frame = np.zeros_like(frame)
diff = np.zeros_like(preprocess(np.copy(frame)))
img = plt.imshow(frame)
for i in range(number_of_frames_to_play):
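    #play_frame samples an action from the network and also steps the environment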
    action = agent.play_frame(diff)[1]
    new_frame = env.render(mode='rgb_array')
    diff = preprocess(np.copy(new_frame))-preprocess(frame)
    frame = new_frame
    img.set_data(new_frame)
    display.display(plt.gcf())
    display.clear_output(wait=True)

Save a GIF


In [ ]:
import moviepy.editor as mpy
global frame, new_frame, diff
frame = env.reset()
#resetting a couple of pixels that turned black for some reason
frame[0][:8]=frame[0][8]
new_frame = np.copy(frame)
diff = np.zeros_like(preprocess(np.copy(frame)))
def make_frame(t):
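    #moviepy calls this once per output frame; t is ignored and the game is simply stepped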
    global frame, new_frame, diff
    action = agent.play_frame(diff)[1]
    new_frame = env.render(mode='rgb_array')
    new_frame[0][:8]=new_frame[0][8]
    diff = preprocess(np.copy(new_frame))-preprocess(np.copy(frame))
    frame = new_frame
    return frame
clip = mpy.VideoClip(make_frame, duration=25)
clip.write_gif("Pong.gif",fps=15)