Personal implementation of arXiv:1702.03037 [cs.MA].
Refs:
[1] DQN paper
[2] An implementation of a simpler game in PyTorch at http://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
PyTorch was chosen as the framework since it is new, Python-based, and supports GPUs.
In [1]:
# General import
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
#from copy import deepcopy
#from PIL import Image
import math
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.autograd as autograd
import torch.nn.functional as F
#import torchvision.transforms as T
# is_ipython = 'inline' in matplotlib.get_backend()
# if is_ipython:
# from IPython import display
In [2]:
from modules.gathering import gathering_game
In [3]:
# test gathering_game class. test init functions
game_pars={}
game_pars['gamma']=.99
game_pars['N_apples']=2
game_pars['N_tagged']=5
# local vars, should not be changed
game_pars['W'] = 33 # Width, always odd
game_pars['H'] = 11 # Height, always odd
game_pars['size_obs_ahead'] = 15 # number of sites the players can see in front of them
game_pars['size_obs_side'] = 10 # number of sites the players can see on their side
test = gathering_game(game_pars)
print('pars',test.pars)
print(test.dir)
print(test.s.shape)
test.show_screen()
In [4]:
test.reset()
In [4]:
# s_t, a_{0,t}, a_{1,t}, s_{t+1}
test.show_screen()
r0,r1=test.transition_and_get_reward(test.actions_dict['stand_still'], test.actions_dict['rotate_right'])
test.show_screen()
In [5]:
# test of observation functions
# test of obs_0
r0,r1=test.transition_and_get_reward(test.actions_dict['rotate_right'], test.actions_dict['rotate_left'])
test.show_screen()
#print('Reward', r0,r1)
obs_0_s=test.obs_0()
to_show = obs_0_s.transpose((2,1,0))
print(to_show.shape)
plt.imshow(to_show,origin='lower')
plt.show()
# test of obs_1
obs_1_s=test.obs_1()
to_show = obs_1_s.transpose((2,1,0))
print(to_show.shape)
plt.imshow(to_show,origin='lower')
plt.show()
In [8]:
test.reset()
test.show_screen()
In [20]:
for i in range(15):
    test.transition_and_get_reward(test.actions_dict['step_forward'], test.actions_dict['step_forward'])
test.show_screen()
In [37]:
#r0,r1=test.transition_and_get_reward(test.actions_dict['stand_still'], test.actions_dict['stand_still'])
r0,r1=test.transition_and_get_reward(test.actions_dict['step_forward'], test.actions_dict['step_forward'])
#r0,r1=test.transition_and_get_reward(test.actions_dict['step_left'], test.actions_dict['step_right'])
test.show_screen()
print('Reward',r0,r1)
In [195]:
r0,r1=test.transition_and_get_reward(test.actions_dict['step_right'], test.actions_dict['step_right'])
test.show_screen()
print('Reward', r0,r1)
In [10]:
# test the transition functions by performing random moves:
import time
def random_actions():
    # init
    game = gathering_game(game_pars)
    # play N random actions and show on screen
    N = 5
    for t in range(N):
        print('Time', game.global_time)
        a0, a1 = (8*np.random.random((2,))).astype(int)
        for k, v in game.actions_dict.items():
            if a0 == v:
                print('Action 0:', k)
            if a1 == v:
                print('Action 1:', k)
        game.transition_and_get_reward(a0, a1)
        game.show_screen()
        time.sleep(1)
In [11]:
random_actions()
In [64]:
# Helper function that computes the output size of a cross-correlation
def dim_out(dim_in, ks, stride):
    return math.floor((dim_in - ks)/stride + 1)

class DQN(nn.Module):
    def __init__(self, hp):
        """hp = hyperparameters, dictionary"""
        super(DQN, self).__init__()
        # Conv2d has arguments C_in, C_out, ... where C_in is the number of input channels and C_out that of
        # output channels, not to be confused with the size of the image at input and output, which is
        # automatically computed given the input and the kernel_size.
        # Further, in the docs, (N,C,H,W) are resp. number of samples, number of channels, height, width.
        # Note that nn.Linear instead requires both the number of input and output neurons. The reason is that
        # Conv2d only has parameters in the kernel, which is independent of the number of neurons.
        # Note: we do not use any normalization layer.
        self.C_H = hp['C_H']
        ks = hp['kernel_size']
        stride = hp['stride']
        self.conv1 = nn.Conv2d(hp['C_in'], self.C_H, kernel_size=ks, stride=stride)
        self.H1 = dim_out(hp['obs_window_H'], ks, stride)
        self.W1 = dim_out(hp['obs_window_W'], ks, stride)
        in_size = self.C_H*self.W1*self.H1
        self.lin1 = nn.Linear(in_size, in_size)  # lots of parameters!
        self.conv2 = nn.Conv2d(self.C_H, self.C_H, kernel_size=ks, stride=stride)
        H2 = dim_out(self.H1, ks, stride)
        W2 = dim_out(self.W1, ks, stride)
        in_size = self.C_H*W2*H2
        self.lin2 = nn.Linear(in_size, hp['C_out'])

    def forward(self, x):
        # Apply a rectified linear unit (relu) after each layer
        x = F.relu(self.conv1(x))
        # To feed into self.lin1 we reshape x as a (size(0), rest) tensor, where size(0) is the number of samples.
        # -1 tells it to infer the size automatically.
        x = x.view(x.size(0), -1)
        x = F.relu(self.lin1(x))
        # reshape to feed it into conv2, this time:
        x = x.view(x.size(0), self.C_H, self.H1, self.W1)
        x = F.relu(self.conv2(x))
        # reshape to feed it into lin2, this time:
        x = x.view(x.size(0), -1)
        x = F.relu(self.lin2(x))
        return x
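A quick arithmetic check of the reshapes in forward, using the hyper-parameters chosen in the test below (16x21 observation window, kernel size 5, stride 2, 32 hidden channels); a small sketch, no new names involved:
In [ ]:
# sizes after conv1 and conv2, and the corresponding linear-layer widths
H1, W1 = dim_out(16, 5, 2), dim_out(21, 5, 2)
print(H1, W1, 32*H1*W1)    # 6 9 1728  -> lin1 maps 1728 to 1728
H2, W2 = dim_out(H1, 5, 2), dim_out(W1, 5, 2)
print(H2, W2, 32*H2*W2)    # 1 3 96    -> lin2 maps 96 to C_out = 8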
In [65]:
# TEST of DQN
hp = {}
hp['C_in'] = 3 # for RGB
hp['C_H'] = 32 # number of hidden units (or channels)
hp['C_out'] = 8 # number of actions.
hp['kernel_size'] = 5
hp['stride'] = 2
# width and height of observation region
hp['obs_window_W'] = 21
hp['obs_window_H'] = 16
#print(dim_out(dim_out(30,5,2),5,2))
model_test = DQN(hp)
for p in model_test.parameters():
    print(p.size())
# test with a random sample (use unsqueeze to get an extra batch dimension)
x_test = autograd.Variable(torch.randn(3, hp['obs_window_H'], hp['obs_window_W']).unsqueeze(0))
print('x',x_test.size(),type(x_test))
y_pred = model_test(x_test)
print(y_pred.data)
print(y_pred.data.max(1))
print(y_pred.data.max(1)[1])
#print("y : ",y_pred.data.size())
#print(y_pred[0,:])
In [4]:
# namedtuple: tuple subclass with elements accessible by name via the . operator (here the class name = the instance name)
# e_t = (s_t, a_t, r_t, s_{t+1})
# globally defined and used by replay_memory
experience = namedtuple('Experience',
                        ('observation', 'action', 'reward', 'next_observation'))

class replay_memory(object):
    """
    A cyclic buffer of bounded size that holds the transitions observed recently.
    It also implements a .sample() method for selecting a random batch of transitions for training.
    """
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = experience(*args)
        # cyclicity:
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
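A minimal check of the cyclic overwrite (plain integers stand in for the tensors pushed later): pushing one more experience than the capacity overwrites the oldest entry.
In [ ]:
m = replay_memory(2)
for i in range(3):
    m.push(i, i, i, i)                     # the third push lands on position 0 again
print(len(m))                              # 2
print([e.observation for e in m.memory])   # [2, 1]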
In [38]:
# test namedtuple. all its members are torch tensors
# s = torch.randn(3,2,2).unsqueeze(0)
# a = torch.Tensor([1])
# sp = torch.randn(3,2,2).unsqueeze(0)
# r = torch.Tensor([0])
# test_exp = experience(s,a,r,sp)
# test_exp.action
# test of memory: OK
N=1
batch_size = 1
rm_test = replay_memory(N)
for i in range(N):
    s = torch.randn(3,2,2).unsqueeze(0)
    a = torch.floor(torch.rand(1)*8)
    sp = torch.randn(3,2,2).unsqueeze(0)
    # r = torch.randn(1)
    r = torch.ByteTensor([1])
    rm_test.push(s,a,r,sp)
# this is a list of namedtuples
sample_experience = rm_test.sample(batch_size)
# Transpose the batch (see http://stackoverflow.com/a/19343/3343043 for
# detailed explanation).
# This is a namedtuple of lists
minibatch = experience(*zip(*sample_experience))
# get obs,action,next_obs,reward batches in Variable
for s in minibatch.next_observation:
    if s is None:
        print('########### None')
next_obs_batch = autograd.Variable(torch.cat(minibatch.next_observation),
                                   volatile=True)
obs_batch = autograd.Variable(torch.cat(minibatch.observation))
action_batch = autograd.Variable(torch.cat(minibatch.action))
reward_batch = autograd.Variable(torch.cat(minibatch.reward))
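A tiny illustration (with plain values rather than tensors) of the transpose idiom experience(*zip(*batch)) used above and again in optimize below: it turns a list of experiences into a single experience whose fields are tuples.
In [ ]:
batch = [experience(1, 'a', 0.0, 2), experience(2, 'b', 1.0, 3)]
transposed = experience(*zip(*batch))
print(transposed.observation)   # (1, 2)
print(transposed.action)        # ('a', 'b')
print(transposed.reward)        # (0.0, 1.0)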
In [24]:
sample_experience[0].action
Out[24]:
In [41]:
minibatch.action
Out[41]:
In [5]:
def eps_decay(eps_start, eps_end, gamma, t):
    """Returns the value of eps at time t according to an exponential decay from eps_start
    to eps_end with decay rate gamma."""
    ret = eps_end + \
          (eps_start - eps_end) * np.exp(-1. * t / gamma)
    return ret

def policy(model, obs, n_actions, eps):
    """epsilon-greedy policy. Input:
    model : nn approximator for Q,
    obs : an observation, a tensor promoted below to autograd.Variable,
    n_actions : the number of possible actions (gathering: 8),
    eps : the exploration probability.
    Returns an action."""
    assert(0 <= eps <= 1)
    random_num = random.random()
    print('rand', random_num, 'eps', eps)
    if random_num > eps:
        # to be adjusted eventually.
        # volatile: Boolean indicating that the Variable should be used in
        # inference mode (forward), i.e. don't save the history. See
        # :ref:`excluding-subgraphs` for more details.
        # Can be changed only on leaf Variables.
        print('In max policy')
        y_pred = model(autograd.Variable(obs, volatile=True))
        # data.max(1) returns a pair whose 0 component is the maximum value for each sample in the batch
        # and whose 1 component is its index; the latter is selected here, giving the action that maximizes the model for Q.
        return y_pred.data.max(1)[1].cpu()
    else:
        print('In rand policy')
        return torch.LongTensor([[random.randrange(n_actions)]])
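A quick sanity check of the epsilon schedule, using the values set further below (eps_start=0.9, eps_end=0.05, decay_rate=200): eps starts at eps_start, has dropped to eps_end + (eps_start - eps_end)/e at t = decay_rate, and approaches eps_end for large t.
In [ ]:
for t in [0, 200, 1000]:
    print(t, eps_decay(0.9, 0.05, 200, t))   # ~0.90, ~0.36, ~0.06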
In [6]:
# preprocess:
def get_preprocessed_obs(game, pl):
    """Preprocessed input observation window of player pl from game.
    Convert to float, convert to a torch tensor (this doesn't require a copy)
    and add a batch dimension."""
    assert(pl == 0 or pl == 1)
    if pl == 0:
        ret = game.obs_0()
    else:
        ret = game.obs_1()
    ret = np.ascontiguousarray(ret, dtype=np.float32)
    ret = torch.from_numpy(ret).unsqueeze(0)
    #print('my_obs', my_obs.size(), type(my_obs))
    return ret
In [7]:
# parameters
game_pars={}
game_pars['N_apples']=2
game_pars['N_tagged']=5
# local vars, should not be changed
game_pars['W'] = 33 # Width, always odd
game_pars['H'] = 11 # Height, always odd
game_pars['size_obs_ahead'] = 15 # number of sites the players can see in front of them
game_pars['size_obs_side'] = 10 # number of sites the players can see on their side
# and hyper-parameters
hp = {}
hp['C_in'] = 3 # for RGB
hp['C_H'] = 32 # number of hidden units (or channels)
hp['C_out'] = 8 # number of actions.
hp['kernel_size'] = 5
hp['stride'] = 2
# size of the observation window, related to output of obs_*
hp['obs_window_W'] = 21
hp['obs_window_H'] = 16
# for replay_memory
mem_pars = {}
mem_pars['capacity'] = 2
mem_pars['batch_size'] = 1
# gamma = discount of reward
gamma = .99
# eps for policy
eps_start = 0.9
eps_end = 0.05
decay_rate = 200
#
# Now init the variables
#
# Q function approximators for player 0 and 1
Q_0 = DQN(hp)
Q_1 = DQN(hp)
rpl_memory_0 = replay_memory(mem_pars['capacity'])
rpl_memory_1 = replay_memory(mem_pars['capacity'])
# game definition
game = gathering_game(game_pars)
obs_0 = get_preprocessed_obs(game,0)
obs_1 = get_preprocessed_obs(game,1)
In [34]:
# test of policy: OK
my_obs = obs_1
# nn:
my_model = Q_1
a=policy(my_model, my_obs, game.n_actions, 0.5)
type(a[0,0])
Out[34]:
In [51]:
# Choose minimum square error loss function and SGD optimizer
loss_fn = torch.nn.MSELoss(size_average=False)
optimizer_0 = optim.SGD(Q_0.parameters(),lr=0.01)
optimizer_1 = optim.SGD(Q_1.parameters(),lr=0.01)
In [57]:
def optimize(model, loss_fn, optimizer, rpl_memory, batch_size, gamma):
    """TODO: understand issue with volatile..."""
    # if the memory is smaller than wanted, don't do anything and keep building memory
    print('In optimize: len(rpl_memory), batch_size', len(rpl_memory), batch_size)
    if len(rpl_memory) < batch_size:
        return
    # otherwise get a minibatch of experiences
    # this is a list of namedtuples
    sample_experience = rpl_memory.sample(batch_size)
    # Transpose the batch (see http://stackoverflow.com/a/19343/3343043 for a
    # detailed explanation).
    # This is a namedtuple of lists
    minibatch = experience(*zip(*sample_experience))
    print('minibatch.reward:', minibatch.reward)
    # get obs, action, next_obs, reward batches as Variables
    for s in minibatch.next_observation:
        if s is None:
            print('########### None')
    # Compute a mask of non-final states and concatenate the batch elements. This is to get rid of None
    #non_final_mask = torch.ByteTensor(
    #    tuple(map(lambda s: s is not None, minibatch.next_observation)))
    next_obs_batch = autograd.Variable(torch.cat(minibatch.next_observation),
                                       volatile=True)
    obs_batch = autograd.Variable(torch.cat(minibatch.observation))
    action_batch = autograd.Variable(torch.cat(minibatch.action))
    reward_batch = autograd.Variable(torch.cat(minibatch.reward))
    # Compute Q(obs, action) - the model computes Q(obs), then we select the
    # columns of the actions taken
    print("In optimize: obs_batch", obs_batch.data.size())
    obs_action_values = model(obs_batch).gather(1, action_batch)
    # Compute V(obs') = max_a Q(obs', a) for all next states.
    next_obs_values = model(next_obs_batch).max(1)[0]
    # Now, we don't want to mess up the loss with a volatile flag, so let's
    # clear it. After this, we'll just end up with a Variable that has
    # requires_grad=False
    next_obs_values.volatile = False
    # Compute y
    y = (next_obs_values * gamma) + reward_batch
    # Compute loss
    loss = loss_fn(obs_action_values, y)
    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    for param in model.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()
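A toy illustration (not part of the training loop) of the two selections done in optimize: .gather(1, action_batch) picks Q(obs, action) for the action actually taken, while .max(1)[0] gives max_a Q(obs', a), which enters the target y = r + gamma * max_a Q(obs', a).
In [ ]:
q = torch.Tensor([[0.1, 0.5, 0.2],
                  [0.9, 0.3, 0.4]])   # Q-values for a batch of 2 observations, 3 actions
a = torch.LongTensor([[1], [0]])      # actions taken
print(q.gather(1, a))                 # Q(obs, action): 0.5 and 0.9
print(q.max(1)[0])                    # max_a Q(obs, a): 0.5 and 0.9 (shape depends on the PyTorch version)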
In [58]:
# training loop over episodes
def train(M, T, eps_start, eps_end, decay_rate, Q_0, Q_1, obs_0, obs_1):
    """ ... """
    for episode in range(M):
        for t in range(T):
            # policy
            eps = eps_decay(eps_start, eps_end, decay_rate, t)
            a_0 = policy(Q_0, obs_0, game.n_actions, eps)
            a_1 = policy(Q_1, obs_1, game.n_actions, eps)
            print(a_0, a_1)
            # execute the actions in the emulator (policy returns a 1x1 tensor)
            r_0, r_1 = game.transition_and_get_reward(a_0[0,0], a_1[0,0])
            obs_0_p = get_preprocessed_obs(game, 0)
            obs_1_p = get_preprocessed_obs(game, 1)
            # store the experience (r is only 0 or 1, but treat it as a float since it is then added to the return)
            rpl_memory_0.push(obs_0, a_0, torch.FloatTensor([r_0]), obs_0_p)
            rpl_memory_1.push(obs_1, a_1, torch.FloatTensor([r_1]), obs_1_p)
            obs_0 = obs_0_p
            obs_1 = obs_1_p
            # optimize
            optimize(Q_0, loss_fn, optimizer_0, rpl_memory_0, mem_pars['batch_size'], gamma)
            optimize(Q_1, loss_fn, optimizer_1, rpl_memory_1, mem_pars['batch_size'], gamma)
In [59]:
M = 1
T = 2
train(M,T,eps_start,eps_end,decay_rate,Q_0,Q_1,obs_0,obs_1)
In [ ]: