This assignment will review exact methods for solving Markov Decision Processes (MDPs) with finite state and action spaces. We will implement value iteration (VI) and policy iteration (PI) for a finite MDP, both of which find the optimal policy in a finite number of iterations.
For this assignment, we will consider discounted infinite-horizon MDPs. Recall that the MDP is defined by the tuple $(S, A, R, P, \rho, \gamma)$, where $S$ is the state space, $A$ is the action space, $R(s,a,s')$ is the reward function, $P(s,a,s') = P(s' \mid s, a)$ is the transition probability distribution, $\rho$ is the initial state distribution, and $\gamma \in [0,1)$ is the discount factor.
Here we will consider MDPs where $S,A$ are finite sets, hence $R$ and $P$ are 3D arrays.
We'll randomly generate an MDP which your algorithms should be able to solve. Using randomly generated MDPs is a bit dry, but it emphasizes that policy iteration can be expressed with a few array operations.
In [1]:
import numpy as np, numpy.random as nr
import hw_utils
In [2]:
nr.seed(0) # seed random number generator
nS = 10
nA = 2
# nS: number of states
# nA: number of actions
R_rand = nr.rand(nS, nA, nS) # reward function
# R[i,j,k] := R(s=i, a=j, s'=k),
# i.e., the dimensions are (current state, action, next state)
P_rand = nr.rand(nS, nA, nS)
# P[i,j,k] := P(s'=k | s=i, a=j)
# i.e., dimensions are (current state, action, next state)
P_rand /= P_rand.sum(axis=2,keepdims=True) # normalize conditional probabilities
gamma = 0.90
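As a quick optional sanity check (not part of the assignment), you can verify that the normalization above produced valid conditional distributions:
assert np.allclose(P_rand.sum(axis=2), 1.0), "each row P_rand[s, a, :] should sum to 1"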
You'll implement the Bellman backup operator for the state-value function, called vstar_backup
below. It should compute $V^{(n+1)}$, defined as
$$V^{(n+1)}(s) = \max_a \sum_{s'} P(s,a,s') [ R(s,a,s') + \gamma V^{(n)}(s')]$$
This update is often called a backup, since we are updating the state $s$ based on possible future states $s'$, i.e., we are propagating the value function backwards in time, in a sense. The function is called vstar_backup because this update converges to the optimal value function, which is conventionally called $V^*$.
In [35]:
def vstar_backup(v_n, P_pan, R_pan, gamma):
    """
    Apply the Bellman backup operator V -> T[V], i.e., perform one step of value iteration.
    :param v_n: the state-value function (1D array) for the previous iteration, i.e. V^(n)
    :param P_pan: the transition function (3D array: S*A*S -> R)
    :param R_pan: the reward function (3D array: S*A*S -> R)
    :param gamma: the discount factor (scalar)
    :return: a pair (v_p, a_p), where v_p is the updated state-value function and should be a 1D array (S -> R),
        and a_p is the updated (deterministic) policy, which should also be a 1D array (S -> A)
    We're using the subscript letters to label the axes.
    E.g., "pan" refers to "Previous state", "Action", "Next state".
    """
    nS = P_pan.shape[0]  # number of states
    # YOUR CODE HERE
    nA = P_pan.shape[1]  # number of actions
    v_p = np.zeros(nS)
    a_p = np.zeros(nS)
    for s in range(nS):
        max_a = None
        max_sum = -np.inf
        for a in range(nA):
            # expected one-step backup for taking action a in state s
            sum_sp = np.sum(P_pan[s, a] * (R_pan[s, a] + gamma * v_n))
            if max_a is None or sum_sp > max_sum:
                max_sum = sum_sp
                max_a = a
        v_p[s] = max_sum
        a_p[s] = max_a
    assert v_p.shape == (nS,)
    assert a_p.shape == (nS,)
    return (v_p, a_p)
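For comparison, the same backup can also be written without explicit Python loops. This is only a hedged sketch (a hypothetical helper using the same "pan" axis labels), not part of the assignment:
def vstar_backup_vectorized(v_n, P_pan, R_pan, gamma):
    # Q[s, a] = sum_{s'} P(s,a,s') * (R(s,a,s') + gamma * V(s'))
    Q_pa = np.einsum('pan,pan->pa', P_pan, R_pan + gamma * v_n)
    return Q_pa.max(axis=1), Q_pa.argmax(axis=1)
Ties are broken toward the lower-numbered action by argmax, matching the loop version above.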
Now, let's test value iteration on a randomly generated MDP.
In [36]:
# DO NOT CHANGE THIS PART!
def value_iteration(P, R, gamma, n_iter, verbose=False):
    nS = P.shape[0]
    Vprev = np.zeros(nS)
    Aprev = None
    chg_actions_seq = []
    if verbose:
        print(hw_utils.fmt_row(13, ["iter", "max|V-Vprev|", "# chg actions", "V[0]"], header=True))
    for i in range(n_iter):
        V, A = vstar_backup(Vprev, P, R, gamma)
        chg_actions = "N/A" if Aprev is None else (A != Aprev).sum()
        chg_actions_seq.append(chg_actions)
        if verbose:
            print(hw_utils.fmt_row(13, [i+1, np.abs(V-Vprev).max(), chg_actions, V[0]]))
        Vprev, Aprev = V, A
    return V, A, chg_actions_seq
value_iteration(P_rand, R_rand, gamma, n_iter=20, verbose=True);
# Expected output:
# iter | max|V-Vprev| | # chg actions | V[0]
# -------------------------------------------------------------
# 1 | 0.707147 | N/A | 0.618258
# 2 | 0.514599 | 1 | 1.13286
# 3 | 0.452404 | 0 | 1.58322
# 4 | 0.405723 | 0 | 1.98855
# 5 | 0.364829 | 0 | 2.35327
# 6 | 0.328307 | 0 | 2.68157
# 7 | 0.295474 | 0 | 2.97704
# 8 | 0.265926 | 0 | 3.24297
# 9 | 0.239333 | 0 | 3.4823
# 10 | 0.2154 | 0 | 3.6977
# 11 | 0.19386 | 0 | 3.89156
# 12 | 0.174474 | 0 | 4.06604
# 13 | 0.157026 | 0 | 4.22306
# 14 | 0.141324 | 0 | 4.36439
# 15 | 0.127191 | 0 | 4.49158
# 16 | 0.114472 | 0 | 4.60605
# 17 | 0.103025 | 0 | 4.70908
# 18 | 0.0927225 | 0 | 4.8018
# 19 | 0.0834503 | 0 | 4.88525
# 20 | 0.0751053 | 0 | 4.96035
You'll notice that value iteration only takes two iterations to converge to the right actions everywhere. (However, note that the actual values converge rather slowly.) That's because most randomly generated MDPs aren't very interesting. Also, note that the value of any particular state (e.g., V[0], shown in the rightmost column) increases monotonically. Under which conditions is that true? [question will not be graded]
Your next task is to construct an MDP on which value iteration converges slowly, i.e., one where the greedy policy keeps changing as the values propagate. Specifically, the requirement is that your MDP should have 10 states and 2 actions, and the policy should be updated on each of the first 10 iterations.
Here's a hint for one solution: arrange the states on a line, so that state $i$ can only transition to one of $\{i-1, i, i+1\}$. You should create 3D arrays P, R in hw1.py that define the MDP.
In [155]:
# nS = 10
Pslow = np.zeros((10, 2, 10))  # YOUR CODE SHOULD FILL IN THE VALUES OF Pslow
Rslow = np.zeros((10, 2, 10))  # YOUR CODE SHOULD FILL IN THE VALUES OF Rslow
# Problem 1b
# YOUR CODE HERE
# Chain MDP: action 0 stays in place with reward 0; action 1 moves one state
# to the right at a small cost, except at state 9, where it stays and collects
# a large reward. The large reward propagates backward by one state per
# iteration of value iteration, so the greedy action flips at exactly one new
# state on each of the first 10 iterations.
for s in range(10):
    Pslow[s, 0, s] = 1.0            # action 0: stay put, reward 0
    if s < 9:
        Pslow[s, 1, s + 1] = 1.0    # action 1: move right...
        Rslow[s, 1, s + 1] = -1.0   # ...at a small cost
    else:
        Pslow[s, 1, s] = 1.0        # action 1 at the last state: stay...
        Rslow[s, 1, s] = 100.0      # ...and collect a large reward
assert Pslow.shape == (10, 2, 10), "P has the wrong shape"
assert Rslow.shape == (10, 2, 10), "R has the wrong shape"
print(Pslow.sum(axis=2))  # sanity check: each row should be all ones
assert np.allclose(Pslow.sum(axis=2), np.ones((10, 2))), "Transition probabilities should sum to 1"
value_iteration(Pslow, Rslow, gamma, n_iter=20, verbose=True);
# The first 10 rows of the third column of the output should be something like
# # chg actions
# -------------
# N/A
# 2
# 1
# 1
# 1
# 1
# 1
# 1
# 1
# 1
# The actual numbers can differ, as long as they are > 0.
The next task is to implement exact policy iteration (PI).
PI first initializes the policy $\pi_0(s)$, and then it performs the following two steps on the $n$th iteration:
1. Compute the state-action value function $Q^{\pi_{n-1}}$ of the current policy $\pi_{n-1}$ (policy evaluation).
2. Compute the new, greedy policy $\pi_n(s) = \arg\max_a Q^{\pi_{n-1}}(s,a)$ (policy improvement).
We'll break step 1 into two parts. First you'll write a function called compute_vpi that computes the state-value function $V^{\pi}$ for an arbitrary policy $\pi$. Recall that $V^{\pi}$ satisfies the following linear equation:
$$V^{\pi}(s) = \sum_{s'} P(s,\pi(s),s')[ R(s,\pi(s),s') + \gamma V^{\pi}(s')]$$
You'll have to solve a linear system in your code.
In [ ]:
def compute_vpi(pi, P, R, gamma):
    """
    :param pi: a deterministic policy (1D array: S -> A)
    :param P: the transition probabilities (3D array: S*A*S -> R)
    :param R: the reward function (3D array: S*A*S -> R)
    :param gamma: the discount factor (scalar)
    :return: vpi, the state-value function for the policy pi
    """
    nS = P.shape[0]
    # YOUR CODE HERE
    raise NotImplementedError()
    assert vpi.shape == (nS,)
    return vpi
pi0 = np.zeros(nS, dtype='i')
compute_vpi(pi0, P_rand, R_rand, gamma)
# Expected output:
# array([ 5.206217 , 5.15900351, 5.01725926, 4.76913715, 5.03154609,
# 5.06171323, 4.97964471, 5.28555573, 5.13320501, 5.08988046])
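One hedged sketch of the linear-algebra step (a hypothetical helper, not necessarily the intended solution): picking out each state's row of P and R under $\pi$ turns the equation above into $(I - \gamma P^{\pi}) V^{\pi} = r^{\pi}$, which np.linalg.solve handles directly.
def compute_vpi_sketch(pi, P, R, gamma):
    nS = P.shape[0]
    P_pi = P[np.arange(nS), pi]                          # (S, S): transitions under pi
    r_pi = np.sum(P_pi * R[np.arange(nS), pi], axis=1)   # (S,): expected one-step reward under pi
    return np.linalg.solve(np.eye(nS) - gamma * P_pi, r_pi)  # solve (I - gamma P_pi) V = r_pi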
In [ ]:
def compute_qpi(vpi, pi, P, R, gamma):
    """
    :param vpi: the state-value function for the policy pi (1D array: S -> R)
    :param pi: a deterministic policy (1D array: S -> A)
    :param P: the transition function (3D array: S*A*S -> R)
    :param R: the reward function (3D array: S*A*S -> R)
    :param gamma: the discount factor (scalar)
    :return: qpi, the state-action-value function for the policy pi
    """
    nS = P.shape[0]
    nA = P.shape[1]
    # YOUR CODE HERE
    raise NotImplementedError()
    assert qpi.shape == (nS, nA)
    return qpi
vpi = compute_vpi(pi0, P_rand, R_rand, gamma)
compute_qpi(vpi, pi0, P_rand, R_rand, gamma)
# Expected output:
# array([[ 5.206217 , 5.20238706],
# [ 5.15900351, 5.1664316 ],
# [ 5.01725926, 4.99211906],
# [ 4.76913715, 4.98080235],
# [ 5.03154609, 4.89448888],
# [ 5.06171323, 5.29418621],
# [ 4.97964471, 5.06868986],
# [ 5.28555573, 4.9156956 ],
# [ 5.13320501, 4.97736801],
# [ 5.08988046, 5.00511597]])
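A hedged sketch of the same computation, assuming the standard definition $Q^{\pi}(s,a) = \sum_{s'} P(s,a,s')[R(s,a,s') + \gamma V^{\pi}(s')]$, which is consistent with the expected output above (the hypothetical helper below ignores the pi argument, since the formula above only needs vpi):
def compute_qpi_sketch(vpi, P, R, gamma):
    # Q[s, a] = sum_{s'} P(s,a,s') * (R(s,a,s') + gamma * V^pi(s'))
    return np.einsum('san,san->sa', P, R + gamma * vpi)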
Now we're ready to run policy iteration!
In [ ]:
def policy_iteration(P, R, gamma, n_iter):
    pi_prev = np.zeros(P.shape[0], dtype='i')
    print(hw_utils.fmt_row(13, ["iter", "# chg actions", "Q[0,0]"], header=True))
    for i in range(n_iter):
        vpi = compute_vpi(pi_prev, P, R, gamma)
        qpi = compute_qpi(vpi, pi_prev, P, R, gamma)
        pi = qpi.argmax(axis=1)
        print(hw_utils.fmt_row(13, [i+1, (pi != pi_prev).sum(), qpi[0,0]]))
        pi_prev = pi
policy_iteration(P_rand, R_rand, gamma, 10);
# Expected output:
# iter | # chg actions | Q[0,0]
# ---------------------------------------------
# 1 | 4 | 5.20622
# 2 | 2 | 5.59042
# 3 | 0 | 5.6255
# 4 | 0 | 5.6255
# 5 | 0 | 5.6255
# 6 | 0 | 5.6255
# 7 | 0 | 5.6255
# 8 | 0 | 5.6255
# 9 | 0 | 5.6255
# 10 | 0 | 5.6255
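As an optional, hedged consistency check (it assumes your compute_vpi is implemented): both algorithms converge to the optimal policy, so exactly evaluating the greedy policy that a long run of value iteration settles on should essentially reproduce value iteration's values.
V_vi, A_vi, _ = value_iteration(P_rand, R_rand, gamma, n_iter=200)
V_exact = compute_vpi(A_vi.astype(int), P_rand, R_rand, gamma)
print(np.abs(V_vi - V_exact).max())  # should be tiny (only VI's remaining truncation error)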
In [ ]:
try:
    import autograder
    instructor = True
except ImportError:
    instructor = False
In [ ]:
"""Check that value iteration computes the correct result"""
# INSTRUCTOR ONLY -- DO NOT CHANGE THIS PART
if instructor: autograder.grade_value_iteration(value_iteration)
In [ ]:
"""Check that Tslow and Rslow updates the policy for each of the first 10 iterations"""
# INSTRUCTOR ONLY -- DO NOT CHANGE THIS PART
if instructor: autograder.grade_slow_mdp(value_iteration, Pslow, Rslow, gamma)
In [ ]:
"""Check that compute_vpi computes the correct result"""
# INSTRUCTOR ONLY -- DO NOT CHANGE THIS PART
if instructor: autograder.grade_compute_vpi(compute_vpi)
In [ ]:
"""Check that compute_qpi computes the correct result"""
# INSTRUCTOR ONLY -- DO NOT CHANGE THIS PART
if instructor: autograder.grade_compute_qpi(compute_qpi)