In [1]:
import numpy as np
from drl.ilqg import ilqg
from drl.env.arm import TwoLinkArm
from sklearn.linear_model import LinearRegression
from sklearn import preprocessing
env = TwoLinkArm(g=0., wp=10., wv=1., wu=0.001)
N = 5 # number of future steps for iLQG
num_episodes = 25
max_steps = 50
In [2]:
def cst(x, u):
    return env.cost_func(x, u)
In [3]:
def dynamics_func(model, x, u):
    # Guard against NaN controls passed in by the iLQG solver
    u[np.isnan(u)] = 0.
    X_in = np.concatenate((x, u)).reshape(1, -1)
    # The fitted model is linear, so its coefficients are the dynamics Jacobians
    beta = model.coef_
    return model.predict(X_in)[0], beta[:, :env.state_dim].T, beta[:, env.state_dim:].T, None, None, None
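Because each fitted model is an ordinary least-squares linear map from [x, u] to the next state, the Jacobians that iLQG needs are just slices of coef_: the first state_dim columns give f_x and the remaining columns give f_u. The short check below is an illustration only (it is not part of the original notebook and assumes a made-up 2-D state and 1-D action); it confirms that slicing coef_ recovers the transition matrices used to generate the data:

import numpy as np
from sklearn.linear_model import LinearRegression

state_dim, action_dim = 2, 1                      # illustrative sizes, not the arm's
A = np.array([[1.0, 0.1], [0.0, 1.0]])            # true state transition matrix
B = np.array([[0.0], [0.1]])                      # true control matrix

rng = np.random.RandomState(0)
XU = rng.randn(200, state_dim + action_dim)       # stacked [x, u] samples
Xn = XU[:, :state_dim] @ A.T + XU[:, state_dim:] @ B.T  # noiseless next states

m = LinearRegression().fit(XU, Xn)
beta = m.coef_                                    # shape (state_dim, state_dim + action_dim)
assert np.allclose(beta[:, :state_dim], A)        # f_x (before the transpose in dynamics_func)
assert np.allclose(beta[:, state_dim:], B)        # f_u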
In [4]:
models = []
for i in range(max_steps):
    models.append(LinearRegression())
x = x0 = env.reset()
goal = env.goal
traj_rewards = []
# Initialize random control sequence
u = np.random.randn(max_steps, env.action_dim)
# Initialize data matrices
X = np.zeros((max_steps, num_episodes, env.state_dim + env.action_dim))
Y = np.zeros((max_steps, num_episodes, env.state_dim))
# Simulate the system once with the random control sequence
reward = 0.
for i_step in range(max_steps):
    env.render()
    X[i_step, 0, :] = np.concatenate((x, u[i_step, :]))
    x, r, t, _ = env.step(u[i_step, :])
    Y[i_step, 0, :] = x
    reward += r
traj_rewards.append(reward)
print('Iter %d, Steps %d, Reward: %s' % (0, i_step + 1, reward))
# Only use first N control inputs for iLQG estimator
u = u[:N,:]
for i_episode in range(1, num_episodes):
    # Fit models
    # TODO: Add scaled sample weights for better fitted models
    # if len(traj_rewards) > 5:
    #     sample_weights = preprocessing.scale(traj_rewards)
    #     sample_weights -= np.min(sample_weights) - 0.5
    # else:
    #     sample_weights = [1.] * len(traj_rewards)
    # Each per-step model is fit on a window of N steps, using only the episodes collected so far
    for i in range(max_steps - N):
        x_tmp = X[i:i+N, :i_episode, :]
        x_tmp = np.reshape(x_tmp, [x_tmp.shape[0] * x_tmp.shape[1], x_tmp.shape[2]])
        y_tmp = Y[i:i+N, :i_episode, :]
        y_tmp = np.reshape(y_tmp, [y_tmp.shape[0] * y_tmp.shape[1], y_tmp.shape[2]])
        models[i].fit(x_tmp, y_tmp)
    # The last N models only see the remaining steps up to the horizon
    for i in range(max_steps - N, max_steps):
        x_tmp = X[i:, :i_episode, :]
        x_tmp = np.reshape(x_tmp, [x_tmp.shape[0] * x_tmp.shape[1], x_tmp.shape[2]])
        y_tmp = Y[i:, :i_episode, :]
        y_tmp = np.reshape(y_tmp, [y_tmp.shape[0] * y_tmp.shape[1], y_tmp.shape[2]])
        models[i].fit(x_tmp, y_tmp)
    # Roll out from the same start state, replanning with iLQG at every step
    x = env.reset(x0, goal)
    terminal = False
    i_step = 0
    reward = 0.
    for i_step in range(max_steps):
        env.render()
        # iLQG estimate with fitted dynamics
        dyn = lambda x, u: dynamics_func(models[i_step], x, u)
        _, u, L, Vx, Vxx, cost = ilqg(dyn, cst, x, u, {})
        # Take a step with the first control of the optimized sequence
        x_new, r, t, _ = env.step(u[0, :])
        # Add to data matrices
        X[i_step, i_episode, :] = np.concatenate((x, u[0, :]))
        Y[i_step, i_episode, :] = x_new
        # Shift the control sequence and append a fresh random control (receding horizon)
        u = np.concatenate((u[1:, :], np.random.randn(1, env.action_dim)))
        x = x_new
        reward += r
        i_step += 1  # i_step now counts the number of steps actually taken
        if t:
            break
    traj_rewards.append(reward)
    print('Iter %d, Steps %d, Reward: %s' % (i_episode, i_step, reward))
In [5]:
env.render(close=True)
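To check whether replanning with the fitted dynamics actually improves on the initial random rollout, the per-episode returns collected in traj_rewards can be plotted. A minimal sketch, assuming matplotlib is available in the environment:

import matplotlib.pyplot as plt

plt.plot(traj_rewards, marker='o')
plt.xlabel('Episode')
plt.ylabel('Total reward')
plt.title('iLQG with fitted linear dynamics on TwoLinkArm')
plt.show()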