In [3]:
import gym
import numpy as np
import time

class BinaryActionLinearPolicy(object):
    def __init__(self, theta):
        self.w = theta[:-1]
        self.b = theta[-1]
    def act(self, ob):
        # binary action: 1 if the linear score is negative, 0 otherwise
        y = ob.dot(self.w) + self.b
        a = int(y < 0)
        return a

class ContinuousActionLinearPolicy(object):
    def __init__(self, theta, n_in, n_out):
        assert len(theta) == (n_in + 1) * n_out
        self.W = theta[0 : n_in * n_out].reshape(n_in, n_out)
        self.b = theta[n_in * n_out :].reshape(1, n_out)
    def act(self, ob):
        a = ob.dot(self.W) + self.b
        return a

def cem(f, th_mean, batch_size, n_iter, elite_frac, initial_std=1.0):
    """
    Generic implementation of the cross-entropy method for maximizing a black-box function
    f: a function mapping from vector -> scalar
    th_mean: initial mean over input distribution
    batch_size: number of samples of theta to evaluate per batch
    n_iter: number of batches
    elite_frac: each batch, select this fraction of the top-performing samples
    initial_std: initial standard deviation over parameter vectors
    """
    n_elite = int(np.round(batch_size * elite_frac))
    th_std = np.ones_like(th_mean) * initial_std
    for _ in range(n_iter):
        # sample a batch of parameter vectors around the current mean
        ths = np.array([th_mean + dth for dth in th_std[None, :] * np.random.randn(batch_size, th_mean.size)])
        ys = np.array([f(th) for th in ths])
        # keep the elite fraction and refit the sampling distribution to it
        elite_inds = ys.argsort()[::-1][:n_elite]
        elite_ths = ths[elite_inds]
        th_mean = elite_ths.mean(axis=0)
        th_std = elite_ths.std(axis=0)
        yield {'ys': ys, 'theta_mean': th_mean, 'y_mean': ys.mean()}

def do_rollout(agent, env, render=False):
    total_rew = 0
    ob = env.reset()
    done = False
    while not done:
        a = agent.act(ob)
        (ob, reward, done, _info) = env.step(a)
        total_rew += reward
        # if render and t%3==0: env.render()
    return total_rew

def noisy_evaluation(theta):
    agent = BinaryActionLinearPolicy(theta)
    rew = do_rollout(agent, env)
    return rew

env = gym.make("CartPole-v0")
print(env.action_space.n)
env.seed(0)
np.random.seed(0)
params = dict(n_iter=100, batch_size=400, elite_frac=0.2)
for (i, iterdata) in enumerate(
        cem(noisy_evaluation, np.zeros(env.observation_space.shape[0] + 1), **params)):
    print('Iteration %2i. Episode mean reward: %7.3f' % (i, iterdata['y_mean']))
    # render one episode with the current mean parameters and time how long the pole stays up
    agent = BinaryActionLinearPolicy(iterdata['theta_mean'])
    print(iterdata['theta_mean'])
    ob = env.reset()
    done = False
    start_t = time.time()
    while not done:
        a = agent.act(ob)
        (ob, reward, done, _info) = env.step(a)
        env.render()
    end_t = time.time()
    print('stay up time: ' + str(end_t - start_t))
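
A minimal follow-up sketch (not part of the original cell), assuming the objects above (do_rollout, BinaryActionLinearPolicy, env and the final iterdata) are still in scope: score the last CEM mean without rendering by averaging the return over a few rollouts.

# Sketch: evaluate the final CEM mean parameters by averaging the undiscounted
# return of do_rollout over 20 episodes (no rendering).
eval_agent = BinaryActionLinearPolicy(iterdata['theta_mean'])
returns = [do_rollout(eval_agent, env) for _ in range(20)]
print('mean return over 20 episodes: %.2f' % np.mean(returns))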
In [ ]:
import numpy as np
import gym
import pickle
import random
from collections import deque
import sys
import time
import matplotlib.pyplot as plt

input_num = 4    # number of input dimensions
output_num = 2   # number of valid actions
observe = 100    # timesteps to observe before training
explore = 5000   # frames over which to anneal epsilon
final_epsilon = 0.001    # final value of epsilon
initial_epsilon = 0.99   # starting value of epsilon
replay_memory = 5000     # number of previous transitions to remember
gamma = 0.7      # decay rate of past observations
mu = 0.0001      # ALD threshold for growing the kernel dictionary
delta = 1.75     # Gaussian kernel width
beta = 25        # learning factor in the recursive least-squares update
alpha = 4.72     # regularization used to initialize P

def Gaussian_kernel(x, u):
    # k(x, u) = exp(-||x - u||^2 / (2 * delta^2)), returned as a 1x1 matrix
    x = np.matrix(x, dtype=np.float64).T
    u = np.matrix(u, dtype=np.float64).T
    return np.exp(-(np.dot((x - u).T, (x - u))) / (2 * delta ** 2))

def qfunc(phi_x, action, w):
    # Q(x, a) = phi(x)^T w[:, a]
    return np.dot(phi_x.T, w[:, action])[0, 0]

def greedy(phi_x, w):
    q_value = np.dot(phi_x.T, w)
    return np.argmax(q_value)

def epsilon_greedy(phi_x, w, epsilon):
    amax = greedy(phi_x, w)
    if random.random() <= epsilon:
        amax = random.randrange(2)
    return amax

def construct_Phi(x, ALD_buf):
    # kernel feature vector of x against every element of the dictionary
    Phi_x = []
    for a in ALD_buf:
        Phi_x.append(Gaussian_kernel(x, a)[0, 0])
    return Phi_x

def achieve_y(r_t, s_t1, done, w, ALD_buf):
    # target value: -1 at a terminal transition, otherwise a small constant
    # per-step reward plus the discounted max Q at the next state
    if done:
        return -1
    else:
        x = np.array(s_t1)
        k_t = np.matrix(construct_Phi(x, ALD_buf), dtype=np.float64).T
        y_t = np.dot(k_t.T, w)
        y_max = np.max(y_t)
        return 0.01 + gamma * y_max

env = gym.make("CartPole-v0")
env.seed(0)
high = env.observation_space.high
low = env.observation_space.low

def norm(x):
    norm_x = np.ones_like(x)
    for i in range(len(x)):
        norm_x[i] = (x[i] - low[i]) / (high[i] - low[i])
    return norm_x

np.random.seed(0)
n_action = env.action_space.n
num_steps = 200
epsilon = initial_epsilon

# initialize the kernel dictionary and the recursive least-squares matrices
s_t = env.reset()
init_x = np.array(s_t)
ALD_buf = []
ALD_buf.append(init_x)
k_tt = Gaussian_kernel(init_x, init_x)
K_inv = k_tt.I
Pn = np.matrix((1 / alpha) * np.eye(1))
Mn = np.matrix(np.zeros([1, n_action]))
m = 1
D = deque(maxlen=replay_memory)
start_t = time.time()
# s_t = norm(s_t)
for i in range(200):
    done = False
    reward = 0
    while not done:
        k_t = np.matrix(construct_Phi(s_t, ALD_buf), dtype=np.float64).T
        # epsilon-greedy action selection; epsilon is annealed at the end of each episode
        a = epsilon_greedy(k_t, Mn, epsilon)
        (s_t1, r, done, _info) = env.step(a)
        # orig_s = s_t1
        # s_t1 = norm(s_t1)
        D.append((s_t, a, r, s_t1, done))
        s_t = s_t1
        reward += r
        if done:
            # print(s_t)
            # print(orig_s)
            epsilon -= (initial_epsilon - final_epsilon) / 100
            print(reward, epsilon)
            s_t = env.reset()
            # experience replay: sample stored transitions and update the kernel regression
            for j in range(100):
                now_index = np.random.randint(len(D), size=1)[0]
                (s_b, action, r_b, s_b1, done_b) = D[now_index]
                x = np.array(s_b1)
                y = achieve_y(r_b, x, done_b, Mn, ALD_buf)
                k_t = np.matrix(construct_Phi(x, ALD_buf), dtype=np.float64).T
                k_tt = Gaussian_kernel(x, x)
                # ALD test: can x be approximated by the current dictionary?
                a_t = np.dot(K_inv, k_t)
                delta_t = k_tt - np.dot(k_t.T, a_t)
                if abs(delta_t) > mu:
                    # x is novel: add it to the dictionary and grow K_inv, Pn and Mn
                    ALD_buf.append(x)
                    ac_t = a_t
                    a_t = np.matrix(np.zeros(len(ALD_buf))).T
                    a_t[-1, -1] = 1
                    K_inv = K_inv * delta_t[0, 0] + np.dot(ac_t, ac_t.T)
                    K_inv = np.row_stack((K_inv, -ac_t.T))
                    add_c = np.row_stack((-ac_t, 1))
                    K_inv = np.column_stack((K_inv, add_c)) / delta_t[0, 0]
                    k_t = np.row_stack((k_t, k_tt))
                    Pn = np.row_stack((Pn, np.zeros(len(ALD_buf) - 1)))
                    Pn = np.column_stack((Pn, np.zeros(len(ALD_buf))))
                    Pn[-1, -1] = 1 / alpha
                    qn = beta * np.dot(np.dot(Pn, k_t), k_t.T) / (1 + beta * np.dot(np.dot(k_t.T, Pn), k_t))
                    Pn1 = Pn - np.dot(qn, Pn)
                    Mn = np.row_stack((Mn, np.zeros(n_action)))
                    Mn1 = beta * y * np.dot(Pn1, k_t) + Mn[:, action] - np.dot(qn, Mn[:, action])
                    Pn = Pn1
                    Mn[:, action] = Mn1
                else:
                    # x is approximately dependent on the dictionary: only update the weights
                    qn = beta * np.dot(np.dot(Pn, k_t), k_t.T) / (1 + beta * np.dot(np.dot(k_t.T, Pn), k_t))
                    Pn1 = Pn - np.dot(qn, Pn)
                    Mn1 = beta * y * np.dot(Pn1, k_t) + Mn[:, action] - np.dot(qn, Mn[:, action])
                    Pn = Pn1
                    Mn[:, action] = Mn1
print('training is over')
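
A minimal aside, on made-up data: the `if abs(delta_t) > mu` branch above implements the approximate linear dependence (ALD) test that decides when a state is added to the kernel dictionary. The sketch below replays only that test; `dictionary`, `K_inv` and `gaussian_kernel` are local stand-ins for the corresponding names in the cell above.

# Sketch of the ALD test in isolation (hypothetical data, same kernel and threshold).
import numpy as np

delta = 1.75     # kernel width, same value as above
mu = 0.0001      # ALD threshold, same value as above

def gaussian_kernel(x, u):
    d = np.asarray(x, dtype=np.float64) - np.asarray(u, dtype=np.float64)
    return np.exp(-d.dot(d) / (2 * delta ** 2))

dictionary = [np.zeros(4)]                                # start with a single basis state
K_inv = np.array([[1.0 / gaussian_kernel(dictionary[0], dictionary[0])]])

x = np.array([0.05, -0.1, 0.02, 0.0])                     # a made-up new observation
k_t = np.array([gaussian_kernel(x, u) for u in dictionary])
a_t = K_inv.dot(k_t)                                      # best reconstruction coefficients
delta_t = gaussian_kernel(x, x) - k_t.dot(a_t)            # reconstruction error
print('ALD error:', delta_t, '-> grow dictionary' if delta_t > mu else '-> approximately dependent')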
In [2]:
import random as ran
import matplotlib.pyplot as plt

ran.seed(0)

class Mdp:
    def __init__(self):
        self.states = [1, 2, 3, 4, 5, 6, 7, 8]  # states 6, 7 and 8 are terminal
        self.terminal_states = dict()
        self.terminal_states[6] = 1
        self.terminal_states[7] = 1
        self.terminal_states[8] = 1
        self.actions = ['n', 'e', 's', 'w']
        self.rewards = dict()
        self.rewards['1_s'] = -1.0
        self.rewards['3_s'] = 1.0
        self.rewards['5_s'] = -1.0
        self.t = dict()
        self.t['1_s'] = 6
        self.t['1_e'] = 2
        self.t['2_w'] = 1
        self.t['2_e'] = 3
        self.t['3_s'] = 7
        self.t['3_w'] = 2
        self.t['3_e'] = 4
        self.t['4_w'] = 3
        self.t['4_e'] = 5
        self.t['5_s'] = 8
        self.t['5_w'] = 4
        self.gamma = 0.8

    def transform(self, state, action):
        ## returns (is_terminal, next_state, reward)
        if state in self.terminal_states:
            return True, state, 0
        key = '%d_%s' % (state, action)
        if key in self.t:
            next_state = self.t[key]
        else:
            next_state = state
        is_terminal = False
        if next_state in self.terminal_states:
            is_terminal = True
        if key not in self.rewards:
            r = 0.0
        else:
            r = self.rewards[key]
        return is_terminal, next_state, r

def random_pi():
    # uniformly random policy over the four actions
    actions = ['n', 'w', 'e', 's']
    r = int(ran.random() * 4)
    return actions[r]

def compute_random_pi_state_value():
    # Monte Carlo estimate of the state values under the random policy
    value = [0.0 for _ in range(9)]
    num = 1000000
    for k in range(1, num):
        for i in range(1, 6):
            mdp = Mdp()
            s = i
            is_terminal = False
            gamma = 1.0
            v = 0.0
            while not is_terminal:
                a = random_pi()
                is_terminal, s, r = mdp.transform(s, a)
                v += gamma * r
                gamma *= 0.5
            # running average of the returns observed so far for start state i
            value[i] = (value[i] * (k - 1) + v) / k
        if k % 10000 == 0:
            plt.plot(value)
            # print(value)

compute_random_pi_state_value()
plt.show()
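
A minimal cross-check sketch, assuming the Mdp class above is in scope: the random-policy state values estimated by the Monte Carlo loop can also be obtained exactly by solving the Bellman expectation equation, using the same transition and reward tables and the same 0.5 discount applied in the rollout. The plotted estimates should converge toward these values.

# Sketch: exact evaluation of the uniformly random policy, V = R + gamma * P * V.
import numpy as np

mdp = Mdp()
gamma = 0.5                                    # discount used in the Monte Carlo rollout above
states = [1, 2, 3, 4, 5]                       # non-terminal states; V is 0 at terminal states
P = np.zeros((5, 5))                           # P[i, j]: prob. of moving from states[i] to states[j]
R = np.zeros(5)                                # expected immediate reward under the random policy
for i, s in enumerate(states):
    for a in mdp.actions:
        key = '%d_%s' % (s, a)
        R[i] += 0.25 * mdp.rewards.get(key, 0.0)
        s_next = mdp.t.get(key, s)
        if s_next in states:
            P[i, states.index(s_next)] += 0.25
V = np.linalg.solve(np.eye(5) - gamma * P, R)
print(dict(zip(states, np.round(V, 4))))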
In [ ]: