This experiment compares two situations: with smart start and without smart start. In the case without smart start the agent starts in the same state every iteration. In the case with smart start the agent starts in the visited state that has the lowest kernel density estimate among all visited states.
The state with the lowest kernel density is selected according to $s^{\ast} = \underset{i \in D}{\arg\min}\;\frac{1}{|D|}\sum_{j \in D} e^{-\sum_{d=1}^{D_N}(i_d - j_d)^2 / C}$, where $D$ is the set of visited states, $D_N$ is the state dimension, and $C$ is a hyper-parameter that controls the width of the kernel.
In [1]:
import numpy as np
import tensorflow as tf
from drl.replaybuffer import ReplayBufferTF
from drl.rrtexploration import Trajectory
from drl.exploration import OrnSteinUhlenbeckNoise
import time
In [7]:
import plotly
import plotly.offline as py
from plotly.graph_objs import *
from plotly import tools
plotly.offline.init_notebook_mode()
# Shared 3-D scene styling for the density-surface plots: white grid and
# zero lines on a light grey background, identical for all three axes.
_axis_style = dict(
    gridcolor='rgb(255, 255, 255)',
    zerolinecolor='rgb(255, 255, 255)',
    showbackground=True,
    backgroundcolor='rgb(230, 230,230)'
)
scene = Scene(
    xaxis=XAxis(**_axis_style),
    yaxis=YAxis(**_axis_style),
    zaxis=ZAxis(**_axis_style)
)
In [8]:
class DoubleIntegrator(object):
    """Minimal 2-D point-mass environment with a gym-like API.

    The state is a 2-D position clipped to [-5, 5]^2 and an action is a
    2-D displacement clipped to [-0.1, 0.1]^2. Reward is always 0 and an
    episode never terminates, so the environment is only useful for
    studying exploration behaviour.
    """

    def __init__(self):
        self.viewer = None              # lazily created gym rendering.Viewer
        self.w, self.h = 50, 50         # density-map overlay grid resolution
        self.plot_density_map = False
        self.plot_trajectory = False
        self.high_state = np.array([5., 5.])
        self.low_state = -self.high_state
        self.high_action = np.array([0.1, 0.1])
        self.low_action = -self.high_action
        self.action_dim = 2
        self.state_dim = 2
        self.state = None

    def reset(self, state=None):
        """Reset to `state`, or to the origin when no state is given.

        Returns the new observation.
        """
        if state is None:
            self.state = np.array([0., 0.])
        else:
            self.state = state
        return self.get_ob()

    def step(self, a):
        """Apply displacement `a` (clipped to the action bounds) and clip
        the resulting position to the state bounds.

        Returns (observation, reward, terminal, info); reward is always
        0. and terminal is always False.
        """
        s = self.state
        a = np.clip(np.array(a), self.low_action, self.high_action)
        ns = s + a
        self.state = np.clip(ns, self.low_state, self.high_state)
        terminal = False
        reward = 0.
        return self.get_ob(), reward, terminal, {}

    def get_ob(self):
        """Return the current observation (the raw state array)."""
        return self.state

    def render(self, mode='human', close=False):
        """Draw the agent (and optional overlays) with gym's viewer.

        `close=True` tears the viewer down; otherwise the viewer (and the
        density-map overlay, if enabled) is created on first use.
        """
        if close:
            if self.viewer is not None:
                self.viewer.close()
                self.viewer = None
            return
        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(500, 500)
            # Bounds slightly larger than the state space so the agent
            # circle stays fully visible at the edges.
            self.viewer.set_bounds(-5.1, 5.1, -5.1, 5.1)
            if self.plot_density_map:
                self.init_plot_density_map()
            self.agent = rendering.make_circle(0.1)
            self.agent.set_color(.8, .3, .3)
            self.agent_transform = rendering.Transform()
            self.agent.add_attr(self.agent_transform)
            self.viewer.add_geom(self.agent)
        self.agent_transform.set_translation(self.state[0], self.state[1])
        return self.viewer.render(return_rgb_array=mode == 'rgb_array')

    def toggle_plot_density(self):
        """Enable the density-map overlay (takes effect when the viewer is created)."""
        self.plot_density_map = True

    def init_plot_density_map(self):
        """Build a w x h grid of transparent green cells covering the viewer."""
        from gym.envs.classic_control import rendering
        cw = 10.2 / self.w   # cell width in world units (viewer spans 10.2)
        ch = 10.2 / self.h   # cell height in world units
        self.density_map = [[0 for x in range(self.w)] for y in range(self.h)]
        offset = 5.
        for x in range(self.w):
            for y in range(self.h):
                l, r, t, b = x * cw - offset, (x + 1) * cw - offset, (y + 1) * ch - offset, y * ch - offset
                v = [(l, b), (l, t), (r, t), (r, b)]
                polygon = rendering.FilledPolygon(v)
                # Fully transparent until update_density_map sets alphas.
                polygon._color.vec4 = (0., 1., 0., 0.)
                self.density_map[x][y] = polygon
                self.viewer.add_geom(polygon)

    def update_density_map(self, values):
        """Refresh the overlay cell alphas from `values` (a w x h array),
        normalised by the maximum value.

        Returns False when the overlay is disabled.
        """
        if not self.plot_density_map:
            return False
        peak = np.max(values)  # renamed from `max`: don't shadow the builtin
        if peak == 0:
            # All-zero grid: leave cells transparent instead of dividing by 0.
            return
        for x in range(self.w):
            for y in range(self.h):
                self.density_map[x][y]._color.vec4 = (0., 1., 0., values[x][y] / peak)

    def toggle_plot_trajectories(self):
        """Enable drawing of episode trajectories via add_trajectory."""
        self.plot_trajectory = True

    def add_trajectory(self, trajectory):
        """Add `trajectory` as polyline segments, keeping the agent drawn on top."""
        from gym.envs.classic_control import rendering
        self.viewer.geoms.remove(self.agent)
        states = trajectory.get_states()
        for i in range(states.shape[0] - 1):
            line = rendering.make_polyline((states[i, :], states[i+1, :]))
            self.viewer.add_geom(line)
        self.viewer.add_geom(self.agent)
In [9]:
# --- Experiment hyper-parameters ---------------------------------------
C = 2.5              # kernel-width hyper-parameter of the density estimate
buffer_size = 10000  # replay buffer capacity
batch_size = 64      # minibatch size
num_episodes = 50    # rollouts per experiment
max_steps = 100      # steps per rollout
render_step = False  # draw every individual step
render_traj = True   # draw the whole trajectory after each episode
# Ornstein-Uhlenbeck exploration noise parameters
mu, sigma, theta = 0., 0.15, 0.2
In [10]:
# Smart-start experiment: after every episode the next start state is the
# visited state with the lowest kernel density estimate.
env = DoubleIntegrator()
sess = tf.Session()
buffer = ReplayBufferTF(sess, env.state_dim, env.high_state, C, buffer_size)
noise = OrnSteinUhlenbeckNoise(env.action_dim, mu, sigma, theta)

new_state = None  # None on the first episode -> environment default start
start = time.time()
if render_traj:
    env.toggle_plot_trajectories()
    env.reset()
    env.render()

for episode in range(num_episodes):
    obs = env.reset(new_state)
    trajectory = Trajectory()
    for _ in range(max_steps):
        if render_step:
            env.render()
        action = noise.sample()
        trajectory.add_node(obs, action)
        next_obs, reward, terminal, info = env.step(action)
        buffer.add(obs, action, reward, terminal, next_obs)
        obs = next_obs
    noise.reset()
    if render_traj:
        env.add_trajectory(trajectory)
        env.render()
    print("episode: ", episode, ', time elapsed: ', (time.time() - start))
    # Lowest-density visited state becomes the next episode's start.
    new_state = buffer.calc_trans_min_density()[0]
In [11]:
# Evaluate the buffer's kernel density estimate on a grid over the state
# space (np.linspace default: 50 samples per axis).
x = np.linspace(-env.high_state[0], env.high_state[0])
y = np.linspace(-env.high_state[1], env.high_state[1])
xx, yy = np.meshgrid(x, y)
values = np.zeros((len(x), len(y)))
for i, j in np.ndindex(values.shape):
    values[i, j] = buffer.calc_density(np.array([[xx[i, j], yy[i, j]]]))
In [12]:
# Show the smart-start density estimate as an interactive 3-D surface.
fig = Figure(
    data=Data([Surface(x=xx, y=yy, z=values)]),
    layout=Layout(title='Smart start', scene=scene),
)
py.iplot(fig)
In [13]:
# Baseline experiment: every episode restarts from the fixed default state.
env_no = DoubleIntegrator()
start = time.time()
noise = OrnSteinUhlenbeckNoise(env_no.action_dim, mu, sigma, theta)
buffer_no = ReplayBufferTF(sess, env_no.state_dim, env_no.high_state, C, buffer_size)
if render_traj:
    env_no.toggle_plot_trajectories()
    env_no.reset()
    env_no.render()

for episode in range(num_episodes):
    obs = env_no.reset()
    trajectory = Trajectory()
    for _ in range(max_steps):
        if render_step:
            env_no.render()
        action = noise.sample()
        trajectory.add_node(obs, action)
        next_obs, reward, terminal, info = env_no.step(action)
        buffer_no.add(obs, action, reward, terminal, next_obs)
        obs = next_obs
    noise.reset()
    if render_traj:
        env_no.add_trajectory(trajectory)
        env_no.render()
    print("episode: ", episode, ', time elapsed: ', (time.time() - start))
In [14]:
# Evaluate the baseline buffer's density estimate on the same grid.
x = np.linspace(-env.high_state[0], env.high_state[0])
y = np.linspace(-env.high_state[1], env.high_state[1])
xx, yy = np.meshgrid(x, y)
values_no = np.zeros((len(x), len(y)))
for i, j in np.ndindex(values_no.shape):
    values_no[i, j] = buffer_no.calc_density(np.array([[xx[i, j], yy[i, j]]]))
In [15]:
# Show the baseline density estimate as an interactive 3-D surface.
fig = Figure(
    data=Data([Surface(x=xx, y=yy, z=values_no)]),
    layout=Layout(title='Normal', scene=scene),
)
py.iplot(fig)
In [16]:
# Close the smart-start environment's viewer window.
env.render(close=True)
In [17]:
# Close the baseline environment's viewer window.
env_no.render(close=True)
Smart start is a lot better at exploring the state space, as expected. There are two main issues though:
1. We assumed you can start at any point previously visited, i.e. any state in the replay buffer. We assumed that you can replay previous trajectories to get there; in future work the goal is to go there using iLQG.
2. The time needed to do the kernel density estimates on the replay buffer is way too high; the reason is it does not scale at all, the computation time grows exponentially with the number of samples in the replay buffer. Two more efficient ways of choosing an exploration point are:
a. Generate goals as in 'Automatic Goal Generation for Reinforcement Learning Agents, Held et al.'
b. Store the temporal difference errors for each transition in the replay buffer, order the buffer based on the TD-error and choose the transition with the highest TD-error as exploration point.