In [1]:
from importlib import reload
In [15]:
import matplotlib.pyplot as plt
In [117]:
import gathering
In [118]:
reload(gathering)
Out[118]:
In [119]:
from gathering import gathering_game
In [132]:
# test gathering_game class. test init functions
game_pars={}
game_pars['gamma']=.99
game_pars['N_apples']=2
game_pars['N_tagged']=5
# local vars, should not be changed
game_pars['W'] = 33 # Width, always odd
game_pars['H'] = 11 # Height, always odd
game_pars['size_obs_ahead'] = 15 # number of sites the players can see in front of them
game_pars['size_obs_side'] = 10 # number of sites the players can see on their side
test_game = gathering_game(game_pars)
print('pars',test_game.pars)
print(test_game.dir)
print(test_game.s.shape)
test_game.show_screen(show=True)
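A quick sanity check on the observation geometry; this assumes the visible window includes the agent's own site, which would give the 16 x 21 window used for the DQN further below.
# assumption: window is (size_obs_ahead + 1) rows by (2*size_obs_side + 1) columns
obs_H = game_pars['size_obs_ahead'] + 1
obs_W = 2*game_pars['size_obs_side'] + 1
print('expected observation window (H, W):', obs_H, obs_W)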
In [133]:
# s_t, a_{0,t}, a_{1,t}, s_{t+1}
#r0,r1=test_game.transition_and_get_reward(test_game.actions_dict['stand_still'], test_game.actions_dict['rotate_right'])
#test_game.show_screen(show=True)
#
r0,r1=test_game.transition_and_get_reward(test_game.actions_dict['rotate_right'], test_game.actions_dict['rotate_right'])
test_game.show_screen(show=True)
r0,r1=test_game.transition_and_get_reward(test_game.actions_dict['stand_still'], test_game.actions_dict['step_forward'])
test_game.show_screen(show=True)
r0,r1=test_game.transition_and_get_reward(test_game.actions_dict['stand_still'], test_game.actions_dict['step_forward'])
test_game.show_screen(show=True)
r0,r1=test_game.transition_and_get_reward(test_game.actions_dict['stand_still'], test_game.actions_dict['step_forward'])
test_game.show_screen(show=True)
r0,r1=test_game.transition_and_get_reward(test_game.actions_dict['stand_still'], test_game.actions_dict['step_forward'])
test_game.show_screen(show=True)
r0,r1=test_game.transition_and_get_reward(test_game.actions_dict['stand_still'], test_game.actions_dict['step_forward'])
test_game.show_screen(show=True)
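The transition-then-show pattern above repeats a lot; a small convenience helper (not part of gathering_game, just a wrapper around its documented calls) keeps these test cells shorter.
def step_and_show(game, a0_name, a1_name):
    # look up both action names, advance the game one step, and draw the screen
    r0, r1 = game.transition_and_get_reward(game.actions_dict[a0_name],
                                            game.actions_dict[a1_name])
    game.show_screen(show=True)
    return r0, r1

# e.g. r0, r1 = step_and_show(test_game, 'stand_still', 'step_forward')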
In [134]:
r0,r1=test_game.transition_and_get_reward(test_game.actions_dict['stand_still'], test_game.actions_dict['rotate_left'])
r0,r1=test_game.transition_and_get_reward(test_game.actions_dict['stand_still'], test_game.actions_dict['rotate_left'])
test_game.show_screen(show=True)
In [135]:
r0,r1=test_game.transition_and_get_reward(test_game.actions_dict['use_beam'], test_game.actions_dict['use_beam'])
test_game.show_screen(show=True)
r0,r1=test_game.transition_and_get_reward(test_game.actions_dict['use_beam'], test_game.actions_dict['use_beam'])
test_game.show_screen(show=True)
In [141]:
r0,r1=test_game.transition_and_get_reward(test_game.actions_dict['stand_still'], test_game.actions_dict['stand_still'])
test_game.show_screen(show=True)
In [73]:
test_game.reset()
In [101]:
# s_t, a_{0,t}, a_{1,t}, s_{t+1}
r0,r1=test_game.transition_and_get_reward(test_game.actions_dict['stand_still'], test_game.actions_dict['rotate_right'])
test_game.show_screen(show=True)
#
r0,r1=test_game.transition_and_get_reward(test_game.actions_dict['step_left'], test_game.actions_dict['use_beam'])
test_game.show_screen(show=True)
In [102]:
test_game.show_screen(show=True)
In [103]:
# test of observation functions
# test of obs_0
#r0,r1=test.transition_and_get_reward(test.actions_dict['rotate_right'], test.actions_dict['rotate_left'])
#test.show_screen()
#print('Reward', r0,r1)
obs_0_s=test_game.obs_0()
to_show = obs_0_s.transpose((2,1,0))
print(to_show.shape)
plt.imshow(to_show,origin='lower')
plt.show()
# test of obs_1
obs_1_s=test_game.obs_1()
to_show = obs_1_s.transpose((2,1,0))
print(to_show.shape)
plt.imshow(to_show,origin='lower')
plt.show()
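The two observations can also be compared side by side; this is only a plotting convenience using the arrays computed above.
fig, axes = plt.subplots(1, 2)
axes[0].imshow(obs_0_s.transpose((2,1,0)), origin='lower')
axes[0].set_title('obs_0')
axes[1].imshow(obs_1_s.transpose((2,1,0)), origin='lower')
axes[1].set_title('obs_1')
plt.show()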
In [8]:
test.reset()
test.show_screen()
In [20]:
for i in range(15):
    test.transition_and_get_reward(test.actions_dict['step_forward'], test.actions_dict['step_forward'])
test.show_screen()
In [37]:
#r0,r1=test.transition_and_get_reward(test.actions_dict['stand_still'], test.actions_dict['stand_still'])
r0,r1=test.transition_and_get_reward(test.actions_dict['step_forward'], test.actions_dict['step_forward'])
#r0,r1=test.transition_and_get_reward(test.actions_dict['step_left'], test.actions_dict['step_right'])
test.show_screen()
print('Reward',r0,r1)
In [195]:
r0,r1=test.transition_and_get_reward(test.actions_dict['step_right'], test.actions_dict['step_right'])
test.show_screen()
print('Reward', r0,r1)
In [10]:
# test the transition functions by performing random moves:
import time
import numpy as np

def random_actions():
    # init
    game = gathering_game(game_pars)
    # play N random actions and show on screen
    N = 5
    for t in range(N):
        print('Time', game.global_time)
        a0, a1 = (8*np.random.random((2,))).astype(int)
        for k, v in game.actions_dict.items():
            if a0 == v:
                print('Action 0:', k)
            if a1 == v:
                print('Action 1:', k)
        game.transition_and_get_reward(a0, a1)
        game.show_screen()
        time.sleep(1)
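The inner loop over actions_dict only exists to recover the action names; an inverted lookup table avoids it (this assumes the dict values are plain integer action indices, which the comparison above already relies on).
# hypothetical: build the reverse mapping once, then index it directly
action_names = {v: k for k, v in test_game.actions_dict.items()}
print(action_names)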
In [11]:
random_actions()
In [14]:
import dqn_file
from dqn_file import dqn
In [4]:
reload(dqn_file)
Out[4]:
In [15]:
# test of dqn: OK
import torch
import torch.autograd as autograd
C_in = 3 # for RGB
C_H = 32 # number of hidden units (or channels)
C_out = 8 # number of actions.
kernel_size = 5
stride = 2
# width and height of observation region
obs_window_W = 21
obs_window_H = 16
model_test = dqn(C_in, C_H, C_out, kernel_size, stride, obs_window_H, obs_window_W)
for p in model_test.parameters():
    print(p.size())
# test with a random sample (use unsqueeze to get an extra batch dimension)
x_test = autograd.Variable(torch.randn(C_in, obs_window_H, obs_window_W).unsqueeze(0))
print('x',x_test.size(),type(x_test))
y_pred = model_test(x_test)
print(y_pred.data)
print(y_pred.data.max(1))
print(y_pred.data.max(1)[1])
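For reference, the spatial size after one unpadded convolution with this kernel_size and stride follows the usual formula; this is only a check on the numbers fed into dqn, not a statement about its actual architecture.
def conv2d_size_out(size, kernel_size=5, stride=2):
    # standard output-size formula for an unpadded convolution
    return (size - kernel_size) // stride + 1

print(conv2d_size_out(obs_window_H), conv2d_size_out(obs_window_W))  # 6, 9 after one layer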
In [6]:
import q_learner_file
In [125]:
reload(q_learner_file)
Out[125]:
In [126]:
from q_learner_file import experience, replay_memory, q_learner
In [11]:
# test namedtuple. all its members are torch tensors: OK
import torch
s = torch.randn(3,2,2).unsqueeze(0)
a = torch.Tensor([1])
sp = torch.randn(3,2,2).unsqueeze(0)
r = torch.Tensor([0])
test_exp = experience(s,a,r,sp)
test_exp.next_observation
Out[11]:
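From the fields used here and in the replay-memory test below, experience is presumably a namedtuple along these lines (an illustrative sketch; the real definition is imported from q_learner_file).
from collections import namedtuple
experience = namedtuple('experience',
                        ('observation', 'action', 'reward', 'next_observation'))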
In [16]:
# test of memory: OK
import torch
import torch.autograd as autograd
N=5
batch_size = 2
rpl_mem_test = replay_memory(N)
for i in range(N):
    s = torch.randn(3,obs_window_W,obs_window_H).unsqueeze(0)
    a = torch.floor(torch.rand(1)*8)
    sp = torch.randn(3,obs_window_W,obs_window_H).unsqueeze(0)
    r = torch.randn(1)
    rpl_mem_test.push(s,a,r,sp)
# create batch as in optimize
sample_experience = rpl_mem_test.sample(batch_size)
minibatch = experience(*zip(*sample_experience))
next_obs_batch = autograd.Variable(torch.cat(minibatch.next_observation), volatile=True)
obs_batch = autograd.Variable(torch.cat(minibatch.observation))
action_batch = autograd.Variable(torch.cat(minibatch.action))
reward_batch = autograd.Variable(torch.cat(minibatch.reward))
# test that no gradient is tracked for max_Q_next_obs = max_a Q(next_obs, a), using the previous test model
max_Q_next_obs = model_test(next_obs_batch).max(1)[0]
print(max_Q_next_obs.data)
print(max_Q_next_obs.creator)
print(max_Q_next_obs.grad)
print(max_Q_next_obs.requires_grad)
print(max_Q_next_obs.volatile)
max_Q_next_obs.volatile = False
print('after volatile=False')
print(max_Q_next_obs.requires_grad)
print(max_Q_next_obs.volatile)
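Given how push, sample and .memory are used in this notebook, replay_memory is presumably a small ring buffer along these lines (a minimal sketch assuming the standard DQN-style design, not the code in q_learner_file).
import random

class replay_memory_sketch:
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, s, a, r, sp):
        # store one experience, overwriting the oldest once capacity is reached
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = experience(s, a, r, sp)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)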
In [127]:
# parameters
qpars = {}
qpars['C_in'] = 3 # for RGB
qpars['C_H'] = 32 # number of hidden units (or channels)
qpars['C_out'] = 8 # number of actions.
qpars['kernel_size'] = 5
qpars['stride'] = 2
qpars['obs_window_W'] = 21
qpars['obs_window_H'] = 16
qpars['capacity'] = 5
qpars['batch_size'] = 2
qpars['gamma'] = .99
qpars['eps_start'] = 0.9
qpars['eps_end'] = 0.05
qpars['decay_rate'] = 200
agent_test = q_learner(qpars)
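Given eps_start, eps_end and decay_rate, the exploration schedule is presumably the usual exponential decay (an assumption; the exact schedule lives in q_learner_file).
import math
def eps_at(t, eps_start=0.9, eps_end=0.05, decay_rate=200):
    # hypothetical schedule: decays from eps_start towards eps_end with time constant decay_rate
    return eps_end + (eps_start - eps_end) * math.exp(-t / decay_rate)

print(eps_at(0), eps_at(200), eps_at(1000))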
In [128]:
# test of preprocess_obs and policy: OK
my_obs = agent_test.preprocess_obs(test_game.obs_0())
agent_test.eps_greedy(my_obs, 0.5)
Out[128]:
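eps_greedy presumably does the standard thing: with probability eps pick a random action, otherwise take the greedy argmax of the network output (a sketch of the idea, not the method in q_learner_file).
import random
def eps_greedy_sketch(model, obs_var, eps, n_actions=8):
    # obs_var: preprocessed observation with a batch dimension
    if random.random() < eps:
        return torch.LongTensor([[random.randrange(n_actions)]])
    return model(obs_var).data.max(1)[1].view(1, 1)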
In [129]:
# test of perceive: OK
import random
s=test_game.obs_0()
a=torch.LongTensor([[random.randrange(agent_test.C_out)]])
r=1
sp=test_game.obs_1()
t=50
agent_test.perceive(s,a,r,sp,t)
Out[129]:
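Putting the pieces together, one environment step for agent 0 might look like this (a sketch only; it assumes eps_greedy returns a 1x1 LongTensor action index as in the perceive test above, and it keeps agent 1 standing still).
s = test_game.obs_0()
a = agent_test.eps_greedy(agent_test.preprocess_obs(s), 0.5)   # assumed 1x1 LongTensor
a0 = int(a[0][0])                                              # assumed action index
r0, r1 = test_game.transition_and_get_reward(a0, test_game.actions_dict['stand_still'])
sp = test_game.obs_0()
agent_test.perceive(s, a, r0, sp, test_game.global_time)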
In [123]:
len(agent_test.rpl_memory.memory)
Out[123]: