This week's methods are all built to solve Markov Decision Processes (MDPs). In the broadest sense, an MDP is defined by how it changes states and how rewards are computed.
The state transition is defined by $P(s' | s, a)$: the probability of ending up in state $s'$ if you take action $a$ from state $s$. There is more than one way to define rewards, but for convenience we'll use the $r(s, a, s')$ function.
This notebook is inspired by the awesome CS294 course from Berkeley.
For starters, let's define a simple MDP with three states ($s_0$, $s_1$, $s_2$) and two actions ($a_0$, $a_1$):
In [1]:
# If you are running this in Colab, please uncomment the next line:
# !wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/master/week02_value_based/mdp.py

transition_probs = {
    's0': {
        'a0': {'s0': 0.5, 's2': 0.5},
        'a1': {'s2': 1}
    },
    's1': {
        'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},
        'a1': {'s1': 0.95, 's2': 0.05}
    },
    's2': {
        'a0': {'s0': 0.4, 's2': 0.6},
        'a1': {'s0': 0.3, 's1': 0.3, 's2': 0.4}
    }
}
rewards = {
    's1': {'a0': {'s0': +5}},
    's2': {'a1': {'s0': -1}}
}

from mdp import MDP
mdp = MDP(transition_probs, rewards, initial_state='s0')
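It may also help to sanity-check the dictionary above: for every state-action pair, the next-state probabilities should sum to 1. Here is a minimal, optional check over the raw `transition_probs` dict (nothing beyond plain Python assumed):
In [ ]:
# Optional sanity check: next-state probabilities must sum to 1 for every (s, a)
for state, actions in transition_probs.items():
    for action, next_states in actions.items():
        total = sum(next_states.values())
        assert abs(total - 1.0) < 1e-9, "P(.|%s,%s) sums to %s" % (state, action, total)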
We can now use the MDP just like any other gym environment:
In [2]:
print('initial state =', mdp.reset())
next_state, reward, done, info = mdp.step('a1')
print('next_state = %s, reward = %s, done = %s' % (next_state, reward, done))
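Since the interface mirrors gym, a quick random rollout makes a reasonable smoke test. A minimal sketch, assuming only `reset`, `step`, and `get_possible_actions` (the latter is one of the helper methods introduced just below):
In [ ]:
import random

# A few steps with uniformly random actions, just to exercise the gym-style interface
s = mdp.reset()
for _ in range(5):
    a = random.choice(mdp.get_possible_actions(s))
    s, r, done, _ = mdp.step(a)
    print("took %s, got reward %s, now in %s" % (a, r, s))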
But it also has other methods that you'll need for Value Iteration:
In [3]:
print("mdp.get_all_states =", mdp.get_all_states())
print("mdp.get_possible_actions('s1') = ", mdp.get_possible_actions('s1'))
print("mdp.get_next_states('s1', 'a0') = ", mdp.get_next_states('s1', 'a0'))
print("mdp.get_reward('s1', 'a0', 's0') = ", mdp.get_reward('s1', 'a0', 's0'))
print("mdp.get_transition_prob('s1', 'a0', 's0') = ",
mdp.get_transition_prob('s1', 'a0', 's0'))
You can also visualize any MDP with the drawing function donated by neer201.
You have to install graphviz both system-wide and for Python. On Ubuntu, just run:
sudo apt-get install graphviz
pip install graphviz
Note: installing graphviz on some OSes (especially Windows) may be tricky. However, you can skip this part altogether and use the standard visualization.
In [4]:
#! pip install graphviz
from mdp import has_graphviz
from IPython.display import display
print("Graphviz available:", has_graphviz)
In [5]:
if has_graphviz:
    from mdp import plot_graph, plot_graph_with_state_values, \
        plot_graph_optimal_strategy_and_state_values
    display(plot_graph(mdp))
Now let's build something to solve this MDP. The simplest algorithm so far is Value Iteration
Here's the pseudo-code for VI:
1. Initialize $V_{(0)}(s) = 0$, for all $s$
2. For $i = 0, 1, 2, \dots$
3. $\quad V_{(i+1)}(s) = \max_a \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')]$, for all $s$
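Why does iterating this update work? A standard fact, stated here without proof: the update is a $\gamma$-contraction in the max-norm,
$$\|V_{(i+1)} - V^{*}\|_{\infty} \le \gamma \, \|V_{(i)} - V^{*}\|_{\infty},$$
so the values converge geometrically to $V^{*}$, and stopping once consecutive iterates barely change (the `min_difference` check used below) is a sensible criterion.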
First, let's write a function to compute the state-action value function $Q_i(s, a)$, defined as follows:
$$Q_i(s, a) = \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')]$$
In [6]:
%%writefile mdp_get_action_value.py
def get_action_value(mdp, state_values, state, action, gamma):
    """ Computes Q(s,a) as in the formula above """
    q_i = 0
    for next_state in mdp.get_next_states(state, action):
        r = mdp.get_reward(state, action, next_state)
        p = mdp.get_transition_prob(state, action, next_state)
        # accumulate P(s'|s,a) * [ r(s,a,s') + gamma * V_i(s') ]
        q_i += p * (r + gamma * state_values[next_state])
    return q_i
In [7]:
from mdp_get_action_value import get_action_value
In [8]:
import numpy as np
test_Vs = {s: i for i, s in enumerate(sorted(mdp.get_all_states()))}
assert np.isclose(get_action_value(mdp, test_Vs, 's2', 'a1', 0.9), 0.69)
assert np.isclose(get_action_value(mdp, test_Vs, 's1', 'a0', 0.9), 3.95)
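To see where the first number comes from: with `test_Vs = {'s0': 0, 's1': 1, 's2': 2}` and $\gamma = 0.9$, taking $a_1$ in $s_2$ leads to $s_0$ with probability $0.3$ (reward $-1$), to $s_1$ with $0.3$, and to $s_2$ with $0.4$, so
$$Q(s_2, a_1) = 0.3 \cdot (-1 + 0.9 \cdot 0) + 0.3 \cdot (0 + 0.9 \cdot 1) + 0.4 \cdot (0 + 0.9 \cdot 2) = -0.3 + 0.27 + 0.72 = 0.69.$$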
Using $Q(s,a)$ we can now define the "next" V(s) for value iteration. $$V_{(i+1)}(s) = \max_a \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')] = \max_a Q_i(s,a)$$
In [9]:
def get_new_state_value(mdp, state_values, state, gamma):
    """ Computes next V(s) as in the formula above. Please do not change state_values in the process. """
    if mdp.is_terminal(state):
        return 0
    # V_{i+1}(s) = max_a Q_i(s, a)
    return max(get_action_value(mdp, state_values, state, action, gamma)
               for action in mdp.get_possible_actions(state))
In [10]:
test_Vs_copy = dict(test_Vs)
assert np.isclose(get_new_state_value(mdp, test_Vs, 's0', 0.9), 1.8)
assert np.isclose(get_new_state_value(mdp, test_Vs, 's2', 0.9), 1.08)
assert test_Vs == test_Vs_copy, "please do not change state_values in get_new_state_value"
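Again by hand: from $s_0$ all rewards are zero, so $Q(s_0, a_0) = 0.5 \cdot 0.9 \cdot 0 + 0.5 \cdot 0.9 \cdot 2 = 0.9$ and $Q(s_0, a_1) = 1.0 \cdot 0.9 \cdot 2 = 1.8$; the maximum is $1.8$, matching the first assert.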
Finally, let's combine everything we wrote into a working value iteration algorithm.
In [11]:
# parameters
gamma = 0.9           # discount for MDP
num_iter = 100        # maximum iterations, excluding initialization
# stop VI if new values are this close to old values (or closer)
min_difference = 0.001

# initialize V(s)
state_values = {s: 0 for s in mdp.get_all_states()}

if has_graphviz:
    display(plot_graph_with_state_values(mdp, state_values))

for i in range(num_iter):
    # Compute new state values using the functions defined above.
    # It must be a dict {state: float V_new(state)}
    new_state_values = {state: get_new_state_value(mdp, state_values, state, gamma)
                        for state in mdp.get_all_states()}
    assert isinstance(new_state_values, dict)

    # Compute difference
    diff = max(abs(new_state_values[s] - state_values[s])
               for s in mdp.get_all_states())
    print("iter %4i | diff: %6.5f | " % (i, diff), end="")
    print(' '.join("V(%s) = %.3f" % (s, v) for s, v in state_values.items()))
    state_values = new_state_values

    if diff < min_difference:
        print("Terminated")
        break
In [12]:
if has_graphviz:
    display(plot_graph_with_state_values(mdp, state_values))
In [13]:
print("Final state values:", state_values)
assert abs(state_values['s0'] - 3.781) < 0.01
assert abs(state_values['s1'] - 7.294) < 0.01
assert abs(state_values['s2'] - 4.202) < 0.01
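As an optional extra check (a minimal sketch, assuming the loop above terminated via the `min_difference` criterion), the converged values should be an approximate fixed point of the Bellman update, i.e. $V(s) \approx \max_a Q(s, a)$:
In [ ]:
# The converged V(s) should (approximately) satisfy the Bellman optimality equation
for s in mdp.get_all_states():
    assert abs(state_values[s] - get_new_state_value(mdp, state_values, s, gamma)) < 1e-2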
Now let's use those $V^{*}(s)$ to find optimal actions in each state:
$$\pi^*(s) = \arg\max_a \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')] = \arg\max_a Q_i(s,a)$$
The only difference from computing $V(s)$ is that we take the argmax instead of the max: we want the action that achieves the maximum $Q(s,a)$, not its value.
In [14]:
def get_optimal_action(mdp, state_values, state, gamma=0.9):
    """ Finds the optimal action using the formula above. """
    if mdp.is_terminal(state):
        return None
    # pi*(s) = argmax_a Q(s, a)
    return max(mdp.get_possible_actions(state),
               key=lambda action: get_action_value(mdp, state_values, state, action, gamma))
In [15]:
assert get_optimal_action(mdp, state_values, 's0', gamma) == 'a1'
assert get_optimal_action(mdp, state_values, 's1', gamma) == 'a0'
assert get_optimal_action(mdp, state_values, 's2', gamma) == 'a1'
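To see the whole greedy policy at once, here is a small convenience snippet (not required by the assignment):
In [ ]:
# The greedy policy for every state, as a dict {state: action}
policy = {s: get_optimal_action(mdp, state_values, s, gamma) for s in mdp.get_all_states()}
print(policy)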
In [16]:
if has_graphviz:
    try:
        display(plot_graph_optimal_strategy_and_state_values(mdp, state_values))
    except ImportError:
        raise ImportError("Run the cell that starts with \"%%writefile mdp_get_action_value.py\"")
In [17]:
# Measure agent's average reward
s = mdp.reset()
rewards = []
for _ in range(10000):
    s, r, done, _ = mdp.step(get_optimal_action(mdp, state_values, s, gamma))
    rewards.append(r)

print("average reward: ", np.mean(rewards))
assert 0.40 < np.mean(rewards) < 0.55
In [18]:
from mdp import FrozenLakeEnv
mdp = FrozenLakeEnv(slip_chance=0)
mdp.render()
In [41]:
def value_iteration(mdp, state_values=None, gamma=0.9, num_iter=1000, min_difference=1e-5):
    """ Performs num_iter value iteration steps starting from state_values. Same as before, but in a function. """
    state_values = state_values or {s: 0 for s in mdp.get_all_states()}
    for i in range(num_iter):
        # Compute new state values using the functions defined above.
        # It must be a dict {state: new_V(state)}
        new_state_values = {state: get_new_state_value(mdp, state_values, state, gamma)
                            for state in mdp.get_all_states()}
        assert isinstance(new_state_values, dict)

        # Compute difference
        diff = max(abs(new_state_values[s] - state_values[s])
                   for s in mdp.get_all_states())
        print("iter %4i | diff: %6.5f | V(start): %.3f " %
              (i, diff, new_state_values[mdp._initial_state]))

        state_values = new_state_values
        if diff < min_difference:
            break

    return state_values
In [42]:
state_values = value_iteration(mdp)
In [43]:
s = mdp.reset()
mdp.render()
for t in range(100):
    a = get_optimal_action(mdp, state_values, s, gamma)
    print(a, end='\n\n')
    s, r, done, _ = mdp.step(a)
    mdp.render()
    if done:
        break
In [44]:
import matplotlib.pyplot as plt
%matplotlib inline


def draw_policy(mdp, state_values):
    plt.figure(figsize=(3, 3))
    h, w = mdp.desc.shape
    states = sorted(mdp.get_all_states())
    V = np.array([state_values[s] for s in states])
    Pi = {s: get_optimal_action(mdp, state_values, s, gamma) for s in states}
    plt.imshow(V.reshape(w, h), cmap='gray', interpolation='none', clim=(0, 1))
    ax = plt.gca()
    ax.set_xticks(np.arange(h) - .5)
    ax.set_yticks(np.arange(w) - .5)
    ax.set_xticklabels([])
    ax.set_yticklabels([])
    # maps an action name to an arrow direction (u, v) in plot coordinates
    a2uv = {'left': (-1, 0), 'down': (0, -1), 'right': (1, 0), 'up': (0, 1)}
    for y in range(h):
        for x in range(w):
            plt.text(x, y, str(mdp.desc[y, x].item()),
                     color='g', size=12, verticalalignment='center',
                     horizontalalignment='center', fontweight='bold')
            a = Pi[y, x]
            if a is None:
                continue
            u, v = a2uv[a]
            plt.arrow(x, y, u * .3, -v * .3, color='m',
                      head_width=0.1, head_length=0.1)
    plt.grid(color='b', lw=2, ls='-')
    plt.show()
In [45]:
state_values = {s: 0 for s in mdp.get_all_states()}

for i in range(10):
    print("after iteration %i" % i)
    state_values = value_iteration(mdp, state_values, num_iter=1)
    draw_policy(mdp, state_values)
# please ignore iter 0 at each step
In [46]:
from IPython.display import clear_output
from time import sleep

mdp = FrozenLakeEnv(map_name='8x8', slip_chance=0.1)
state_values = {s: 0 for s in mdp.get_all_states()}

for i in range(30):
    clear_output(True)
    print("after iteration %i" % i)
    state_values = value_iteration(mdp, state_values, num_iter=1)
    draw_policy(mdp, state_values)
    sleep(0.5)
# please ignore iter 0 at each step
Massive tests
In [47]:
mdp = FrozenLakeEnv(slip_chance=0)
state_values = value_iteration(mdp)

total_rewards = []
for game_i in range(1000):
    s = mdp.reset()
    rewards = []
    for t in range(100):
        s, r, done, _ = mdp.step(
            get_optimal_action(mdp, state_values, s, gamma))
        rewards.append(r)
        if done:
            break
    total_rewards.append(np.sum(rewards))

print("average reward: ", np.mean(total_rewards))
assert 1.0 <= np.mean(total_rewards) <= 1.0
print("Well done!")
In [48]:
# Measure agent's average reward
mdp = FrozenLakeEnv(slip_chance=0.1)
state_values = value_iteration(mdp)

total_rewards = []
for game_i in range(1000):
    s = mdp.reset()
    rewards = []
    for t in range(100):
        s, r, done, _ = mdp.step(
            get_optimal_action(mdp, state_values, s, gamma))
        rewards.append(r)
        if done:
            break
    total_rewards.append(np.sum(rewards))

print("average reward: ", np.mean(total_rewards))
assert 0.8 <= np.mean(total_rewards) <= 0.95
print("Well done!")
In [49]:
# Measure agent's average reward
mdp = FrozenLakeEnv(slip_chance=0.25)
state_values = value_iteration(mdp)

total_rewards = []
for game_i in range(1000):
    s = mdp.reset()
    rewards = []
    for t in range(100):
        s, r, done, _ = mdp.step(
            get_optimal_action(mdp, state_values, s, gamma))
        rewards.append(r)
        if done:
            break
    total_rewards.append(np.sum(rewards))

print("average reward: ", np.mean(total_rewards))
assert 0.6 <= np.mean(total_rewards) <= 0.7
print("Well done!")
In [50]:
# Measure agent's average reward
mdp = FrozenLakeEnv(slip_chance=0.2, map_name='8x8')
state_values = value_iteration(mdp)

total_rewards = []
for game_i in range(1000):
    s = mdp.reset()
    rewards = []
    for t in range(100):
        s, r, done, _ = mdp.step(
            get_optimal_action(mdp, state_values, s, gamma))
        rewards.append(r)
        if done:
            break
    total_rewards.append(np.sum(rewards))

print("average reward: ", np.mean(total_rewards))
assert 0.6 <= np.mean(total_rewards) <= 0.8
print("Well done!")
In [51]:
from submit import submit_assigment
submit_assigment(
    get_action_value,
    get_new_state_value,
    get_optimal_action,
    value_iteration,
    'tonatiuh_rangel@hotmail.com',
    'Q9PtGJR8wzEGIdJT',
    verbose=False,
)
In [ ]: