Much of modern machine learning reduces learning to numerical optimization; in supervised learning this means minimizing training error as defined by a loss function. Learning, though, is ultimately about making some kind of prediction on data you haven't seen before.
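To make the optimization framing concrete, here is a minimal toy sketch (my own example, not from any library) that fits a single weight by gradient descent on mean squared error:
import numpy as np

# Toy supervised problem: fit y = w * x where the true w is 2
x = np.array([0.0, 1.0, 2.0, 3.0])
y = 2.0 * x

w = 0.0                                   # initial guess
lr = 0.05                                 # learning rate
for _ in range(100):
    grad = 2 * np.mean((w * x - y) * x)   # d/dw of the mean squared error
    w -= lr * grad
print(w)                                  # approaches 2.0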
In reinforcement learning, by contrast, we are trying to teach an agent to make optimal decisions within some environment. We define these problems in terms of the interaction between an agent and its environment: at each step the agent observes a state, chooses an action, and receives a reward.
Go to OpenAI Gym's list of environments to see which environments exist for testing algorithms. Each environment is exposed through a very small API.
env = gym.make('envName')
env.reset() will start the environment and return the initial observation. env.action_space and env.observation_space describe the set of valid actions and observations. Call env.step(action) to take your chosen action and return the new state of the environment. Specifically, .step() returns an observation, a reward, a done flag marking the end of the episode, and a diagnostic info dictionary:
import gym

env = gym.make('CartPole-v0')
state = env.reset()              # initial observation
print(env.action_space)          # Discrete(2): push the cart left or right
print(env.observation_space)     # Box(4,): cart position/velocity, pole angle/velocity

action = 1                       # push the cart to the right
new_state, reward, done, _ = env.step(action)
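To see these pieces working together, here is a small sketch of a complete episode driven by random actions (sampling from action_space rather than using a learned policy):
import gym

env = gym.make('CartPole-v0')
state = env.reset()
done = False
total_reward = 0
while not done:
    action = env.action_space.sample()         # pick a random valid action
    state, reward, done, _ = env.step(action)
    total_reward += reward
print(total_reward)                            # CartPole gives +1 per timestep survived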
We can render and save a video of the environment to see what the agent is doing.
import gym
from gym.wrappers import Monitor

env = gym.make('CartPole-v0')
env = Monitor(env, directory='/tmp/cartpole-v0/', force=True)  # record video to /tmp/cartpole-v0/
env.reset()
for _ in range(200):
    env.render()
    _, _, done, _ = env.step(env.action_space.sample())  # random actions, just so there is something to record
    if done:
        break
env.close()  # closes the Monitor and finalizes the video
Note: on my desktop environment the env window cannot be closed and I have to use xkill in the terminal.
In [ ]:
# Random search: sample random linear policies until one balances the pole
import gym
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

env = gym.make('CartPole-v0')
episodes = []

def run_episode(env, parameters):
    observation = env.reset()
    totalreward = 0
    for t in range(200):
        # env.render()  # uncomment to watch; rendering slows the search considerably
        # linear policy: push left if the weighted observation is negative, else right
        action = 0 if np.matmul(parameters, observation) < 0 else 1
        observation, reward, done, info = env.step(action)
        totalreward += reward
        if done:
            break
    return totalreward

def random_search(iterations):
    bestparams = None
    bestreward = 0
    for i in range(iterations):
        # sample a random weight vector in [-1, 1]^4
        parameters = np.random.rand(4) * 2 - 1
        reward = run_episode(env, parameters)
        if reward > bestreward:
            bestreward = reward
            bestparams = parameters
        # considered solved if the agent lasts 200 timesteps
        if reward == 200:
            print(bestparams)
            episodes.append(i + 1)
            break

for _ in range(10):
    random_search(100)

length = np.arange(len(episodes))
plt.title('Episodes for each trial')
plt.xlabel('Trial')
plt.ylabel('Episodes to train')
plt.bar(length, episodes)
plt.xticks(length)
plt.show()

plt.title('Histogram of episodes')
plt.xlabel('Episodes to train')
plt.ylabel('Frequency')
plt.hist(episodes, density=True, bins=20)  # `normed` was removed from matplotlib; use density
plt.show()
How we define our policy is the crux of reinforcement learning. Rather than searching randomly, we can derive a policy by learning the value of each state-action pair, following the Bellman equation:
$Q(s, a) = r + \gamma \max_{a'} Q(s', a')$
The Bellman equation states that the long-term expected reward for a given action is equal to the immediate reward from the current action combined with the expected reward from the best future action taken at the following state.
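As a concrete, made-up example: with $\gamma = 0.95$, if an action earns an immediate reward of 1 and the best Q-value in the next state is 2, the Bellman target is $1 + 0.95 \times 2 = 2.9$. A minimal sketch of one tabular backup (all values here are invented):
import numpy as np

gamma = 0.95
Q = np.zeros((16, 4))                        # 16 states x 4 actions, FrozenLake-sized
Q[4, :] = [0.0, 2.0, 0.5, 1.0]               # pretend estimates for the next state

s, a, r, s_next = 0, 1, 1.0, 4               # one invented transition
Q[s, a] = r + gamma * np.max(Q[s_next, :])   # full Bellman backup
print(Q[s, a])                               # 1 + 0.95 * 2 = 2.9
The Q-learning cell below doesn't overwrite Q(s, a) outright like this; it moves the estimate toward the target by a learning rate lr.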
In [8]:
import gym
import numpy as np

env = gym.make('FrozenLake-v0')

# Q-table: one row per state, one column per action
Q = np.zeros([env.observation_space.n, env.action_space.n])
lr = .8             # learning rate
y = .95             # discount factor (gamma)
num_episodes = 2000
reward_list = []

for i in range(num_episodes):
    state = env.reset()
    reward_all = 0
    done = False
    j = 0
    while j < 99:
        j += 1
        # greedy action plus decaying random noise for exploration
        action = np.argmax(Q[state, :] + np.random.randn(1, env.action_space.n) * (1. / (i + 1)))
        new_state, reward, done, _ = env.step(action)
        # move Q[state, action] toward the Bellman target r + y * max Q(s', a')
        Q[state, action] = Q[state, action] + lr * (reward + y * np.max(Q[new_state, :]) - Q[state, action])
        reward_all += reward
        state = new_state
        if done:
            break
    reward_list.append(reward_all)

print("Score over time: {}".format(sum(reward_list) / num_episodes))
print("Final Q-Table Values")
print(Q)
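A Q-table works for FrozenLake's 16 states, but it cannot scale to large or continuous state spaces. Instead we can approximate Q(s, a) with a neural network: the cell below feeds a one-hot encoding of the state through a single weight matrix in TensorFlow and trains it toward the same Bellman target, using epsilon-greedy exploration.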
In [ ]:
import gym
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
%matplotlib inline

env = gym.make('FrozenLake-v0')

# One-layer Q-network: one-hot state in, Q-values for the 4 actions out
observations = tf.placeholder(shape=[1, env.observation_space.n], dtype=tf.float32)
weights = tf.Variable(tf.random_uniform([16, 4], 0, 0.01))
Qout = tf.matmul(observations, weights)
predict = tf.argmax(Qout, 1)

# Loss: squared difference between the predicted and target Q-values
nextQ = tf.placeholder(shape=[1, 4], dtype=tf.float32)
loss = tf.reduce_sum(tf.square(nextQ - Qout))
trainer = tf.train.GradientDescentOptimizer(learning_rate=0.1)
update_model = trainer.minimize(loss)

init = tf.global_variables_initializer()
gamma = .99
epsilon = 0.1       # epsilon-greedy exploration rate
num_episodes = 2000
j_list = []
r_list = []

with tf.Session() as sess:
    sess.run(init)
    for i in range(num_episodes):
        s = env.reset()
        reward_all = 0
        done = False
        j = 0
        while j < 99:
            j += 1
            # greedy action from the network; the state is fed as a one-hot vector
            a, allQ = sess.run([predict, Qout], feed_dict={observations: np.identity(16)[s:s+1]})
            if np.random.rand(1) < epsilon:
                a[0] = env.action_space.sample()   # explore
            s1, reward, done, _ = env.step(a[0])
            # Q-values of the next state, used to form the Bellman target
            Q1 = sess.run(Qout, feed_dict={observations: np.identity(16)[s1:s1+1]})
            maxQ1 = np.max(Q1)
            targetQ = allQ
            targetQ[0, a[0]] = reward + gamma * maxQ1
            # train the network toward the target Q-values
            _, W1 = sess.run([update_model, weights], feed_dict={observations: np.identity(16)[s:s+1], nextQ: targetQ})
            reward_all += reward
            s = s1
            if done:
                epsilon = 1. / ((i / 50) + 10)     # decay exploration as training progresses
                break
        j_list.append(j)
        r_list.append(reward_all)

print("Percent of successful episodes: {}%".format(100 * sum(r_list) / num_episodes))
In [ ]:
plt.plot(r_list)  # reward per episode
In [ ]:
plt.plot(j_list)  # steps survived per episode