In [1]:
import gym
import numpy as np
import tensorflow as tf
from IPython import display
import matplotlib.pyplot as plt
import time
config = tf.ConfigProto()
#config.gpu_options.allow_growth = True
%matplotlib inline
In [2]:
env = gym.make("Pong-v0")
env.render(mode='rgb_array').shape
env.reset()
print(env.action_space)
print(env.unwrapped.get_action_meanings())
top = 32
bottom = 195
left = 14
right = 146
downsampled_height = int(np.rint((bottom-top)/2))
downsampled_width = int(np.rint((right-left)/2))
input_dim = downsampled_height*downsampled_width
def preprocess(img, reshape=False):
    #crop, grab only one channel, and downsample by factor of 2
    img = img[top:bottom,left:right,0][::2,::2]
    #get rid of background color, 109 in first frame, 144 otherwise
    img[np.isin(img,[144,109])] = 0
    img[img!=0] = 1
    if not reshape:
        return img.astype(np.int).ravel()
    else:
        return img.astype(np.int)

def reshape(img):
    return img.reshape(downsampled_height,downsampled_width).astype(np.int)
#what color pixels are in this image?
#print(list(zip(*np.unique(env.render(mode='rgb_array')[top:bottom,left:right,0],return_counts=1))))
#print(list(zip(*np.unique(env.render(mode='rgb_array')[top:bottom,left:right,0][::2,::2],return_counts=1))))
plt.subplots(2,3, figsize=(12,10))
plt.subplot(2,3,1)
plt.title("The Atari Pong Game Screen")
plt.imshow(env.reset())
plt.subplot(2,3,2)
plt.title("Cropped, First Channel Only")
plt.imshow(env.render(mode='rgb_array')[top:bottom,left:right,0])
plt.subplot(2,3,3)
plt.title("Prior Plus Downsample")
plt.imshow(env.render(mode='rgb_array')[top:bottom,left:right,0][::2,::2])
plt.subplot(2,3,4)
plt.title("After a step, color scheme changes")
plt.imshow(env.step(2)[0][top:bottom,left:right,0][::2,::2])
plt.subplot(2,3,5)
plt.title("After Preprocessing Frame 1")
plt.imshow(reshape(preprocess(env.reset())))
plt.subplot(2,3,6)
plt.title("After Preprocessing Frame 2")
plt.imshow(reshape(preprocess(env.step(2)[0])))
plt.show()
#print(list(zip(*np.unique(env.render(mode='rgb_array')[top:bottom,left:right,0][::2,::2],return_counts=1))))
#print(list(zip(*np.unique(reshape(preprocess(env.reset())),return_counts=1))))
#print(list(zip(*np.unique(reshape(preprocess(env.render(mode='rgb_array'))),return_counts=1))))
In [ ]:
sess = tf.InteractiveSession(config=config)
x = tf.placeholder(tf.float32, shape=[None, input_dim])
advantage = tf.placeholder(tf.float32, shape=[None])
action_is_down = tf.placeholder(tf.float32, shape=[None])
h1_dim = 200
l1 = tf.layers.dense(x, h1_dim, activation=tf.nn.relu)
#use tf.squeeze() to reshape this from [batch_size,1] to [batch_size]
logit = tf.squeeze(tf.layers.dense(l1, 1))
sampled_action_negative_log_prob = tf.nn.sigmoid_cross_entropy_with_logits(logits=logit, labels=action_is_down)
reward_weighted_neg_likelihood = advantage*sampled_action_negative_log_prob
optimizer = tf.train.AdamOptimizer(learning_rate=1e-4)
train = optimizer.minimize(tf.reduce_sum(reward_weighted_neg_likelihood))
saver = tf.train.Saver()
tf.global_variables_initializer().run()
In [ ]:
class pong_agent:
    def clean_slate(self):
        #reset per-batch statistics and trajectory buffers
        self.wins = 0
        self.games = 0
        self.p_list = []
        self.actions = []
        self.frames = []
        self.frame_changes = []
        self.rewards = []

    def make_batch(self, n_sets):
        #play n_sets full episodes and return the training batch
        self.clean_slate()
        for _ in range(n_sets):
            self.play_set()
        self.normalize_rewards()
        return self.frame_changes, self.actions, self.rewards

    def play_set(self):
        #play one full episode
        env.reset()
        done = 0
        self.frames.append(preprocess(env.render(mode='rgb_array')))
        #the first frame change is all zeros
        self.frame_changes.append(self.frames[-1] - self.frames[-1])
        while not done:
            done = self.play_point()

    def play_point(self):
        #play until one side scores, then assign discounted rewards to the point's frames
        frames_played = 0
        discount = 0.99
        while True:
            prob, action, reward, new_frame, done = self.play_frame(self.frame_changes[-1])
            self.p_list.append(prob)
            self.actions.append(action)
            frames_played += 1
            if not done:
                self.frames.append(new_frame)
                self.frame_changes.append(self.frames[-1] - self.frames[-2])
            if reward:
                self.rewards += [reward * discount**k for k in reversed(range(frames_played))]
                self.wins += max(reward, 0)
                self.games += 1
                break
        return done

    def play_frame(self, frame_change):
        #run the network on the latest frame change, sample an action, and step the environment
        p_down = 1/(1+np.exp(-sess.run(logit, feed_dict={x: np.array([frame_change])})))
        #sample an action using p_down, 3=down, 2=up
        action = np.random.binomial(1, p_down) + 2
        observation, reward, done = env.step(action)[:3]
        return p_down, action, reward, preprocess(observation), done

    def normalize_rewards(self):
        #standardize the discounted rewards before using them as advantages
        mean = np.mean(self.rewards)
        std_dev = np.std(self.rewards)
        self.rewards = (np.array(self.rewards) - mean)/std_dev
This part takes a while. My setup processes the earlier batches at a rate of ~100 batches/hour. To monitor progress (the agent's win rate), I save a .png plot every 10 batches. There's a visible shift in the win rate from ~2% to ~4% by batch 300. Later batches take longer to process because the agent plays more games per batch as it improves.
To speed up the later stages of training, we could trim the number of games played per batch (each point takes more frames to complete as the agent gets better); to improve quality, we could add a decaying learning rate.
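For the decaying learning rate, here is a minimal sketch using TF 1.x's tf.train.exponential_decay in place of the fixed Adam learning rate above; the decay_steps and decay_rate values are illustrative assumptions, not tuned settings.
#hypothetical replacement for the optimizer lines above; schedule values are illustrative
global_step = tf.Variable(0, trainable=False)
decayed_lr = tf.train.exponential_decay(1e-4, global_step,
                                        decay_steps=500, decay_rate=0.9, staircase=True)
optimizer = tf.train.AdamOptimizer(learning_rate=decayed_lr)
train = optimizer.minimize(tf.reduce_sum(reward_weighted_neg_likelihood),
                           global_step=global_step)
Passing global_step to minimize() lets the schedule advance once per training update.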
In [ ]:
start = time.time()
ratios = []
matches_per_batch = 10
epochs = 3001
agent = pong_agent()
for i in range(epochs):
    #play Pong with the network, save frames and associated rewards
    frame_changes, actions, rewards = agent.make_batch(matches_per_batch)
    train.run(feed_dict={x: frame_changes, action_is_down: np.array(actions)==3, advantage: rewards})
    ratios.append(agent.wins/agent.games*100)
    if i%10==0:
        print("{}: batch {} finished after {} hours".format(time.strftime('%X %x '),
                                                            i, round((time.time()-start)/3600,2)))
        plt.title("Agent Quality over Time")
        plt.plot(range(1,i+2), ratios)
        plt.xlabel("Number of Updates")
        plt.ylabel("Percent of Games Won")
        plt.savefig("./pong_agent_quality")
    if i%100==0:
        #save out the neural network's weights here
        saver.save(sess, "./pong_agent.ckpt")
In [ ]:
sess.close()
tf.reset_default_graph()
agent = pong_agent()
sess = tf.InteractiveSession(config=config)
x = tf.placeholder(tf.float32, shape=[None, input_dim])
advantage = tf.placeholder(tf.float32, shape=[None])
action_is_down = tf.placeholder(tf.float32, shape=[None])
h1_dim = 200
l1 = tf.layers.dense(x, h1_dim, activation=tf.nn.relu)
#use tf.squeeze() to reshape this from [batch_size,1] to [batch_size]
logit = tf.squeeze(tf.layers.dense(l1, 1))
sampled_action_negative_log_prob = tf.nn.sigmoid_cross_entropy_with_logits(logits=logit, labels=action_is_down)
reward_weighted_neg_likelihood = advantage*sampled_action_negative_log_prob
optimizer = tf.train.AdamOptimizer(learning_rate=1e-4)
train = optimizer.minimize(tf.reduce_sum(reward_weighted_neg_likelihood))
saver = tf.train.Saver()
saver.restore(sess, "./pong_agent.ckpt")
number_of_frames_to_play = 400
frame = env.reset()
new_frame = np.zeros_like(frame)
diff = np.zeros_like(preprocess(np.copy(frame)))
img = plt.imshow(frame)
for i in range(number_of_frames_to_play):
    action = agent.play_frame(diff)[1]
    new_frame = env.render(mode='rgb_array')
    diff = preprocess(np.copy(new_frame)) - preprocess(frame)
    frame = new_frame
    img.set_data(new_frame)
    display.display(plt.gcf())
    display.clear_output(wait=True)
In [ ]:
import moviepy.editor as mpy
global frame, new_frame, diff
frame = env.reset()
#resetting a couple of pixels that turned black for some reason
frame[0][:8]=frame[0][8]
new_frame = np.copy(frame)
diff = np.zeros_like(preprocess(np.copy(frame)))
def make_frame(t):
    global frame, new_frame, diff
    action = agent.play_frame(diff)[1]
    new_frame = env.render(mode='rgb_array')
    new_frame[0][:8] = new_frame[0][8]
    diff = preprocess(np.copy(new_frame)) - preprocess(np.copy(frame))
    frame = new_frame
    return frame
clip = mpy.VideoClip(make_frame, duration=25)
clip.write_gif("Pong.gif",fps=15)