Monte-Carlo Reinforcement Learning
Monte-Carlo policy evaluation
First-visit Monte-Carlo Policy Evaluation
Every-visit Monte-Carlo policy evaluation
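Both variants estimate the value of a state as the empirical mean of the returns observed from that state. The code below keeps a visit count $N(s)$ and a sum of returns $S(s)$; at each (first) visit to $s$ in an episode with return $G_t$:
$$ N(s) \leftarrow N(s) + 1, \qquad S(s) \leftarrow S(s) + G_t, \qquad V(s) = \frac{S(s)}{N(s)} \rightarrow v_\pi(s) \text{ as } N(s) \rightarrow \infty $$
First-visit MC increments the counters only at the first occurrence of $s$ in an episode; every-visit MC increments them at every occurrence.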
In [62]:
import torch
import numpy as np
import math
import random
import matplotlib.pyplot as plt
N = torch.zeros(10, 10, 2)
S = torch.zeros(10, 10, 2)
def create_cards():
cards = []
for v in range(13):
for i in range(4):
cards.append(v + 1)
# print('cards', cards)
return cards
def calc_sum_cards(cards):
sum = 0
num_aces = 0
for card in cards:
if card > 10:
card = 10
if card == 1:
num_aces += 1
card = 11
sum += card
while sum > 21 and num_aces > 0:
sum -= 10
num_aces -= 1
return sum, num_aces
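# examples: calc_sum_cards([1, 5]) == (16, 1)      (ace counted as 11, still "usable")
#           calc_sum_cards([1, 5, 10]) == (16, 0)  (ace demoted to 1 to avoid busting)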
def cards_to_state(dealer_card, us):
sum, num_aces = calc_sum_cards(us)
s = torch.IntTensor([sum - 12, dealer_card - 1, 0])
if num_aces > 0:
s[2] = 1
return s, sum > 21
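# apply_action: given the remaining deck, the current state and an action
# (0 = stick, 1 = twist), returns (deck, next state or None, reward, done).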
def apply_action(cards, s, a):
our_sum = s[0] + 12
# dealer_sum = s[1] + 1
if s[1] == 0:
dealer_cards = [11]
dealer_aces = 1
else:
dealer_cards = [s[1] + 1]
dealer_aces = 0
if a == 0: # stick
        # dealer draws from the deck until reaching 17 or more (standard rule: dealer sticks on 17+)
        while calc_sum_cards(dealer_cards)[0] < 17:
            dealer_cards.append(cards[-1])
            cards = cards[:-1]
        dealer_sum = calc_sum_cards(dealer_cards)[0]
        if dealer_sum > 21 or dealer_sum < our_sum:
            # dealer busts, or we finish closer to 21: we win
            return cards, None, 1, True
        elif dealer_sum == our_sum:
            return cards, None, 0, True
        else:
            return cards, None, -1, True
else: # twist
card = cards[-1]
cards = cards[:-1]
useable_ace = s[2] == 1
if card > 10:
card = 10
if card == 1:
useable_ace = True
card = 11
our_sum += card
if our_sum > 21 and useable_ace:
our_sum -= 10
useable_ace = False
if our_sum > 21:
# print('bust')
            return cards, None, -1, True
else:
s_new = s.clone()
s_new[0] = our_sum - 12
s_new[2] = 1 if useable_ace else 0
return cards, s_new, 0, False
# return cards, s_new, reward
episode = 0
# while True:
while episode < 100:
# print('e %s' % episode)
cards = create_cards()
random.shuffle(cards)
# print('cards', cards)
dealer_card = cards[-1]
if dealer_card > 10:
dealer_card = 10
cards = cards[:-1]
our_cards = []
    while calc_sum_cards(our_cards)[0] < 12:
        our_cards.append(cards[-1])
        cards = cards[:-1]
s, done = cards_to_state(dealer_card, our_cards)
states = []
rewards = []
while True:
# N[s[0], s[1], s[2]] += 1
# S[s[0], s[1], s[2]] +=
# print('s', s)
states.append(s.clone())
a = np.random.randint(2)
# print('a', a)
cards, s_new, reward, done = apply_action(cards, s, a)
# print('s_new', s_new)
rewards.append(reward)
if done:
break
G = 0
# print('len(states)', len(states))
for i in range(len(states) - 1, -1, -1):
s = states[i]
r = rewards[i]
G += r
# print('i', i, 's', s, 'r', r, 'G', G)
N[s[0], s[1], s[2]] += 1
S[s[0], s[1], s[2]] += G
# print('G', G)
# print('N', N)
# print('S', S)
episode += 1
# plt.imshow()
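# One way to visualize the estimate (a sketch, using the N and S tensors above):
V = S / N.clamp(min=1)  # empirical mean return; clamp avoids 0/0 for unvisited states
plt.imshow(V[:, :, 0].numpy(), origin='lower')  # value estimate without a usable ace
plt.xlabel('dealer showing (index)')
plt.ylabel('player sum - 12')
plt.colorbar()
plt.show()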
up to 55:10 approx
In [2]:
import matplotlib.pyplot as plt
import torch
import numpy as np
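# A small corridor ("random walk") environment: states 0..6, where 0 and 6 are
# terminal; action 0 moves left, action 1 moves right; only reaching state 6 gives reward 1.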
class Env(object):
def __init__(self):
self.env_size = 7
@property
def num_actions(self):
return 2
@property
def state_shape(self):
return [self.env_size]
def reset(self):
self.s = torch.IntTensor(1)
self.s[0] = np.random.randint(5) + 1
return self.s
def act(self, a):
if a == 0:
self.s[0] -= 1
else:
self.s[0] += 1
reward = 1 if self.s[0] == self.env_size - 1 else 0
done = self.s[0] in [0, self.env_size - 1]
return self.s, reward, done
def render(self):
res = ['-'] * self.env_size
res[0] = '*'
res[6] = '*'
res[self.s[0]] = 'X'
print(''.join(res))
class RandomPolicy(object):
def __init__(self, num_actions):
self.num_actions = num_actions
def get_action(self, s):
return np.random.randint(self.num_actions)
def run_episode(env, policy, render=True):
actions = []
rewards = []
states = []
s = env.reset()
while True:
a = policy.get_action(s)
if render:
env.render()
states.append(s.clone())
s, r, done = env.act(a)
if render:
print('a %s r %s' % (a, r))
rewards.append(r)
actions.append(a)
if done:
break
# if render:
# env.render()
return states, actions, rewards
env = Env()
policy = RandomPolicy(env.num_actions)
print(env.state_shape)
num_episodes = 5
num_episodes = 0
for episode in range(num_episodes):
print('')
print('episode %s' % episode)
states, actions, rewards = run_episode(env, policy)
def get_linear_index(target, index):
assert len(index.shape) == 1
linear_index = 0
D = index.shape[0]
for d in range(D):
linear_index = linear_index * target.shape[d] + index[d]
return linear_index
def tensor_set(target, index, v):
linear_index = get_linear_index(target, index)
target.view(-1)[linear_index] = v
def tensor_get(target, index):
linear_index = get_linear_index(target, index)
target = target.view(-1)[linear_index]
return target
def tensor_inc(target, index, v):
tensor_set(target, index, tensor_get(target, index) + v)
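# The helpers above index a tensor with a multi-dimensional index tensor by
# computing the corresponding flat (row-major) index. A quick illustrative check:
t = torch.zeros(3, 4)
tensor_set(t, torch.IntTensor([1, 2]), 5.0)
tensor_inc(t, torch.IntTensor([1, 2]), 1.0)
assert tensor_get(t, torch.IntTensor([1, 2])) == 6.0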
class MonteCarloAllVisits(object):
def __init__(self, env, policy):
self.env = env
self.policy = policy
self.state_shape = env.state_shape
self.N = torch.zeros(self.state_shape)
self.S = torch.zeros(self.state_shape)
def step(self):
states, actions, rewards = run_episode(self.env, self.policy, render=False)
G = 0
for i in range(len(actions) - 1, -1, -1):
s = states[i]
a = actions[i]
r = rewards[i]
G += r
tensor_inc(self.N, s, 1)
tensor_inc(self.S, s, G)
@property
def V(self):
return self.S / self.N
runner = MonteCarloAllVisits(env, policy)
for it in range(100):
runner.step()
if it % 20 == 0:
print('it', it)
print('V', runner.V)
print('V', runner.V)
In [130]:
class MonteCarloFirstVisit(object):
def __init__(self, env, policy):
self.env = env
self.policy = policy
self.state_shape = env.state_shape
self.N = torch.zeros(self.state_shape)
self.S = torch.zeros(self.state_shape)
# print('self.N', self.N)
def step(self):
states, actions, rewards = run_episode(self.env, self.policy, render=False)
G = 0
# seen_states = set()
seen_states = torch.zeros(self.state_shape)
seen_states.zero_()
for i in range(len(actions) - 1, -1, -1):
s = states[i]
a = actions[i]
r = rewards[i]
G += r
# print('seen_states', seen_states)
# print('s', s)
if tensor_get(seen_states, s) == 0:
tensor_inc(self.N, s, 1)
tensor_inc(self.S, s, G)
tensor_set(seen_states, s, 1)
@property
def V(self):
return self.S / self.N
runner = MonteCarloFirstVisit(env, policy)
for it in range(100):
runner.step()
if it % 20 == 0:
print('it', it)
print('V', runner.V)
print('V', runner.V)
In [184]:
class TD(object):
def __init__(self, env, policy, alpha=0.1):
self.env = env
self.policy = policy
self.state_shape = env.state_shape
self._V = torch.zeros(self.state_shape)
self.alpha = alpha
def step(self, render=False):
states, actions, rewards = run_episode(self.env, self.policy, render=render)
s_next = None
for t in range(len(actions) - 1, -1, -1):
# print('t', t)
s = states[t]
a = actions[t]
r = rewards[t]
v_old = tensor_get(self._V, s)
v_next = 0
if s_next is not None:
v_next = tensor_get(self.V, s_next)
v_new = v_old + self.alpha * (r + v_next - v_old)
# if s_next is not None:
# print('s', s[0], 'a', a, 'r', r, 's_next', s_next[0], 'v_new', v_new)
# else:
# print('s', s[0], 'a', a, 'r', r, 'v_new', v_new)
tensor_set(self._V, s, v_new)
s_next = s
@property
def V(self):
return self._V
runner = TD(env, policy)
for it in range(100):
# if random.randint(0, 2) == 0:
runner.step()
if it % 10 == 0:
print('it', it)
print('V', runner.V.view(1, -1))
print('V', runner.V.view(1, -1))
MC and TD
In [155]:
runner = MonteCarloAllVisits(env, policy)
for it in range(10000):
runner.step()
print('MCAV V', runner.V.view(1, -1))
runner = MonteCarloFirstVisit(env, policy)
for it in range(10000):
runner.step()
print('MCFV V', runner.V.view(1, -1))
runner = TD(env, policy)
for it in range(10000):
runner.step()
print('TD V', runner.V.view(1, -1))
In [156]:
alpha = 0.1
runner = TD(env, policy, alpha=alpha)
N = 100000
for it in range(N):
if it % (N // 10) == 0:
print('TD V it %s' % it, runner.V.view(1, -1))
runner.step()
print('TD V', runner.V.view(1, -1))
In [169]:
# print('V_sum', V_sum)
for alpha in [0.1, 0.05, 0.02, 0.01, 0.001, 0.0001]:
# alpha = 0.1
V_sum = torch.zeros(env.state_shape)
Vsquared_sum = torch.zeros(env.state_shape)
count = 0
runner = TD(env, policy, alpha=alpha)
N = 10000
for it in range(N):
# if it % (N // 10) == 0:
# print('TD V it %s' % it, runner.V.view(1, -1))
runner.step()
        if it > N / 1000:  # skip a short warm-up before accumulating statistics
V_sum += runner.V
Vsquared_sum += runner.V * runner.V
count += 1
# print('V_sum', V_sum)
# print('Vsquared_sum', Vsquared_sum)
    # variance is mean of (x - mu)^2
    # = E[(x - mu)^2]
    # = E[x^2 - 2 x mu + mu^2]
    # = E[x^2] - 2 mu E[x] + mu^2
    # = E[x^2] - mu^2
variance = Vsquared_sum / count - V_sum * V_sum / count / count
# print('variance', variance.view(1, -1))
std = variance.sqrt()
print('alpha %s' % alpha, 'std', std.view(1, -1))
print('TD V', runner.V.view(1, -1))
In [208]:
class ABEnv(object):
def __init__(self):
self.episode = 0
@property
def num_actions(self):
return 1
@property
def state_shape(self):
return [2]
def reset_all(self):
self.episode = -1
def reset(self):
# print('reset episode=%s' % self.episode)
self.episode += 1
self.s = torch.IntTensor(1)
if self.episode > 0:
self.s[0] = 1
else:
self.s[0] = 0
return self.s
def act(self, a):
if self.s[0] == 0:
self.s[0] = 1
return self.s, 0, False
else:
if self.episode in [0, 7]:
return None, 0, True
else:
return None, 1, True
def render(self):
res = ['.', '.']
res[self.s[0]] = 'X'
print(res)
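# ABEnv reproduces the AB example: each pass of 8 episodes consists of one
# episode A,0,B,0, six episodes B,1 and one episode B,0.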
N = 1000
ab_env = ABEnv()
policy = RandomPolicy(num_actions=ab_env.num_actions)
render = False
runner = TD(ab_env, policy, alpha=0.1)
for it in range(N):
ab_env.reset_all()
for episode in range(8):
if render:
print('it', it)
runner.step(render=render)
if it > 0 and it % (N // 2) == 0:
print('TD V it %s' % it, runner.V.view(1, -1))
print('TD V', runner.V.view(1, -1))
runner = MonteCarloAllVisits(ab_env, policy)
for it in range(N):
ab_env.reset_all()
for episode in range(8):
# print('it', it)
runner.step()
if it > 0 and it % (N // 2) == 0:
print('MCAV V it %s' % it, runner.V.view(1, -1))
print('MCAV V', runner.V.view(1, -1))
runner = MonteCarloFirstVisit(ab_env, policy)
for it in range(N):
ab_env.reset_all()
for episode in range(8):
# print('it', it)
runner.step()
if it > 0 and it % (N // 2) == 0:
print('MCFV V it %s' % it, runner.V.view(1, -1))
print('MCFV V', runner.V.view(1, -1))
MC Convergence
MC converges to the solution with minimum MSE over the observed returns (why? see the worked minimization after the table):
$$\def\argmin{\text{argmin}} V_{MC} = \argmin_{V} \mathcal{L}, \qquad \mathcal{L} = \sum_{k=1}^K \sum_{t=1}^{T_k} \left(g_t^k - V(s_t^k)\right)^2 $$
For each episode in the AB example, using $\mathbf{V} = \{V(A)=0,\ V(B)=0.75\}$:
$\tau$ | $\mathbf{g}$ | MSE |
---|---|---|
A,0,B,0 | 0,0 | 0^2 + 0.75^2 = 0.5625 |
B, 1 | 1 | 0.25 ^ 2 = 0.0625 |
... | ... | ... |
B, 0 | 0 | 0.75^2 = 0.5625 |
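Minimizing $\mathcal{L}$ for each state independently gives the empirical mean of the returns observed from that state, which is exactly the MC estimate:
$$ \frac{\partial \mathcal{L}}{\partial V(s)} = -2 \sum_{k,t:\ s_t^k = s} \left(g_t^k - V(s)\right) = 0 \;\;\Rightarrow\;\; V(s) = \frac{1}{N(s)} \sum_{k,t:\ s_t^k = s} g_t^k $$
For the eight AB episodes this gives $V(B) = 6/8 = 0.75$ and $V(A) = 0/1 = 0$.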
TD Convergence
TD(0) converges to solution of max likelihood Markov model:
$$ \mathcal{P}_{ss'}^a = \frac{1}{N(s,a)} \sum_{k=1}^K \sum_{t=1}^{T_k} \mathbf{1}(s_t^k, a_t^k, s_{t+1}^k = s, a, s') \\ \mathcal{R}_s^a = \frac{1}{N(s,a)} \sum_{k=1}^K \sum_{t=1}^{T_k} \mathbf{1}(s_t^k, a_t^k = s, a)\, r_t^k $$
For the AB example this model gives $\mathbf{V} = \{V(A)=0.75,\ V(B)=0.75\}$ (the Bellman solution is worked out after the tables). The observed $(s, r, s')$ tuples per episode are:
$\tau$ | (s,r,s') tuples |
---|---|
A,0,B,0 | (A,0,B) (B,0,*) |
B,1 | (B,1,*) |
B,0 | (B,0,*) |
transition probabilities
s, s' | N(s) | N(s,s') | P |
---|---|---|---|
A, B | 1 | 1 | 1.0 |
B, R0 | 8 | 2 | 0.25 |
B, R1 | 8 | 6 | 0.75 |
rewards
s | N(s) | sum rewards | R |
---|---|---|---|
A | 1 | 0 | 0 |
B | 8 | 6 | 0.75 |
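Solving the Bellman equation for this maximum-likelihood model gives the TD(0) fixed point:
$$ V(B) = \mathcal{R}_B = 0.75, \qquad V(A) = \mathcal{R}_A + \mathcal{P}_{AB}\, V(B) = 0 + 1.0 \times 0.75 = 0.75 $$
so TD estimates $V(A) = 0.75$ even though the single episode through A returned 0, whereas MC estimates $V(A) = 0$.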
Bootstrapping and sampling
bootstrapping: the update involves an estimate of successor values (DP and TD bootstrap; MC does not)
sampling: the update samples an expectation instead of computing it over all successors (MC and TD sample; DP does not)
unified view of RL
 | shallow backups | deep backups |
---|---|---|
full backups | DP | Exhaustive search |
sample backups | TD | MC |
up to 1:15:59
n-Step Prediction
n-Step Return
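The n-step return truncates the sampled return after n steps and bootstraps from the current value estimate (n = 1 is TD(0), n = $\infty$ is MC):
$$ G_t^{(n)} = R_{t+1} + \gamma R_{t+2} + \dots + \gamma^{n-1} R_{t+n} + \gamma^n V(S_{t+n}) $$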
$\lambda$-return
TD($\lambda$) Weighting Function
$$ G_t^\lambda = (1 - \lambda) \sum_{n=1}^\infty \lambda^{n-1} G_t^{(n)} $$
The weights form a geometric series that sums to one: $(1 - \lambda) \sum_{n=1}^\infty \lambda^{n-1} = 1$.
Backward view TD($\lambda$)
Eligibility Traces
$$ E_0(s) = 0 \\ E_t(s) = \gamma \lambda E_{t-1}(s) + \mathbf{1}(S_t = s) $$
Backward view TD($\lambda$)
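The backward-view update implemented in the TD($\lambda$) code below combines the one-step TD error with the eligibility trace of every state:
$$ \delta_t = R_{t+1} + \gamma V(S_{t+1}) - V(S_t) \\ V(s) \leftarrow V(s) + \alpha\, \delta_t\, E_t(s) $$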
In [16]:
class Runner(object):
def __init__(self, env, policy, learner):
self.env = env
self.policy = policy
self.learner = learner
def run_episode(self, render=True):
s = self.env.reset()
while True:
a = self.policy.get_action(s)
if render:
self.env.render()
s_old = s.clone()
            s_new, r, done = self.env.act(a)
if render:
print('a %s r %s' % (a, r))
            self.learner.step(s_old, a, r, s_new, done, render=render)
            if done:
                break
            s = s_new  # advance to the next state (env.act also mutates s in place for these envs)
class TD0_Online(object):
def __init__(self, env, alpha=0.1):
self.env = env
self.state_shape = env.state_shape
self._V = torch.zeros(self.state_shape)
self.alpha = alpha
@property
def expects_online(self):
return True
def step(self, s, a, r, s_new, done, render=False):
v_old = tensor_get(self._V, s)
v_next = 0
if s_new is not None:
v_next = tensor_get(self.V, s_new)
v_new = v_old + self.alpha * (r + v_next - v_old)
tensor_set(self._V, s, v_new)
@property
def V(self):
return self._V
learner = TD0_Online(env)
runner = Runner(env=env, policy=policy, learner=learner)
for it in range(500):
render = it % 100 == 0
runner.run_episode(render=False)
if render:
print('it', it)
print('V', learner.V.view(1, -1))
print('V', learner.V.view(1, -1))
In [18]:
class TDLambdaBackward_Online(object):
def __init__(self, env, gamma=1.0, alpha=0.1, lambda_=0.9):
self.env = env
self.state_shape = env.state_shape
self._V = torch.zeros(self.state_shape)
self.gamma = gamma
self.alpha = alpha
self.lambda_ = lambda_
self.eligibility = torch.zeros(self.state_shape)
@property
def expects_online(self):
return True
def step(self, s, a, r, s_new, done, render=False):
        self.eligibility *= self.gamma * self.lambda_  # decay: E_t(s) = gamma * lambda * E_{t-1}(s)
tensor_inc(self.eligibility, s, 1)
v_old = tensor_get(self._V, s)
v_next = 0
if s_new is not None:
v_next = tensor_get(self.V, s_new)
td_error = r + self.gamma * v_next - v_old
        # backward-view update: every state with a non-negligible trace moves
        # towards the TD target in proportion to its eligibility
        for i in range(self.state_shape[0]):
            if self.eligibility[i] > 1e-3:
                e_state = torch.IntTensor([i])
                v_old = tensor_get(self._V, e_state)
                v_new = v_old + self.alpha * td_error * self.eligibility[i]
                tensor_set(self._V, e_state, v_new)
        if done:
            # traces start from zero at the beginning of each episode (E_0(s) = 0)
            self.eligibility.zero_()
# v_new = v_old + self.alpha * (r + v_next - v_old)
# tensor_set(self._V, s, v_new)
@property
def V(self):
return self._V
learner = TDLambdaBackward_Online(env)
runner = Runner(env=env, policy=policy, learner=learner)
for it in range(50):
render = it % 10 == 0
runner.run_episode(render=False)
if render:
print('it', it)
print('V', learner.V.view(1, -1))
print('V', learner.V.view(1, -1))