Lecture 4: Model-free prediction

  • estimate the value function of an unknown MDP

Monte-Carlo Reinforcement Learning

  • MC methods learn directly from episodes of experience
  • MC is model-free: no knowledge of MDP transitions/rewards
  • MC learns from complete episodes, no bootstrapping
  • MC uses the simplest possible idea: value = mean return
  • caveat: can only apply MC to episodic MDPs:
    • all episodes must terminate

Monte-Carlo policy evaluation

  • Goal: learn $v_\pi$ from episodes of experience under policy $\pi$ $$ S_1, A_1, R_2, \dots, S_k \sim \pi $$
  • recall that the return is the total discounted reward $$ G_t = R_{t+1} + \gamma R_{t+2} + \dots + \gamma^{T-1} R_T $$
  • recall that the value function is the expected return: $$ v_\pi(s) = \mathbb{E}_\pi[G_t \mid S_t =s] $$
  • Monte-Carlo policy evaluation uses empirical mean return instead of expected return
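
To make the empirical-mean idea concrete, here is a minimal sketch (not from the lecture; the function name and argument layout are mine) that turns one episode's reward sequence into the returns $G_t$ that Monte-Carlo then averages per state:

In [ ]:
# Minimal sketch: compute G_t for every time-step of a single episode.
# rewards[k] holds R_{k+1}; the recursion G_t = R_{t+1} + gamma * G_{t+1}
# lets us compute all returns backwards in one pass.
def returns_from_rewards(rewards, gamma=1.0):
    returns = []
    G = 0.0
    for r in reversed(rewards):
        G = r + gamma * G
        returns.append(G)
    returns.reverse()
    return returns

print(returns_from_rewards([0, 0, 1], gamma=0.9))  # ~[0.81, 0.9, 1.0]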

First-visit Monte-Carlo Policy Evaluation

  • to evaluate state $s$
  • the first time-step $t$ that state $s$ is visited in an episode:
    • increment counter $N(s) \leftarrow N(s) + 1$
    • increment total return $S(s) \leftarrow S(s) + G_t$
  • value is estimated by mean return $V(s) = S(s) / N(s)$
  • by law of large numbers, $V(s) \rightarrow v_\pi(s)$ as $N(s) \rightarrow \infty$

Every-visit Monte-Carlo policy evaluation

  • to evaluate state $s$:
  • every time-step $t$ that state $s$ is visited in an episode:
    • increment counter $N(s) \leftarrow N(s) + 1$
    • increment total return $S(s) \leftarrow S(s) + G_t$
  • value is estimated by mean return $V(s) = S(s) / N(s)$
  • again, $V(s) \rightarrow v_\pi(s)$ as $N(s) \rightarrow \infty$
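
Before the blackjack example below, here is a minimal tabular sketch of both update rules using plain dicts (my own illustration with $\gamma = 1$; the names follow the slides but the code is not from the lecture). The only difference between the two variants is whether repeat visits of a state within one episode contribute extra samples:

In [ ]:
from collections import defaultdict

def mc_episode_update(episode, N, S, first_visit=True):
    # episode: list of (state, reward) pairs; N, S: per-state counters / return sums
    G = 0.0
    returns = []
    for s, r in reversed(episode):   # accumulate returns backwards (gamma = 1)
        G += r
        returns.append((s, G))
    returns.reverse()
    seen = set()
    for s, G in returns:
        if first_visit and s in seen:
            continue                 # first-visit: only the first occurrence counts
        seen.add(s)
        N[s] += 1                    # N(s) <- N(s) + 1
        S[s] += G                    # S(s) <- S(s) + G_t
    # value estimate afterwards: V(s) = S(s) / N(s)

N, S = defaultdict(int), defaultdict(float)
mc_episode_update([('A', 0), ('B', 0), ('B', 1)], N, S, first_visit=True)
print({s: S[s] / N[s] for s in N})   # {'A': 1.0, 'B': 1.0}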

In [62]:
import torch
import numpy as np
import math
import random
import matplotlib.pyplot as plt


# N(s): visit counts, S(s): summed returns, indexed by
# (player sum - 12, dealer showing card - 1, usable ace)
N = torch.zeros(10, 10, 2)
S = torch.zeros(10, 10, 2)

def create_cards():
    cards = []
    for v in range(13):
        for i in range(4):
            cards.append(v + 1)
#     print('cards', cards)
    return cards

def calc_sum_cards(cards):
    sum = 0
    num_aces = 0
    for card in cards:
        if card > 10:
            card = 10
        if card == 1:
            num_aces += 1
            card = 11
        sum += card
    while sum > 21 and num_aces > 0:
        sum -= 10
        num_aces -= 1
    return sum, num_aces

def cards_to_state(dealer_card, us):
    sum, num_aces = calc_sum_cards(us)
    s = torch.IntTensor([sum - 12, dealer_card - 1, 0])
    if num_aces > 0:
        s[2] = 1
    return s, sum > 21

def apply_action(cards, s, a):
    our_sum = s[0] + 12
    # reconstruct the dealer's visible card from the state (index 0 means an ace)
    if s[1] == 0:
        dealer_cards = [1]
    else:
        dealer_cards = [s[1] + 1]
    if a == 0:  # stick: dealer draws until reaching 16 or more, then hands are compared
        while calc_sum_cards(dealer_cards)[0] < 16:
            dealer_cards.append(cards[-1])
            cards = cards[:-1]
        dealer_sum = calc_sum_cards(dealer_cards)[0]
        if dealer_sum > 21 or dealer_sum < our_sum:
            return cards, None, 1, True
        elif dealer_sum == our_sum:
            return cards, None, 0, True
        else:
            return cards, None, -1, True
    else:  # twist: draw one card; going over 21 ends the episode with reward -1
        card = cards[-1]
        cards = cards[:-1]
        useable_ace = s[2] == 1
        if card > 10:
            card = 10
        if card == 1:
            useable_ace = True
            card = 11
        our_sum += card
        if our_sum > 21 and useable_ace:
            our_sum -= 10
            useable_ace = False
        if our_sum > 21:
            return cards, None, -1, True
        else:
            s_new = s.clone()
            s_new[0] = our_sum - 12
            s_new[2] = 1 if useable_ace else 0
            return cards, s_new, 0, False

episode = 0
while episode < 100:
    # deal: the dealer shows one card, we draw until our sum is at least 12
    cards = create_cards()
    random.shuffle(cards)
    dealer_card = cards[-1]
    if dealer_card > 10:
        dealer_card = 10
    cards = cards[:-1]
    our_cards = []
    while calc_sum_cards(our_cards)[0] < 12:
        our_cards.append(cards[-1])
        cards = cards[:-1]
    s, done = cards_to_state(dealer_card, our_cards)
    states = []
    rewards = []
    while True:
        states.append(s.clone())
        a = np.random.randint(2)  # random policy: 0 = stick, 1 = twist
        cards, s_new, reward, done = apply_action(cards, s, a)
        rewards.append(reward)
        if done:
            break
        s = s_new
    # every-visit Monte-Carlo backup: accumulate the return G backwards through the episode
    G = 0
    for i in range(len(states) - 1, -1, -1):
        s = states[i]
        r = rewards[i]
        G += r
        N[s[0], s[1], s[2]] += 1
        S[s[0], s[1], s[2]] += G
    episode += 1

# plt.imshow()


(output: the return G of each of the 100 episodes, mostly -1 with occasional 0 or +1)

up to 55:10 approx


In [2]:
import matplotlib.pyplot as plt
import torch
import numpy as np

class Env(object):
    def __init__(self):
        self.env_size = 7

    @property
    def num_actions(self):
        return 2

    @property
    def state_shape(self):
        return [self.env_size]
    
    def reset(self):
        self.s = torch.IntTensor(1)
        self.s[0] = np.random.randint(5) + 1
        return self.s
        
    def act(self, a):
        if a == 0:
            self.s[0] -= 1
        else:
            self.s[0] += 1
        reward = 1 if self.s[0] == self.env_size - 1 else 0
        done = self.s[0] in [0, self.env_size - 1]
        return self.s, reward, done
    
    def render(self):
        res = ['-'] * self.env_size
        res[0] = '*'
        res[self.env_size - 1] = '*'  # both ends are terminal
        res[self.s[0]] = 'X'
        print(''.join(res))

class RandomPolicy(object):
    def __init__(self, num_actions):
        self.num_actions = num_actions
    
    def get_action(self, s):
        return np.random.randint(self.num_actions)

def run_episode(env, policy, render=True):
    actions = []
    rewards = []
    states = []
    s = env.reset()
    while True:
        a = policy.get_action(s)
        if render:
            env.render()
        states.append(s.clone())
        s, r, done = env.act(a)
        if render:
            print('a %s r %s' % (a, r))
        rewards.append(r)
        actions.append(a)
        if done:
            break
#     if render:
#         env.render()
    return states, actions, rewards

env = Env()
policy = RandomPolicy(env.num_actions)
print(env.state_shape)

num_episodes = 0  # set to e.g. 5 to print a few rendered episodes
for episode in range(num_episodes):
    print('')
    print('episode %s' % episode)
    states, actions, rewards = run_episode(env, policy)

def get_linear_index(target, index):
    assert len(index.shape) == 1
    linear_index = 0
    D = index.shape[0]
    for d in range(D):
        linear_index = linear_index * target.shape[d] + index[d]
    return linear_index

def tensor_set(target, index, v):
    linear_index = get_linear_index(target, index)
    target.view(-1)[linear_index] = v

def tensor_get(target, index):
    linear_index = get_linear_index(target, index)
    target = target.view(-1)[linear_index]
    return target

def tensor_inc(target, index, v):
    tensor_set(target, index, tensor_get(target, index) + v)


class MonteCarloAllVisits(object):
    def __init__(self, env, policy):
        self.env = env
        self.policy = policy
        self.state_shape = env.state_shape
        self.N = torch.zeros(self.state_shape)
        self.S = torch.zeros(self.state_shape)

    def step(self):
        states, actions, rewards = run_episode(self.env, self.policy, render=False)
        G = 0
        for i in range(len(actions) - 1, -1, -1):
            s = states[i]
            a = actions[i]
            r = rewards[i]
            G += r
            tensor_inc(self.N, s, 1)
            tensor_inc(self.S, s, G)

    @property
    def V(self):
        return self.S / self.N

runner = MonteCarloAllVisits(env, policy)
for it in range(100):
    runner.step()
    if it % 20 == 0:
        print('it', it)
        print('V', runner.V)
print('V', runner.V)


[7]
it 0
V 
nan
nan
  1
  1
  1
  1
nan
[torch.FloatTensor of size 7]

it 20
V 
    nan
 0.3636
 0.5102
 0.5882
 0.6500
 0.7600
    nan
[torch.FloatTensor of size 7]

it 40
V 
    nan
 0.2093
 0.4000
 0.5368
 0.6386
 0.7692
    nan
[torch.FloatTensor of size 7]

it 60
V 
    nan
 0.1333
 0.2818
 0.4655
 0.6082
 0.7551
    nan
[torch.FloatTensor of size 7]

it 80
V 
    nan
 0.1522
 0.3233
 0.5067
 0.6439
 0.8116
    nan
[torch.FloatTensor of size 7]

V 
    nan
 0.1261
 0.2542
 0.4247
 0.6026
 0.7901
    nan
[torch.FloatTensor of size 7]


In [130]:
class MonteCarloFirstVisit(object):
    def __init__(self, env, policy):
        self.env = env
        self.policy = policy
        self.state_shape = env.state_shape
        self.N = torch.zeros(self.state_shape)
        self.S = torch.zeros(self.state_shape)
#         print('self.N', self.N)

    def step(self):
        states, actions, rewards = run_episode(self.env, self.policy, render=False)
        G = 0
        for i in range(len(actions) - 1, -1, -1):
            s = states[i]
            r = rewards[i]
            G += r
            # only count the return from the *first* visit of s in this episode
            if not any(torch.equal(states[j], s) for j in range(i)):
                tensor_inc(self.N, s, 1)
                tensor_inc(self.S, s, G)

    @property
    def V(self):
        return self.S / self.N

runner = MonteCarloFirstVisit(env, policy)
for it in range(100):
    runner.step()
    if it % 20 == 0:
        print('it', it)
        print('V', runner.V)
print('V', runner.V)


it 0
V 
nan
nan
  1
  1
  1
  1
nan
[torch.FloatTensor of size 7]

it 20
V 
    nan
 0.0769
 0.3333
 0.5000
 0.5714
 0.9000
    nan
[torch.FloatTensor of size 7]

it 40
V 
    nan
 0.0435
 0.2917
 0.4545
 0.6000
 0.8261
    nan
[torch.FloatTensor of size 7]

it 60
V 
    nan
 0.1389
 0.3514
 0.4706
 0.6176
 0.8108
    nan
[torch.FloatTensor of size 7]

it 80
V 
    nan
 0.1064
 0.3191
 0.4545
 0.6341
 0.8478
    nan
[torch.FloatTensor of size 7]

V 
    nan
 0.1053
 0.3051
 0.4643
 0.6471
 0.8750
    nan
[torch.FloatTensor of size 7]


In [184]:
class TD(object):
    def __init__(self, env, policy, alpha=0.1):
        self.env = env
        self.policy = policy
        self.state_shape = env.state_shape
        self._V = torch.zeros(self.state_shape)
        self.alpha = alpha

    def step(self, render=False):
        states, actions, rewards = run_episode(self.env, self.policy, render=render)
        s_next = None
        for t in range(len(actions) - 1, -1, -1):
            s = states[t]
            r = rewards[t]
            v_old = tensor_get(self._V, s)
            v_next = 0  # terminal state has value 0
            if s_next is not None:
                v_next = tensor_get(self._V, s_next)
            # TD(0) update (undiscounted): V(s) <- V(s) + alpha * (r + V(s') - V(s))
            v_new = v_old + self.alpha * (r + v_next - v_old)
            tensor_set(self._V, s, v_new)
            s_next = s

    @property
    def V(self):
        return self._V

runner = TD(env, policy)
for it in range(100):
#     if random.randint(0, 2) == 0:
    runner.step()
    if it % 10 == 0:
        print('it', it)
        print('V', runner.V.view(1, -1))
print('V', runner.V.view(1, -1))


it 0
V 
    0     0     0     0     0     0     0
[torch.FloatTensor of size 1x7]

it 10
V 
 0.0000  0.0000  0.0029  0.0195  0.0573  0.4035  0.0000
[torch.FloatTensor of size 1x7]

it 20
V 
 0.0000  0.0037  0.0301  0.1029  0.2466  0.5270  0.0000
[torch.FloatTensor of size 1x7]

it 30
V 
 0.0000  0.0301  0.0883  0.1931  0.3675  0.6616  0.0000
[torch.FloatTensor of size 1x7]

it 40
V 
 0.0000  0.0277  0.0769  0.2083  0.3080  0.6011  0.0000
[torch.FloatTensor of size 1x7]

it 50
V 
 0.0000  0.0512  0.1254  0.2411  0.3831  0.6242  0.0000
[torch.FloatTensor of size 1x7]

it 60
V 
 0.0000  0.0809  0.1491  0.3232  0.5130  0.6776  0.0000
[torch.FloatTensor of size 1x7]

it 70
V 
 0.0000  0.0914  0.2127  0.2844  0.4302  0.6569  0.0000
[torch.FloatTensor of size 1x7]

it 80
V 
 0.0000  0.0991  0.2589  0.4089  0.5019  0.7534  0.0000
[torch.FloatTensor of size 1x7]

it 90
V 
 0.0000  0.1230  0.2442  0.3546  0.5294  0.7736  0.0000
[torch.FloatTensor of size 1x7]

V 
 0.0000  0.1540  0.2136  0.3375  0.5548  0.8322  0.0000
[torch.FloatTensor of size 1x7]

MC and TD

  • Goal: learn $v_\pi$ online from experience under policy $\pi$
  • incremental every-visit Monte-Carlo:
    • update value $V(S_t)$ toward actual return $G_t$ $$ V(S_t) \leftarrow V(S_t) + \alpha(G_t - V(S_t)) $$
  • simplest temporal-difference learning algorithm TD(0):
    • update value $V(S_t)$ toward estimated return $R_{t+1} + \gamma V(S_{t+1})$ $$ V(S_t) \leftarrow V(S_t) + \alpha(R_{t+1} + \gamma V(S_{t+1}) - V(S_t)) $$
    • $R_{t+1} + \gamma V(S_{t+1})$ is called the TD target
    • $\delta_t = R_{t+1} + \gamma V(S_{t+1}) - V(S_t)$ is called the TD error
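
A minimal side-by-side sketch of the two updates for a tabular $V$ stored in a dict (my own illustration; `alpha`, `gamma` and the function names are assumptions, not lecture code). The constant-$\alpha$ MC update needs the complete return $G_t$, while TD(0) only needs the next transition:

In [ ]:
def mc_incremental_update(V, s, G, alpha=0.1):
    # V(S_t) <- V(S_t) + alpha * (G_t - V(S_t))
    V[s] += alpha * (G - V[s])

def td0_update(V, s, r, s_next, alpha=0.1, gamma=1.0):
    # TD target R_{t+1} + gamma * V(S_{t+1}); a terminal next state contributes 0
    target = r + gamma * (V[s_next] if s_next is not None else 0.0)
    V[s] += alpha * (target - V[s])   # add alpha times the TD error

V = {'A': 0.0, 'B': 0.0}
td0_update(V, 'A', 0, 'B')        # A is pulled toward r + V(B)
mc_incremental_update(V, 'B', 1)  # B is pulled toward an observed return of 1
print(V)                          # {'A': 0.0, 'B': 0.1}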

In [155]:
runner = MonteCarloAllVisits(env, policy)
for it in range(10000):
    runner.step()
print('MCAV V', runner.V.view(1, -1))

runner = MonteCarloFirstVisit(env, policy)
for it in range(10000):
    runner.step()
print('MCFV V', runner.V.view(1, -1))

runner = TD(env, policy)
for it in range(10000):
    runner.step()
print('TD V', runner.V.view(1, -1))


MCAV V 
    nan  0.1758  0.3437  0.5091  0.6725  0.8376     nan
[torch.FloatTensor of size 1x7]

MCFV V 
    nan  0.1640  0.3292  0.4962  0.6622  0.8386     nan
[torch.FloatTensor of size 1x7]

TD V 
 0.0000  0.1776  0.2607  0.3970  0.6658  0.7973  0.0000
[torch.FloatTensor of size 1x7]


In [156]:
alpha = 0.1
runner = TD(env, policy, alpha=alpha)
N = 100000
for it in range(N):
    if it % (N // 10) == 0:
        print('TD V it %s' % it, runner.V.view(1, -1))
    runner.step()
print('TD V', runner.V.view(1, -1))


TD V it 0 
    0     0     0     0     0     0     0
[torch.FloatTensor of size 1x7]

TD V it 10000 
 0.0000  0.1052  0.2769  0.3896  0.5638  0.8184  0.0000
[torch.FloatTensor of size 1x7]

TD V it 20000 
 0.0000  0.1944  0.3405  0.4428  0.5752  0.8185  0.0000
[torch.FloatTensor of size 1x7]

TD V it 30000 
 0.0000  0.1234  0.3096  0.5227  0.6978  0.7968  0.0000
[torch.FloatTensor of size 1x7]

TD V it 40000 
 0.0000  0.1044  0.2875  0.5020  0.7644  0.8913  0.0000
[torch.FloatTensor of size 1x7]

TD V it 50000 
 0.0000  0.1372  0.3139  0.4940  0.6016  0.7570  0.0000
[torch.FloatTensor of size 1x7]

TD V it 60000 
 0.0000  0.1267  0.3496  0.5076  0.6906  0.8221  0.0000
[torch.FloatTensor of size 1x7]

TD V it 70000 
 0.0000  0.1593  0.3139  0.4863  0.7222  0.7864  0.0000
[torch.FloatTensor of size 1x7]

TD V it 80000 
 0.0000  0.1585  0.3963  0.5546  0.7426  0.8313  0.0000
[torch.FloatTensor of size 1x7]

TD V it 90000 
 0.0000  0.0946  0.2947  0.4559  0.6787  0.8369  0.0000
[torch.FloatTensor of size 1x7]

TD V 
 0.0000  0.1029  0.2977  0.4515  0.6418  0.7740  0.0000
[torch.FloatTensor of size 1x7]


In [169]:
# print('V_sum', V_sum)

for alpha in [0.1, 0.05, 0.02, 0.01, 0.001, 0.0001]:
#     alpha = 0.1
    V_sum = torch.zeros(env.state_shape)
    Vsquared_sum = torch.zeros(env.state_shape)
    count = 0
    runner = TD(env, policy, alpha=alpha)
    N = 10000
    for it in range(N):
    #     if it % (N // 10) == 0:
    #         print('TD V it %s' % it, runner.V.view(1, -1))
        runner.step()
        if it > N // 1000:  # skip a brief burn-in before accumulating statistics
            V_sum += runner.V
            Vsquared_sum += runner.V * runner.V
            count += 1
    # variance = E[(x - mu)^2]
    #          = E[x^2 - 2*x*mu + mu^2]
    #          = E[x^2] - 2*mu*E[x] + mu^2
    #          = E[x^2] - mu^2
    variance = Vsquared_sum / count - V_sum * V_sum / count / count
    std = variance.sqrt()
    print('alpha %s' % alpha, 'std', std.view(1, -1))
    print('TD V', runner.V.view(1, -1))


alpha 0.1 std 
1.00000e-02 *
  0.0000  5.2341  6.8739  7.6727  7.2182  5.6797  0.0000
[torch.FloatTensor of size 1x7]

TD V 
 0.0000  0.1962  0.3097  0.4281  0.5776  0.7676  0.0000
[torch.FloatTensor of size 1x7]

alpha 0.05 std 
1.00000e-02 *
  0.0000  3.6696  5.0679  6.0162  6.2872  5.6891  0.0000
[torch.FloatTensor of size 1x7]

TD V 
 0.0000  0.1691  0.3729  0.4791  0.5851  0.7925  0.0000
[torch.FloatTensor of size 1x7]

alpha 0.02 std 
1.00000e-02 *
  0.0000  3.3570  5.6238  7.2149  7.9160  6.8365  0.0000
[torch.FloatTensor of size 1x7]

TD V 
 0.0000  0.1301  0.3007  0.4490  0.6213  0.8123  0.0000
[torch.FloatTensor of size 1x7]

alpha 0.01 std 
 0.0000  0.0403  0.0725  0.0948  0.1024  0.0880  0.0000
[torch.FloatTensor of size 1x7]

TD V 
 0.0000  0.1461  0.3294  0.5035  0.6783  0.8442  0.0000
[torch.FloatTensor of size 1x7]

alpha 0.001 std 
 0.0000  0.0416  0.0877  0.1355  0.1743  0.1808  0.0000
[torch.FloatTensor of size 1x7]

TD V 
 0.0000  0.1259  0.2692  0.4244  0.5897  0.7857  0.0000
[torch.FloatTensor of size 1x7]

alpha 0.0001 std 
1.00000e-02 *
  0.0000  0.0109  0.1104  0.6690  2.8985  9.3392  0.0000
[torch.FloatTensor of size 1x7]

TD V 
 0.0000  0.0004  0.0040  0.0228  0.0940  0.3286  0.0000
[torch.FloatTensor of size 1x7]


In [208]:
class ABEnv(object):
    def __init__(self):
        self.episode = 0

    @property
    def num_actions(self):
        return 1

    @property
    def state_shape(self):
        return [2]
    
    def reset_all(self):
        self.episode = -1
    
    def reset(self):
#         print('reset episode=%s' % self.episode)
        self.episode += 1
        self.s = torch.IntTensor(1)
        if self.episode > 0:
            self.s[0] = 1
        else:
            self.s[0] = 0
        return self.s
        
    def act(self, a):
        if self.s[0] == 0:
            # state A always transitions to B with reward 0
            self.s[0] = 1
            return self.s, 0, False
        else:
            # from B: episodes 0 and 7 terminate with reward 0, the other six with reward 1
            if self.episode in [0, 7]:
                return None, 0, True
            else:
                return None, 1, True
    
    def render(self):
        res = ['.', '.']
        res[self.s[0]] = 'X'
        print(res)
    
N = 1000
ab_env = ABEnv()
policy = RandomPolicy(num_actions=ab_env.num_actions)
render = False

runner = TD(ab_env, policy, alpha=0.1)
for it in range(N):
    ab_env.reset_all()
    for episode in range(8):
        if render:
            print('it', it)
        runner.step(render=render)
    if it > 0 and it % (N // 2) == 0:
        print('TD V it %s' % it, runner.V.view(1, -1))
print('TD V', runner.V.view(1, -1))

runner = MonteCarloAllVisits(ab_env, policy)
for it in range(N):
    ab_env.reset_all()
    for episode in range(8):
#         print('it', it)
        runner.step()
    if it > 0 and it % (N // 2) == 0:
        print('MCAV V it %s' % it, runner.V.view(1, -1))
print('MCAV V', runner.V.view(1, -1))

runner = MonteCarloFirstVisit(ab_env, policy)
for it in range(N):
    ab_env.reset_all()
    for episode in range(8):
#         print('it', it)
        runner.step()
    if it > 0 and it % (N // 2) == 0:
        print('MCFV V it %s' % it, runner.V.view(1, -1))
print('MCFV V', runner.V.view(1, -1))


TD V it 500 
 0.6664  0.7404
[torch.FloatTensor of size 1x2]

TD V 
 0.6664  0.7404
[torch.FloatTensor of size 1x2]

MCAV V it 500 
 0.0000  0.7500
[torch.FloatTensor of size 1x2]

MCAV V 
 0.0000  0.7500
[torch.FloatTensor of size 1x2]

MCFV V it 500 
 0.0000  0.7500
[torch.FloatTensor of size 1x2]

MCFV V 
 0.0000  0.7500
[torch.FloatTensor of size 1x2]

MC Convergence

MC converges to the solution with minimum mean-squared error, i.e. the best fit to the observed returns:

$$ \min_V \mathcal{L}(V), \qquad \mathcal{L}(V) = \sum_{k=1}^K \sum_{t=1}^{T_k} \left(g_t^k - V(s_t^k)\right)^2 $$

For each example, using $\mathbf{V} = \{0, 0.75 \}$:

  $\tau$     $\mathbf{g}$   squared error
  A,0,B,0    0, 0           $0^2 + 0.75^2 = 0.5625$
  B,1        1              $0.25^2 = 0.0625$
  ...        ...            ...
  B,0        0              $0.75^2 = 0.5625$

  • changing the value for $A$ increases the squared error on the first episode and leaves every other episode untouched
  • changing the value for $B$ increases the error on some episodes and decreases it on others, but the net effect is that the overall MSE increases
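
As a quick numeric check of this claim (my own sketch, assuming the eight undiscounted AB episodes above), a grid search over $V(A), V(B)$ confirms that the total squared error is minimised at $V(A)=0$, $V(B)=0.75$, exactly the MC estimates:

In [ ]:
import numpy as np

# returns observed across the 8 episodes: A is visited once (return 0),
# B is visited 8 times (returns 0, 1, 1, 1, 1, 1, 1, 0)
returns_A = [0.0]
returns_B = [0.0] + [1.0] * 6 + [0.0]

def total_squared_error(vA, vB):
    return (sum((g - vA) ** 2 for g in returns_A)
            + sum((g - vB) ** 2 for g in returns_B))

grid = np.linspace(0, 1, 101)
best = min((total_squared_error(vA, vB), vA, vB) for vA in grid for vB in grid)
print(best)   # error 1.5 at V(A) = 0.0, V(B) = 0.75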

TD Convergence

TD(0) converges to the solution of the maximum-likelihood Markov model fitted to the observed episodes:

$$ \mathcal{P}_{ss'}^a = \frac{1}{N(s,a)} \sum_{k=1}^K \sum_{t=1}^{T_k} \mathbf{1}(s_t^k, a_t^k, s_{t+1}^k = s, a, s') \\ \mathcal{R}_s^a = \frac{1}{N(s,a)} \sum_{k=1}^K \sum_{t=1}^{T_k} \mathbf{1}(s_t^k, a_t^k = s, a)r_t^k $$

Each episode contributes the following $(s, r, s')$ tuples to the model (which yields $\mathbf{V} = \{0.75, 0.75\}$):

  $\tau$     $(s, r, s')$ tuples
  A,0,B,0    (A,0,B)  (B,0,*)
  B,1        (B,1,*)
  B,0        (B,0,*)

transition probabilities

  s, s'           N(s)   N(s,s')   P
  A, B            1      1         1.00
  B, end (r=0)    8      2         0.25
  B, end (r=1)    8      6         0.75

rewards

  s    N(s)   sum of rewards   R
  A    1      0                0
  B    8      6                0.75
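
The tables can be reproduced directly from the episodes (my own sketch, again assuming $\gamma = 1$): count transitions and rewards to build the maximum-likelihood model, then solve its Bellman equations. The result, $V(A) = V(B) = 0.75$, is what TD converges to, unlike the MC estimate $V(A) = 0$:

In [ ]:
from collections import defaultdict

# the 8 AB episodes as (state, reward, next_state) transitions; None = terminal
episodes = ([[('A', 0, 'B'), ('B', 0, None)]]
            + [[('B', 1, None)]] * 6
            + [[('B', 0, None)]])

N = defaultdict(int)            # N(s): visits of s
R_sum = defaultdict(float)      # total reward observed from s
N_ss = defaultdict(int)         # N(s, s'): transition counts
for ep in episodes:
    for s, r, s2 in ep:
        N[s] += 1
        R_sum[s] += r
        N_ss[(s, s2)] += 1

# Bellman equations for the empirical model (terminal value 0):
V = {}
V['B'] = R_sum['B'] / N['B']                                     # 6/8 = 0.75
V['A'] = R_sum['A'] / N['A'] + (N_ss[('A', 'B')] / N['A']) * V['B']
print(V)   # {'B': 0.75, 'A': 0.75}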

Bootstrapping and sampling

bootstrapping: update involves an estimate

  • MC does not bootstrap
  • TD bootstraps
  • DP bootstraps

sampling: update samples an expectation

  • MC samples
  • TD samples
  • DP does not sample

Unified view of RL

                    shallow backups   deep backups
  full backups      DP                exhaustive search
  sample backups    TD                MC

up to 1:15:59

n-Step Prediction

  • let TD target look $n$ steps into the future

n-Step Return

  • consider the following n-step returns for $n=1,2,\infty$:
$$ \begin{align} n = 1 & \hspace{24px} & (TD) & \hspace{24px} & G_t^{(1)} = R_{t+1} + \gamma V(S_{t+1}) \\ n = 2 & & & & G_t^{(2)} = R_{t+1} + \gamma R_{t+2} + \gamma^2 V(S_{t+2}) \\ \vdots & & & & \vdots \\ n = \infty & & (MC) & & G_t^{(\infty)} = R_{t+1} + \gamma R_{t+2} + \dots + \gamma^{T-1} R_T \end{align} $$
  • define the n-step return:
$$ G_t^{(n)} = R_{t+1} + \gamma R_{t+2} + \dots + \gamma^{n-1} R_{t+n} + \gamma^n V(S_{t+n}) $$
  • n-step temporal-difference learning
$$ V(S_t) \leftarrow V(S_t) + \alpha(G_t^{(n)} - V(S_t)) $$
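
A minimal sketch of the $n$-step return as a function (my own code; the argument layout, with `rewards[k]` holding $R_{k+1}$ and a tabular `V` dict, is an assumption). With $n=1$ it reduces to the TD target, and once $t+n$ runs past the end of the episode it becomes the full MC return:

In [ ]:
def n_step_return(rewards, states, V, t, n, gamma=1.0):
    # G_t^(n) = R_{t+1} + gamma R_{t+2} + ... + gamma^{n-1} R_{t+n} + gamma^n V(S_{t+n})
    T = len(rewards)
    G = 0.0
    for k in range(n):
        if t + k >= T:
            return G                          # episode ended early: no bootstrap term
        G += gamma ** k * rewards[t + k]
    if t + n < len(states):
        G += gamma ** n * V[states[t + n]]    # bootstrap from the value of S_{t+n}
    return G

V_table = {'s0': 0.0, 's1': 0.5, 's2': 1.0}
demo_states = ['s0', 's1', 's2']
demo_rewards = [0.0, 0.0, 1.0]
print(n_step_return(demo_rewards, demo_states, V_table, t=0, n=1))  # 0 + V(s1) = 0.5 (TD target)
print(n_step_return(demo_rewards, demo_states, V_table, t=0, n=3))  # 0 + 0 + 1 = 1.0 (MC return)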

$\lambda$-return

  • the $\lambda$-return $G_t^\lambda$ combines all $n$-step returns $G_t^{(n)}$
  • using weight $(1-\lambda)\lambda^{n-1}$ $$ G_t^\lambda = (1-\lambda) \sum_{n=1}^\infty \lambda^{n-1} G_t^{(n)} $$
  • forward-view $TD(\lambda)$: $$ V(S_t) \leftarrow V(S_t) + \alpha(G_t^\lambda - V(S_t)) $$

TD($\lambda$) Weighting Function

$$ G_t^\lambda = (1 - \lambda) \sum_{n=1}^\infty \lambda^{n-1} G_t^{(n)}, \qquad (1 - \lambda) \sum_{n=1}^\infty \lambda^{n-1} = 1 $$
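
A quick numeric sketch of the weighting (my own code; `lam` and the helper name are assumptions): the geometric weights $(1-\lambda)\lambda^{n-1}$ sum to 1, and the $\lambda$-return is the correspondingly weighted combination of the $n$-step returns (in an episodic task all weight beyond the terminal step falls on the final, MC, return):

In [ ]:
lam = 0.9

# the geometric weights on the n-step returns sum to 1
weights = [(1 - lam) * lam ** (n - 1) for n in range(1, 500)]
print(sum(weights))          # ~1.0

def lambda_return(n_step_returns, lam):
    # G_t^lambda = (1 - lambda) * sum_n lambda^(n-1) * G_t^(n), truncated at len(n_step_returns)
    return sum((1 - lam) * lam ** (n - 1) * G
               for n, G in enumerate(n_step_returns, start=1))

print(lambda_return([0.5, 0.8, 1.0], lam=0.5))   # 0.5*0.5 + 0.25*0.8 + 0.125*1.0 = 0.575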

Backward view TD($\lambda$)

  • forward view provides theory
  • backward view provides mechanism
  • update online, every step, from incomplete sequences

Eligibility Traces

$$ E_0(s) = 0 \\ E_t(s) = \gamma \lambda E_{t-1}(s) + \mathbf{1}(S_t = s) $$

Backward view TD($\lambda$)

  • keep an eligibility trace for every state $s$
  • update value $V(s)$ for every state $s$
  • in proportion to TD-error $\delta_t$ and eligibility trace $E_t(s)$:
$$ \delta_t = R_{t+1} + \gamma V(S_{t+1}) - V(S_t) \\ V(s) \leftarrow V(s) + \alpha \delta_t E_t(s) $$

In [16]:
class Runner(object):
    def __init__(self, env, policy, learner):
        self.env = env
        self.policy = policy
        self.learner = learner

    def run_episode(self, render=True):
        s = self.env.reset()
        while True:
            a = self.policy.get_action(s)
            if render:
                self.env.render()
            s_old = s.clone()
            s_new, r, done = self.env.act(a)
            if render:
                print('a %s r %s' % (a, r))
            self.learner.step(s_old, a, r, s_new, done, render=render)
            if done:
                break

class TD0_Online(object):
    def __init__(self, env, alpha=0.1):
        self.env = env
        self.state_shape = env.state_shape
        self._V = torch.zeros(self.state_shape)
        self.alpha = alpha

    @property
    def expects_online(self):
        return True

    def step(self, s, a, r, s_new, done, render=False):
        v_old = tensor_get(self._V, s)
        v_next = 0
        if s_new is not None:
            v_next = tensor_get(self.V, s_new)
        v_new = v_old + self.alpha * (r + v_next - v_old)
        tensor_set(self._V, s, v_new)

    @property
    def V(self):
        return self._V

learner = TD0_Online(env)
runner = Runner(env=env, policy=policy, learner=learner)
for it in range(500):
    render = it % 100 == 0
    runner.run_episode(render=False)
    if render:
        print('it', it)
        print('V', learner.V.view(1, -1))
print('V', learner.V.view(1, -1))


it 0
V 
 0.0000  0.0000  0.0000  0.0000  0.0000  0.1000  0.0000
[torch.FloatTensor of size 1x7]

it 100
V 
 0.0000  0.1065  0.2425  0.4207  0.6129  0.8431  0.0000
[torch.FloatTensor of size 1x7]

it 200
V 
 0.0000  0.0757  0.1881  0.4000  0.5574  0.8156  0.0000
[torch.FloatTensor of size 1x7]

it 300
V 
 0.0000  0.0806  0.3245  0.4481  0.7244  0.8945  0.0000
[torch.FloatTensor of size 1x7]

it 400
V 
 0.0000  0.1656  0.3054  0.4808  0.6269  0.7876  0.0000
[torch.FloatTensor of size 1x7]

V 
 0.0000  0.1588  0.3540  0.5242  0.6726  0.8711  0.0000
[torch.FloatTensor of size 1x7]


In [18]:
class TDLambdaBackward_Online(object):
    def __init__(self, env, gamma=1.0, alpha=0.1, lambda_=0.9):
        self.env = env
        self.state_shape = env.state_shape
        self._V = torch.zeros(self.state_shape)
        self.gamma = gamma
        self.alpha = alpha
        self.lambda_ = lambda_
        self.eligibility = torch.zeros(self.state_shape)

    @property
    def expects_online(self):
        return True

    def step(self, s, a, r, s_new, done, render=False):
        # decay all traces, then bump the trace of the current state:
        # E_t(s) = gamma * lambda * E_{t-1}(s) + 1(S_t = s)
        self.eligibility *= self.gamma * self.lambda_
        tensor_inc(self.eligibility, s, 1)

        v_next = 0  # terminal state has value 0
        if s_new is not None:
            v_next = tensor_get(self._V, s_new)
        td_error = r + self.gamma * v_next - tensor_get(self._V, s)
        # update every state in proportion to the TD error and its eligibility trace
        for i in range(self.state_shape[0]):
            if self.eligibility[i] > 1e-3:
                e_state = torch.IntTensor([i])
                v_old = tensor_get(self._V, e_state)
                tensor_set(self._V, e_state, v_old + self.alpha * td_error * self.eligibility[i])

        if done:
            # start the next episode with fresh traces (E_0(s) = 0)
            self.eligibility.zero_()

    @property
    def V(self):
        return self._V

learner = TDLambdaBackward_Online(env)
runner = Runner(env=env, policy=policy, learner=learner)
for it in range(50):
    render = it % 10 == 0
    runner.run_episode(render=False)
    if render:
        print('it', it)
        print('V', learner.V.view(1, -1))
print('V', learner.V.view(1, -1))


it 0
V 
    0     0     0     0     0     0     0
[torch.FloatTensor of size 1x7]

it 10
V 
 0.0000  0.1733  0.2779  0.4533  0.6238  0.5730  0.0000
[torch.FloatTensor of size 1x7]

it 20
V 
 0.0000  0.1490  0.1871  0.4287  0.7698  0.7910  0.0000
[torch.FloatTensor of size 1x7]

it 30
V 
 0.0000  0.2996  0.4641  0.6235  0.7451  0.9110  0.0000
[torch.FloatTensor of size 1x7]

it 40
V 
 0.0000  0.0099  0.0800  0.2421  0.5374  0.7289  0.0000
[torch.FloatTensor of size 1x7]

V 
 0.0000  0.3567  0.3392  0.5128  0.8077  0.8821  0.0000
[torch.FloatTensor of size 1x7]