This week's methods are all built to solve Markov Decision Processes (MDPs). In the broadest sense, an MDP is defined by how it changes states and how rewards are computed.
State transitions are defined by $P(s' | s,a)$: how likely you are to end up in state $s'$ if you take action $a$ from state $s$. There is more than one way to define rewards, but we'll use the $r(s,a,s')$ function for convenience.
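For example, in the simple MDP defined below, taking $a_1$ in $s_0$ always leads to $s_2$, i.e. $P(s_2 | s_0, a_1) = 1$, and the transition from $s_1$ to $s_0$ under $a_0$ yields reward $r(s_1, a_0, s_0) = +5$.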
This notebook is inspired by the awesome CS294 course from Berkeley.
For starters, let's define a simple MDP from this picture:
In [ ]:
import sys, os
if 'google.colab' in sys.modules and not os.path.exists('.setup_complete'):
!wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/spring20/setup_colab.sh -O- | bash
!wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/spring20/week02_value_based/mdp.py
!touch .setup_complete
# This code creates a virtual display to draw game images on.
# It will have no effect if your machine has a monitor.
if not os.environ.get("DISPLAY"):
!bash ../xvfb start
os.environ['DISPLAY'] = ':1'
In [ ]:
transition_probs = {
's0': {
'a0': {'s0': 0.5, 's2': 0.5},
'a1': {'s2': 1}
},
's1': {
'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},
'a1': {'s1': 0.95, 's2': 0.05}
},
's2': {
'a0': {'s0': 0.4, 's2': 0.6},
'a1': {'s0': 0.3, 's1': 0.3, 's2': 0.4}
}
}
rewards = {
's1': {'a0': {'s0': +5}},
's2': {'a1': {'s0': -1}}
}
from mdp import MDP
mdp = MDP(transition_probs, rewards, initial_state='s0')
We can now use the MDP just like any other Gym environment:
In [ ]:
print('initial state =', mdp.reset())
next_state, reward, done, info = mdp.step('a1')
print('next_state = %s, reward = %s, done = %s' % (next_state, reward, done))
But it also has other methods that you'll need for Value Iteration:
In [ ]:
print("mdp.get_all_states =", mdp.get_all_states())
print("mdp.get_possible_actions('s1') = ", mdp.get_possible_actions('s1'))
print("mdp.get_next_states('s1', 'a0') = ", mdp.get_next_states('s1', 'a0'))
print("mdp.get_reward('s1', 'a0', 's0') = ", mdp.get_reward('s1', 'a0', 's0'))
print("mdp.get_transition_prob('s1', 'a0', 's0') = ", mdp.get_transition_prob('s1', 'a0', 's0'))
You can also visualize any MDP with the drawing function donated by neer201.
You have to install graphviz both for your system and for Python. On Ubuntu, just run:
sudo apt-get install graphviz
pip install graphviz
Note: Installing graphviz on some OSes (especially Windows) may be tricky. However, you can skip this part altogether and use the standard visualization.
In [ ]:
from mdp import has_graphviz
from IPython.display import display
print("Graphviz available:", has_graphviz)
In [ ]:
if has_graphviz:
from mdp import plot_graph, plot_graph_with_state_values, plot_graph_optimal_strategy_and_state_values
display(plot_graph(mdp))
Now let's build something to solve this MDP. The simplest algorithm to start with is Value Iteration (VI).
Here's the pseudo-code for VI:
1. Initialize $V^{(0)}(s)=0$, for all $s$
2. For $i=0, 1, 2, \dots$
3. $\quad V_{(i+1)}(s) = \max_a \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')]$, for all $s$
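A useful fact, which follows from the contraction property discussed in the bonus part at the end of this notebook: each sweep brings the values closer to the optimum,
$$\|V_{(i+1)} - V^{*}\|_{\infty} \le \gamma \, \|V_{i} - V^{*}\|_{\infty},$$
so value iteration converges to $V^{*}$ geometrically.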
First, let's write a function to compute the state-action value function $Q_i(s, a)$ from the current state values $V_i$, defined as follows:
$$Q_i(s, a) = \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')]$$
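To make this concrete, here is the first test case below worked out by hand, using the transition and reward dicts defined above with $\gamma = 0.9$ and test values $V = \{s_0\!: 0,\ s_1\!: 1,\ s_2\!: 2\}$:
$$Q(s_2, a_1) = 0.3 \cdot (-1 + 0.9 \cdot 0) + 0.3 \cdot (0 + 0.9 \cdot 1) + 0.4 \cdot (0 + 0.9 \cdot 2) = -0.3 + 0.27 + 0.72 = 0.69$$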
In [ ]:
%%writefile mdp_get_action_value.py
def get_action_value(mdp, state_values, state, action, gamma):
""" Computes Q(s,a) as in formula above """
<YOUR CODE>
return <YOUR CODE>
In [ ]:
from mdp_get_action_value import get_action_value
In [ ]:
import numpy as np
test_Vs = {s: i for i, s in enumerate(sorted(mdp.get_all_states()))}
assert np.isclose(get_action_value(mdp, test_Vs, 's2', 'a1', 0.9), 0.69)
assert np.isclose(get_action_value(mdp, test_Vs, 's1', 'a0', 0.9), 3.95)
Using $Q(s,a)$ we can now define the "next" V(s) for value iteration. $$V_{(i+1)}(s) = \max_a \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')] = \max_a Q_i(s,a)$$
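As a sanity check against the first assert below, with the same test values $V = \{s_0\!: 0,\ s_1\!: 1,\ s_2\!: 2\}$ and $\gamma = 0.9$:
$$Q(s_0, a_0) = 0.5 \cdot (0 + 0.9 \cdot 0) + 0.5 \cdot (0 + 0.9 \cdot 2) = 0.9, \qquad Q(s_0, a_1) = 1.0 \cdot (0 + 0.9 \cdot 2) = 1.8,$$
so $V_{(i+1)}(s_0) = \max(0.9, 1.8) = 1.8$.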
In [ ]:
def get_new_state_value(mdp, state_values, state, gamma):
""" Computes next V(s) as in formula above. Please do not change state_values in process. """
if mdp.is_terminal(state):
return 0
<YOUR CODE>
return <YOUR CODE>
In [ ]:
test_Vs_copy = dict(test_Vs)
assert np.isclose(get_new_state_value(mdp, test_Vs, 's0', 0.9), 1.8)
assert np.isclose(get_new_state_value(mdp, test_Vs, 's2', 0.9), 1.08)
assert np.isclose(get_new_state_value(mdp, {'s0': -1e10, 's1': 0, 's2': -2e10}, 's0', 0.9), -13500000000.0), \
"Please ensure that you handle negative Q-values of arbitrary magnitude correctly"
assert test_Vs == test_Vs_copy, "Please do not change state_values in get_new_state_value"
Finally, let's combine everything we wrote into a working value iteration algorithm.
In [ ]:
# parameters
gamma = 0.9 # discount for MDP
num_iter = 100 # maximum iterations, excluding initialization
# stop VI if new values are this close to old values (or closer)
min_difference = 0.001
# initialize V(s)
state_values = {s: 0 for s in mdp.get_all_states()}
if has_graphviz:
display(plot_graph_with_state_values(mdp, state_values))
for i in range(num_iter):
# Compute new state values using the functions you defined above.
# It must be a dict {state : float V_new(state)}
new_state_values = <YOUR CODE>
assert isinstance(new_state_values, dict)
# Compute difference
diff = max(abs(new_state_values[s] - state_values[s])
for s in mdp.get_all_states())
print("iter %4i | diff: %6.5f | " % (i, diff), end="")
print(' '.join("V(%s) = %.3f" % (s, v) for s, v in state_values.items()))
state_values = new_state_values
if diff < min_difference:
print("Terminated")
break
In [ ]:
if has_graphviz:
display(plot_graph_with_state_values(mdp, state_values))
In [ ]:
print("Final state values:", state_values)
assert abs(state_values['s0'] - 3.781) < 0.01
assert abs(state_values['s1'] - 7.294) < 0.01
assert abs(state_values['s2'] - 4.202) < 0.01
Now let's use those $V^{*}(s)$ to find optimal actions in each state:
$$\pi^*(s) = \operatorname*{argmax}_a \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V^{*}(s')] = \operatorname*{argmax}_a Q^{*}(s,a)$$
The only difference from computing $V(s)$ is that here we take the argmax rather than the max: find the action with the maximum $Q(s,a)$.
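For instance, plugging in the converged values from above ($V^{*}(s_0) \approx 3.78$, $V^{*}(s_2) \approx 4.20$), with $\gamma = 0.9$ and zero rewards out of $s_0$:
$$Q(s_0, a_0) \approx 0.5 \cdot 0.9 \cdot 3.78 + 0.5 \cdot 0.9 \cdot 4.20 \approx 3.59, \qquad Q(s_0, a_1) \approx 0.9 \cdot 4.20 \approx 3.78,$$
so the greedy action in $s_0$ is $a_1$, which is exactly what the first assert below expects.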
In [ ]:
def get_optimal_action(mdp, state_values, state, gamma=0.9):
""" Finds optimal action using formula above. """
if mdp.is_terminal(state):
return None
<YOUR CODE>
return <YOUR CODE>
In [ ]:
assert get_optimal_action(mdp, state_values, 's0', gamma) == 'a1'
assert get_optimal_action(mdp, state_values, 's1', gamma) == 'a0'
assert get_optimal_action(mdp, state_values, 's2', gamma) == 'a1'
assert get_optimal_action(mdp, {'s0': -1e10, 's1': 0, 's2': -2e10}, 's0', 0.9) == 'a0', \
"Please ensure that you handle negative Q-values of arbitrary magnitude correctly"
assert get_optimal_action(mdp, {'s0': -2e10, 's1': 0, 's2': -1e10}, 's0', 0.9) == 'a1', \
"Please ensure that you handle negative Q-values of arbitrary magnitude correctly"
In [ ]:
if has_graphviz:
try:
display(plot_graph_optimal_strategy_and_state_values(mdp, state_values))
except ImportError:
raise ImportError("Run the cell that starts with \"%%writefile mdp_get_action_value.py\"")
In [ ]:
# Measure agent's average reward
s = mdp.reset()
rewards = []
for _ in range(10000):
s, r, done, _ = mdp.step(get_optimal_action(mdp, state_values, s, gamma))
rewards.append(r)
print("average reward: ", np.mean(rewards))
assert(0.40 < np.mean(rewards) < 0.55)
In [ ]:
from mdp import FrozenLakeEnv
mdp = FrozenLakeEnv(slip_chance=0)
mdp.render()
In [ ]:
def value_iteration(mdp, state_values=None, gamma=0.9, num_iter=1000, min_difference=1e-5):
""" performs num_iter value iteration steps starting from state_values. Same as before but in a function """
state_values = state_values or {s: 0 for s in mdp.get_all_states()}
for i in range(num_iter):
# Compute new state values using the functions you defined above. It must be a dict {state : new_V(state)}
new_state_values = <YOUR CODE>
assert isinstance(new_state_values, dict)
# Compute difference
diff = max(abs(new_state_values[s] - state_values[s])
for s in mdp.get_all_states())
print("iter %4i | diff: %6.5f | V(start): %.3f " %
(i, diff, new_state_values[mdp._initial_state]))
state_values = new_state_values
if diff < min_difference:
break
return state_values
In [ ]:
state_values = value_iteration(mdp)
In [ ]:
s = mdp.reset()
mdp.render()
for t in range(100):
a = get_optimal_action(mdp, state_values, s, gamma)
print(a, end='\n\n')
s, r, done, _ = mdp.step(a)
mdp.render()
if done:
break
In [ ]:
import matplotlib.pyplot as plt
%matplotlib inline
def draw_policy(mdp, state_values):
plt.figure(figsize=(3, 3))
h, w = mdp.desc.shape
states = sorted(mdp.get_all_states())
V = np.array([state_values[s] for s in states])
Pi = {s: get_optimal_action(mdp, state_values, s, gamma) for s in states}
plt.imshow(V.reshape(w, h), cmap='gray', interpolation='none', clim=(0, 1))
ax = plt.gca()
ax.set_xticks(np.arange(h)-.5)
ax.set_yticks(np.arange(w)-.5)
ax.set_xticklabels([])
ax.set_yticklabels([])
Y, X = np.mgrid[0:4, 0:4]
a2uv = {'left': (-1, 0), 'down': (0, -1), 'right': (1, 0), 'up': (0, 1)}
for y in range(h):
for x in range(w):
plt.text(x, y, str(mdp.desc[y, x].item()),
color='g', size=12, verticalalignment='center',
horizontalalignment='center', fontweight='bold')
a = Pi[y, x]
if a is None:
continue
u, v = a2uv[a]
plt.arrow(x, y, u*.3, -v*.3, color='m',
head_width=0.1, head_length=0.1)
plt.grid(color='b', lw=2, ls='-')
plt.show()
In [ ]:
state_values = {s: 0 for s in mdp.get_all_states()}
for i in range(10):
print("after iteration %i" % i)
state_values = value_iteration(mdp, state_values, num_iter=1)
draw_policy(mdp, state_values)
# please ignore iter 0 at each step
In [ ]:
from IPython.display import clear_output
from time import sleep
mdp = FrozenLakeEnv(map_name='8x8', slip_chance=0.1)
state_values = {s: 0 for s in mdp.get_all_states()}
for i in range(30):
clear_output(True)
print("after iteration %i" % i)
state_values = value_iteration(mdp, state_values, num_iter=1)
draw_policy(mdp, state_values)
sleep(0.5)
# please ignore iter 0 at each step
Massive tests
In [ ]:
mdp = FrozenLakeEnv(slip_chance=0)
state_values = value_iteration(mdp)
total_rewards = []
for game_i in range(1000):
s = mdp.reset()
rewards = []
for t in range(100):
s, r, done, _ = mdp.step(
get_optimal_action(mdp, state_values, s, gamma))
rewards.append(r)
if done:
break
total_rewards.append(np.sum(rewards))
print("average reward: ", np.mean(total_rewards))
assert(1.0 <= np.mean(total_rewards) <= 1.0)
print("Well done!")
In [ ]:
# Measure agent's average reward
mdp = FrozenLakeEnv(slip_chance=0.1)
state_values = value_iteration(mdp)
total_rewards = []
for game_i in range(1000):
s = mdp.reset()
rewards = []
for t in range(100):
s, r, done, _ = mdp.step(
get_optimal_action(mdp, state_values, s, gamma))
rewards.append(r)
if done:
break
total_rewards.append(np.sum(rewards))
print("average reward: ", np.mean(total_rewards))
assert(0.8 <= np.mean(total_rewards) <= 0.95)
print("Well done!")
In [ ]:
# Measure agent's average reward
mdp = FrozenLakeEnv(slip_chance=0.25)
state_values = value_iteration(mdp)
total_rewards = []
for game_i in range(1000):
s = mdp.reset()
rewards = []
for t in range(100):
s, r, done, _ = mdp.step(
get_optimal_action(mdp, state_values, s, gamma))
rewards.append(r)
if done:
break
total_rewards.append(np.sum(rewards))
print("average reward: ", np.mean(total_rewards))
assert(0.6 <= np.mean(total_rewards) <= 0.7)
print("Well done!")
In [ ]:
# Measure agent's average reward
mdp = FrozenLakeEnv(slip_chance=0.2, map_name='8x8')
state_values = value_iteration(mdp)
total_rewards = []
for game_i in range(1000):
s = mdp.reset()
rewards = []
for t in range(100):
s, r, done, _ = mdp.step(
get_optimal_action(mdp, state_values, s, gamma))
rewards.append(r)
if done:
break
total_rewards.append(np.sum(rewards))
print("average reward: ", np.mean(total_rewards))
assert(0.6 <= np.mean(total_rewards) <= 0.8)
print("Well done!")
When we ran value iteration on the small FrozenLake problem, the last iteration at which an action changed was iteration 6, i.e., value iteration had computed the optimal policy by iteration 6. Are there any guarantees on how many iterations it takes value iteration to compute the optimal policy? There are no such guarantees without additional assumptions: we can construct an MDP in which the greedy policy still changes after arbitrarily many iterations.
Your task: define an MDP with at most 3 states and 2 actions such that, when you run value iteration, the optimal action changes at some iteration >= 50. Use discount = 0.95. (Note, however, that the discount doesn't really matter here: you can construct an appropriate MDP with any discount.)
Note: the optimal action only needs to change at some iteration >= 50; it does not have to change on every iteration up to that point.
In [ ]:
transition_probs = {
<YOUR CODE>
}
rewards = {
<YOUR CODE>
}
from mdp import MDP
from numpy import random
mdp = MDP(transition_probs, rewards, initial_state=random.choice(tuple(transition_probs.keys())))
# Feel free to change the initial_state
In [ ]:
state_values = {s: 0 for s in mdp.get_all_states()}
policy = np.array([get_optimal_action(mdp, state_values, state, gamma)
for state in sorted(mdp.get_all_states())])
for i in range(100):
print("after iteration %i" % i)
state_values = value_iteration(mdp, state_values, num_iter=1)
new_policy = np.array([get_optimal_action(mdp, state_values, state, gamma)
for state in sorted(mdp.get_all_states())])
n_changes = (policy != new_policy).sum()
print("N actions changed = %i \n" % n_changes)
policy = new_policy
# please ignore iter 0 at each step
Note: Assume that $\mathcal{S}, \mathcal{A}$ are finite.
The value-function update in value iteration can be rewritten in terms of the Bellman operator:
$$(TV)(s) = \max_{a \in \mathcal{A}}\mathbb{E}\left[ r_{t+1} + \gamma V(s_{t+1}) \mid s_t = s, a_t = a\right]$$
Value iteration with the Bellman operator:
Initialize $V_0$
for $k = 0,1,2,...$ do
$V_{k+1} \leftarrow TV_k$
end for
In the lecture we established the contraction property of the Bellman operator:
$$ \|TV - TU\|_{\infty} \le \gamma \|V - U\|_{\infty} \quad \text{for all } V, U. $$
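As an informal numerical sanity check of this inequality (an illustration only, not a substitute for the proof; it assumes your get_new_state_value from above is implemented, so that one full backup computes $(TV)(s) = \max_a Q(s, a)$ for every state):
In [ ]:
import numpy as np

def bellman_backup(mdp, V, gamma=0.9):
    """ Applies the Bellman operator T once: (TV)(s) = max_a Q(s, a). """
    return {s: get_new_state_value(mdp, V, s, gamma) for s in mdp.get_all_states()}

# two arbitrary value functions
rng = np.random.RandomState(0)
V = {s: rng.randn() for s in mdp.get_all_states()}
U = {s: rng.randn() for s in mdp.get_all_states()}

TV, TU = bellman_backup(mdp, V), bellman_backup(mdp, U)
lhs = max(abs(TV[s] - TU[s]) for s in mdp.get_all_states())
rhs = 0.9 * max(abs(V[s] - U[s]) for s in mdp.get_all_states())
print("||TV - TU||_inf = %.4f  <=  gamma * ||V - U||_inf = %.4f" % (lhs, rhs))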
Using the contraction property of the Bellman operator, the Banach fixed-point theorem, and the Bellman equations, prove that value iteration converges to $V^{*}$.
<-- Your proof here -->
Consider the following algorithm:
Initialize $V_0$
for $k = 0,1,2,...$ do
Select some state $s_k \in \mathcal{S}$
$V(s_k) := (TV)(s_k)$
end for
Note that unlike common value iteration, here we update only a single state at a time.
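A minimal sketch of this update scheme, assuming your get_new_state_value from above is implemented (the function name and the random schedule below are just for illustration):
In [ ]:
import numpy as np

def async_value_iteration_sketch(mdp, num_updates=10000, gamma=0.9, seed=0):
    """ Updates one randomly chosen state at a time: V(s_k) := (TV)(s_k). """
    rng = np.random.RandomState(seed)
    states = list(mdp.get_all_states())
    V = {s: 0 for s in states}
    for _ in range(num_updates):
        s_k = states[rng.randint(len(states))]            # select some state s_k
        V[s_k] = get_new_state_value(mdp, V, s_k, gamma)  # V(s_k) := (TV)(s_k)
    return V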
Homework. Prove the following proposition:
If every state $s \in \mathcal{S}$ appears in the sequence $(s_0, s_1, \dots)$ infinitely often, then $V$ converges to $V^{*}$.
<-- Your proof here -->
Let's implement exact policy iteration (PI), which has the following pseudocode:
Initialize $\pi_0$ // random or fixed action
For $n=0, 1, 2, \dots$
$\quad$ Compute the state-value function $V^{\pi_{n}}$ (policy evaluation)
$\quad$ Compute the new greedy policy $\pi_{n+1}(s) = \operatorname*{argmax}_a \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V^{\pi_{n}}(s')]$ (policy improvement)
Unlike VI, policy iteration has to maintain a policy (a chosen action for every state) and estimate $V^{\pi_{n}}$ based on this policy. It only changes the policy once the values for that policy have been fully computed.
Below are a few helpers that you may or may not use in your implementation.
In [ ]:
transition_probs = {
's0': {
'a0': {'s0': 0.5, 's2': 0.5},
'a1': {'s2': 1}
},
's1': {
'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},
'a1': {'s1': 0.95, 's2': 0.05}
},
's2': {
'a0': {'s0': 0.4, 's1': 0.6},
'a1': {'s0': 0.3, 's1': 0.3, 's2': 0.4}
}
}
rewards = {
's1': {'a0': {'s0': +5}},
's2': {'a1': {'s0': -1}}
}
from mdp import MDP
mdp = MDP(transition_probs, rewards, initial_state='s0')
Let's write a function called compute_vpi that computes the state-value function $V^{\pi}$ for an arbitrary policy $\pi$.
Unlike VI, this time you must find the exact solution, not just a single iteration.
Recall that $V^{\pi}$ satisfies the following linear equation:
$$V^{\pi}(s) = \sum_{s'} P(s,\pi(s),s')[ R(s,\pi(s),s') + \gamma V^{\pi}(s')]$$
You'll have to solve a linear system in your code. (Find an exact solution, e.g., with np.linalg.solve.)
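One standard way to set this system up (a hint; the matrix and vector names below are not used elsewhere in this notebook): stack the equations over all states as
$$V^{\pi} = R^{\pi} + \gamma P^{\pi} V^{\pi} \quad\Longleftrightarrow\quad (I - \gamma P^{\pi})\, V^{\pi} = R^{\pi},$$
where $P^{\pi}_{s s'} = P(s, \pi(s), s')$ and $R^{\pi}_{s} = \sum_{s'} P(s, \pi(s), s')\, R(s, \pi(s), s')$. Solving this $|\mathcal{S}| \times |\mathcal{S}|$ linear system, e.g. with np.linalg.solve, gives $V^{\pi}$ for all states at once.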
In [ ]:
def compute_vpi(mdp, policy, gamma):
"""
Computes V^pi(s) FOR ALL STATES under given policy.
:param policy: a dict of currently chosen actions {s : a}
:returns: a dict {state : V^pi(state) for all states}
"""
<YOUR CODE>
return <YOUR CODE>
In [ ]:
test_policy = {s: np.random.choice(
mdp.get_possible_actions(s)) for s in mdp.get_all_states()}
new_vpi = compute_vpi(mdp, test_policy, gamma)
print(new_vpi)
assert type(
new_vpi) is dict, "compute_vpi must return a dict {state : V^pi(state) for all states}"
Once we've got new state values, it's time to update our policy.
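Concretely, the updated policy is greedy with respect to the Q-values built from $V^{\pi}$:
$$\pi_{new}(s) = \operatorname*{argmax}_a \sum_{s'} P(s, a, s')\,[\, R(s, a, s') + \gamma V^{\pi}(s') \,]$$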
In [ ]:
def compute_new_policy(mdp, vpi, gamma):
"""
Computes new policy as argmax of state values
:param vpi: a dict {state : V^pi(state) for all states}
:returns: a dict {state : optimal action for all states}
"""
<YOUR CODE>
return <YOUR CODE>
In [ ]:
new_policy = compute_new_policy(mdp, new_vpi, gamma)
print(new_policy)
assert type(
new_policy) is dict, "compute_new_policy must return a dict {state : optimal action for all states}"
Main loop
In [ ]:
def policy_iteration(mdp, policy=None, gamma=0.9, num_iter=1000, min_difference=1e-5):
"""
    Run the policy iteration loop for num_iter iterations, or until the difference between successive V(s) falls below min_difference.
If policy is not given, initialize it at random.
"""
<YOUR CODE: a whole lot of it>
return state_values, policy
Your PI Results
In [ ]:
<YOUR CODE: compare PI and VI on the MDP from bonus 1, then on small & large FrozenLake>
Note: Assume that $\mathcal{S}, \mathcal{A}$ are finite.
We can define another Bellman operator:
$$(T_{\pi}V)(s) = \mathbb{E}_{r, s'|s, a = \pi(s)}\left[r + \gamma V(s')\right]$$
and rewrite the policy iteration algorithm in operator form:
Initialize $\pi_0$
for $k = 0,1,2,...$ do
Solve $V_k = T_{\pi_k}V_k$
Select $\pi_{k+1}$ s.t. $T_{\pi_{k+1}}V_k = TV_k$
end for
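In code, the two steps inside this loop map directly onto the helpers from the Policy Iteration section above (a sketch, assuming compute_vpi and compute_new_policy are implemented):
In [ ]:
def policy_iteration_step(mdp, policy, gamma=0.9):
    """ One iteration of the operator-form loop above. """
    V_k = compute_vpi(mdp, policy, gamma)             # policy evaluation: solve V_k = T_{pi_k} V_k exactly
    new_policy = compute_new_policy(mdp, V_k, gamma)  # greedy improvement: T_{pi_{k+1}} V_k = T V_k
    return V_k, new_policy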
To prove convergence of the algorithm, we need to prove two properties of $T_{\pi}$: contraction and monotonicity.
Monotonicity: for all $V, U$, if $V(s) \le U(s)$ $\forall s \in \mathcal{S}$, then $(T_\pi V)(s) \le (T_\pi U)(s)$ $\forall s \in \mathcal{S}$.
<-- Your proof here -->
Contraction: $$ \|T_{\pi}V - T_{\pi}U\|_{\infty} \le \gamma \|V - U\|_{\infty} \quad \text{for all } V, U. $$
<-- Your proof here -->
Prove that there exists an iteration $k_0$ such that $\pi_k = \pi^{*}$ for all $k \ge k_0$.
<-- Your proof here -->