One advantage of Monte Carlo methods is that they don't require a model of the environment (e.g. the $p(s' | s, a)$ and $r(s,a,s')$ in the Bellman equation).
In a nutshell, Monte Carlo methods collect statistics about $v(s)$ and $q(s, a)$.
Take $v(s)$ as an example. It is defined as $\mathbb{E}(G_t | S_t = s)$ for any $t$. In first-visit MC methods, we only use the return that follows the very first visit to state $s$ in each episode, so we don't need to worry about the time index $t$. There are also every-visit MC methods, but Sutton's book focuses on first-visit MC.
Given state $s$, we think of the return $G$ from any episode as a sample from some distribution $p_s(G)$. We are not interested in the exact form of $p_s(G)$; in fact, that would require modeling the environment. All we do is follow the policy a number of times, and at the end of each episode we obtain a sample $G$ from $p_s(G)$. Then $$v(s) = \mathbb{E}(G | s) \approx \frac{1}{N} \sum_i G_i$$
For example, suppose we have the state sequence $S_0 S_1 S_2 S_1 S_3$ in one episode, where $S_3$ is the terminal state. We get reward $x$ during the transition $S_1 \rightarrow S_3$, zero reward in all other transitions, and the discount factor is $\gamma$. With this setup, $S_2$ gets a sample return $\gamma x$ and $S_1$ gets a sample return $\gamma^2 x$. Note that in first-visit MC only the first appearance of $S_1$ receives a sample return.
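This computation can be sanity-checked with a few lines of plain Python. The sketch below is for illustration only and is not part of the notebook code further down; `first_visit_returns` and the episode encoding (a list of (state, reward) pairs, where the reward is the one received on leaving that state) are made-up names.
In [ ]:
# Minimal sketch of first-visit return computation for the example episode.
def first_visit_returns(episode, gamma):
    returns = {}
    G = 0.0
    # Walk the episode backwards, accumulating the discounted return.
    for state, reward in reversed(episode):
        G = gamma * G + reward
        # Overwriting on every visit leaves the return of the *first* visit.
        returns[state] = G
    return returns

# Episode S0 S1 S2 S1 S3 with reward x on the transition S1 -> S3.
x, gamma = 1.0, 0.9
episode = [('S0', 0.0), ('S1', 0.0), ('S2', 0.0), ('S1', x)]
print(first_visit_returns(episode, gamma))  # S2 -> gamma*x, S1 -> gamma**2*x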
$q(s,a)$ is computed in a similar fashion.
After each episode, update $q(s, a)$; then, for every state $s$ that appeared in the episode, set $\pi(s) = \text{argmax}_a q(s,a)$. To keep exploring all actions, a common variant is on-policy control with an $\epsilon$-soft (e.g. $\epsilon$-greedy) policy. A rough sketch of this loop follows.
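The sketch below shows on-policy first-visit MC control with an $\epsilon$-greedy policy in generic Python, independent of the blackjack code further down; `generate_episode`, `actions`, and the dictionaries are illustrative assumptions, not the notebook's API.
In [ ]:
# Minimal sketch of on-policy first-visit MC control with an epsilon-greedy
# policy. `generate_episode(policy)` is a hypothetical helper that plays one
# episode with the given policy and returns a list of (state, action, reward)
# triples; it is not defined in this notebook.
from collections import defaultdict
import random

def mc_control(generate_episode, actions, num_episodes, gamma=1.0, epsilon=0.1):
    returns_sum = defaultdict(float)   # cumulative sampled return per (state, action)
    returns_count = defaultdict(int)   # number of first-visit samples per (state, action)
    q = defaultdict(float)             # running q(s, a) estimates

    def policy(state):
        # Epsilon-greedy with respect to the current q estimates.
        if random.random() < epsilon:
            return random.choice(actions)
        return max(actions, key=lambda a: q[(state, a)])

    for _ in range(num_episodes):
        episode = generate_episode(policy)
        # Walk the episode backwards; overwriting keeps the return of the
        # *first* visit to each (state, action) pair.
        G, first_visit_return = 0.0, {}
        for state, action, reward in reversed(episode):
            G = gamma * G + reward
            first_visit_return[(state, action)] = G
        # Update the running averages; acting greedily w.r.t. q (inside
        # `policy`) is the policy-improvement step.
        for sa, g in first_visit_return.items():
            returns_sum[sa] += g
            returns_count[sa] += 1
            q[sa] = returns_sum[sa] / returns_count[sa]
    return q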
Single-deck blackjack. The dealer follows a fixed strategy: stick on any sum of 17 or greater. What strategy should the player use?
The player's state can be described by two variables: 1) the current sum, counting a usable ace as 11 (i.e. the maximum non-busting sum); 2) whether he holds a usable ace, i.e. an ace currently counted as 11 that can still fall back to 1. (There are alternative ways to define the state.)
For example, suppose the player has {4, 5, 1} at hand (the 1 is an ace); then the state is <20, TRUE>, where 20 = 4 + 5 + 11 and TRUE indicates a usable ace. If the player hits and gets a 3, counting the ace as 11 would bust (4 + 5 + 11 + 3 = 23), so the ace falls back to 1 and the state becomes <13, FALSE>.
The overall state is described by the above two variables plus the dealer's single visible card, which is one of {ACE, 2, ..., 10}. For the player's state, we are only interested in the case where the current sum is greater than or equal to 12; otherwise the player can always hit without any risk of busting, so hitting is the obvious move.
In [ ]:
# Define some constants and utils.
import random
import numpy as np
from scipy.stats import beta
import matplotlib.pyplot as plt
DECK_OF_CARDS = [1,2,3,4,5,6,7,8,9,10,10,10,10] * 4 #J, Q, K count as 10.
DEALER_STICK_THRESHOLD = 17
def sum_hand(hand):
    min_s = sum(hand)
    max_s = sum(hand)
    if 1 in hand:
        max_s = min_s + 10
    if max_s <= 21 and max_s > min_s:
        ## Has a usable ACE.
        return (max_s, 1)
    else:
        return (min_s, 0)

def busted(hand):
    (s, _) = sum_hand(hand)
    return s > 21

def convert_cards_to_state(my_cards, dealer_card):
    (s, usable_ace) = sum_hand(my_cards)
    # state is 0-based, for easy indexing with ndarray.
    return (s-1, dealer_card-1, usable_ace)
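A quick check of the helpers above on a couple of made-up hands (aces are represented by 1, as in DECK_OF_CARDS):
In [ ]:
# Quick check of the helpers above on made-up hands.
print(sum_hand([4, 5, 1]))                    # (20, 1): ace counted as 11, usable
print(sum_hand([4, 5, 1, 3]))                 # (13, 0): ace falls back to 1
print(busted([10, 10, 5]))                    # True
print(convert_cards_to_state([4, 5, 1], 10))  # (19, 9, 1): 0-based indices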
In [ ]:
class BasePlayer:
    # Return 1 if hit, 0 if stick.
    def action(self, cards, rival_card):
        raise NotImplementedError("action() is not implemented.")

class Dealer(BasePlayer):
    def __init__(self):
        self.stick_threshold = DEALER_STICK_THRESHOLD
    def action(self, cards, rival_card):
        (s, _) = sum_hand(cards)
        # Hit (1) while the sum is below the threshold, otherwise stick (0).
        return int(self.stick_threshold > s)
In [ ]:
class Policy:
    def __init__(self):
        # stats is a 5D tensor.
        # The first 3 dimensions describe the state. The 4th dimension describes
        # the action (0 for stick, 1 for hit). The 5th dimension describes the
        # reward: {-1, 0, 1}.
        # self.stats[i][j][k][a][r] counts how many times we have received
        # reward `r` when taking action `a` in state s = (i, j, k).
        self.stats = np.zeros((21, 10, 2, 2, 3), dtype='int64')
        # q is a 4D tensor that represents q(s, a).
        self.q = np.zeros((21, 10, 2, 2), dtype='float64')
        # pi is a 4D tensor that represents pi(a|s).
        self.pi = np.zeros((21, 10, 2, 2), dtype='float64')
        self.pi[0:11,:,:,:] = [0., 1.] # If my best sum is in [1, 11], then we always hit.
        self.pi[20,:,:,:] = [1., 0.] # Always stick if my sum is 21.

    # state_action_seq is a list of tuples.
    def update(self, state_action_seq, final_reward):
        #print 'updating with state_action_seq:',state_action_seq, ' with reward: ', final_reward
        # Note: only the last two state-action pairs of the episode are updated.
        for state_action in state_action_seq[-2:]:
            (i,j,k,a) = state_action
            if i <= 10:
                # No update: we always hit.
                continue
            if i == 20:
                # No update: we always stick.
                continue
            # reward is in {-1, 0, 1}
            self.stats[i,j,k,a,final_reward + 1] += 1
            # q stores the average reward so far.
            # The denominator is the # of samples; the numerator is the net
            # reward accumulated from the +1 and -1 cases.
            self.q[i,j,k,a] = (self.stats[i,j,k,a,2] - self.stats[i,j,k,a,0]) * 1.0 / np.sum(self.stats[i,j,k,a,:])
            # If q[i,j,k,0] > q[i,j,k,1], argsort returns [1,0].
            # This is exactly how the greedy algorithm updates the policy.
            self.pi[i,j,k,:] = np.argsort(self.q[i,j,k,:])

    def action(self, state):
        return self.epsilon_greedy(state)

    def epsilon_greedy(self, state, epsilon = 0.1):
        # With probability 2*epsilon we pick an action uniformly at random,
        # which explores the non-greedy action with probability epsilon.
        if np.random.rand() < 2 * epsilon:
            return self.rand_action()
        (i,j,k) = state
        return int(self.pi[i,j,k,1] > self.pi[i,j,k,0])

    def get_win_loss(self, i, j, k, a):
        return self.stats[i,j,k,a,2], self.stats[i,j,k,a,0]

    def rand_action(self):
        return int(np.random.rand() > 0.5)

    # Get a soft probabilistic decision matrix (instead of a hard 0 or 1 matrix).
    # The probability is based on sampling from two Beta distributions.
    def bayesian_decision(self, usable_ace):
        m = np.zeros((9,10))
        for i in range(11,20,1):
            for j in range(10):
                win1, loss1 = self.get_win_loss(i,j,usable_ace, 0)
                win2, loss2 = self.get_win_loss(i,j,usable_ace, 1)
                m[i-11,j] = beta_policy_stick_probability(1 + win1, 1 + loss1, 1 + win2, 1 + loss2)
        return np.flipud(1.0 - m)

    # Visualize a matrix as a grayscale image, assuming entries are in [0,1].
    def imshow(self, ax, matrix, title):
        ax.set_title(title)
        ax.imshow(matrix, cmap='gray', vmin=0., vmax=1.)
        ax.set_xlabel('Dealer Card')
        ax.set_xticks(np.arange(10))
        ax.set_xticklabels(np.arange(1,11,1).astype('S2'))
        ax.set_ylabel('Player Card')
        ax.set_yticks(np.arange(9))
        ax.set_yticklabels(np.arange(20,11,-1).astype('S2'))
        ax.spines['right'].set_color('none')
        ax.spines['top'].set_color('none')

    # Visualize the hard decision matrix and the soft (Bayesian) decision matrix.
    def visualize(self):
        fig = plt.figure(figsize=(12, 9), dpi=80)
        usable_ace = 0
        ax = plt.subplot(221)
        decision = (self.pi[11:20,:,usable_ace,1] > self.pi[11:20,:,usable_ace,0]).astype(int)
        self.imshow(ax, np.flipud(decision), 'Decision (No usable Ace)')
        ax = plt.subplot(222)
        bayesian_decision = self.bayesian_decision(0)
        self.imshow(ax, bayesian_decision, 'Bayesian Decision (No usable Ace)')
        usable_ace = 1
        ax = plt.subplot(223)
        decision = (self.pi[11:20,:,usable_ace,1] > self.pi[11:20,:,usable_ace,0]).astype(int)
        self.imshow(ax, np.flipud(decision), 'Decision (with usable Ace)')
        ax = plt.subplot(224)
        bayesian_decision = self.bayesian_decision(usable_ace)
        self.imshow(ax, bayesian_decision, 'Bayesian Decision (with usable Ace)')
        plt.show()

    # Plot the two Beta distributions of a given state. Helps to understand how
    # sharp the distributions are.
    def plot_beta(self, state):
        (i,j,k) = state
        fig = plt.figure()
        a1, b1 = self.get_win_loss(i,j,k,0)
        a2, b2 = self.get_win_loss(i,j,k,1)
        x = np.linspace(0.01, 0.99, 100)
        # Use the same Beta(1 + wins, 1 + losses) prior as bayesian_decision(),
        # which also avoids invalid zero-valued Beta parameters.
        plt.plot(x, beta.pdf(x, 1 + a1, 1 + b1), 'r-', label='stick')
        plt.plot(x, beta.pdf(x, 1 + a2, 1 + b2), 'b-', label='hit')
        plt.legend(loc='upper left')
        plt.show()
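To see the bookkeeping in action, here is a quick check that feeds a single made-up state-action pair and a winning reward into a fresh Policy; the indices are arbitrary (player sum 16, dealer showing a 6, no usable ace, action hit), chosen only for illustration.
In [ ]:
# Feed one made-up sample into a fresh Policy and inspect the bookkeeping.
p = Policy()
# State (i, j, k) = (15, 5, 0): player sum 16, dealer shows a 6, no usable ace.
# Action 1 (hit), final reward +1 (a win).
p.update([(15, 5, 0, 1)], 1)
print(p.stats[15, 5, 0, 1, :])  # [0 0 1]: one win recorded for `hit`
print(p.q[15, 5, 0, :])         # q(s, hit) is now 1.0, q(s, stick) is still 0.0
print(p.pi[15, 5, 0, :])        # the greedy policy now prefers `hit`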
In [ ]:
class PolicyPlayer(BasePlayer):
    def __init__(self, policy):
        self.policy = policy
        self.state_action_pairs = []
    def action(self, cards, rival_card):
        s = convert_cards_to_state(cards, rival_card)
        a = self.policy.action(s)
        (i,j,k) = s
        self.state_action_pairs.append((i,j,k,a))
        return a
    def update_policy(self, reward):
        self.policy.update(self.state_action_pairs, reward)
    def reset_states(self):
        self.state_action_pairs = []
In [ ]:
# Bayesian stuff. (Only used for visualization.)
def beta_policy_stick_probability(a1, b1, a2, b2, sample_size = 1000):
    s1 = beta.rvs(a1, b1, size=sample_size)
    s2 = beta.rvs(a2, b2, size=sample_size)
    # How likely we should stick, based on the samples.
    return np.sum(s1 >= s2) * 1.0 / sample_size

# a1,b1 = win and loss for stick; a2,b2 = win and loss for hit.
def beta_policy_action(a1, b1, a2, b2):
    # s1 (or s2) is a sampled estimation of winning likelihood of stick (or hit).
    s1 = beta.rvs(a1, b1, size=1)
    s2 = beta.rvs(a2, b2, size=1)
    # Returns 1 if the sampled likelihood favors hit.
    return int(s2[0] > s1[0])
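For intuition, here is a quick check with made-up win/loss counts (20 wins / 10 losses for stick versus 5 wins / 15 losses for hit); the Beta(1 + wins, 1 + losses) posteriors barely overlap, so the sampled stick probability should come out close to 1.
In [ ]:
# Made-up counts: stick has 20 wins / 10 losses, hit has 5 wins / 15 losses.
print(beta_policy_stick_probability(1 + 20, 1 + 10, 1 + 5, 1 + 15))  # close to 1.0
print(beta_policy_action(1 + 20, 1 + 10, 1 + 5, 1 + 15))             # almost always 0 (stick)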
In [ ]:
# A simple debugger (to avoid clutter in main code.)
class Dbg:
    # level can be 0 or 1. (0 means silent.)
    def __init__(self, level):
        self.level = level
    def print_hands(self, game):
        if self.level == 0:
            return
        print 'dealer cards: ', game.dealer_cards
        print 'player cards: ', game.player_cards
    def on_dealer_action(self, action):
        if self.level == 0:
            return
        print 'dealer action: ', action
    def on_player_action(self, action):
        if self.level == 0:
            return
        print 'player action: ', action
    def print_bust_status(self, dealer, player):
        if self.level == 0:
            return
        if player and dealer:
            print 'both busted'
        elif player:
            print 'player busted'
        elif dealer:
            print 'dealer busted'
        else:
            pass
    def print_sum_of_hands(self, dealer_sum, player_sum):
        if self.level == 0:
            return
        print 'dealer sum: {}, player sum: {}'.format(dealer_sum, player_sum)
In [ ]:
class Game:
    def __init__(self, dealer, player, debug_level):
        self.dealer = dealer
        self.player = player
        self.dealer_cards = []
        self.player_cards = []
        self.cards = DECK_OF_CARDS[:]
        random.shuffle(self.cards)
        self.card_index = 0
        self.dbg = Dbg(debug_level)
    def on_player_action(self, action):
        if action == 1:
            self.player_cards.append(self.cards[self.card_index])
            self.card_index += 1
        self.dbg.on_player_action(action)
    def on_dealer_action(self, action):
        if action == 1:
            self.dealer_cards.append(self.cards[self.card_index])
            self.card_index += 1
        self.dbg.on_dealer_action(action)
    def check_bust(self):
        dealer_busted = busted(self.dealer_cards)
        player_busted = busted(self.player_cards)
        self.dbg.print_bust_status(dealer_busted, player_busted)
        return (dealer_busted, player_busted)
    # Returns the reward for the player. Can be one of {-1, 0, 1}.
    def compare_hands(self):
        d_sum, _ = sum_hand(self.dealer_cards)
        p_sum, _ = sum_hand(self.player_cards)
        return np.sign(p_sum - d_sum)
    # Plays one game and returns the player's reward: 1 (win), 0 (draw or both
    # bust), -1 (loss).
    def play(self):
        self.player.reset_states()
        self.dealer_cards = []
        self.player_cards = []
        self.on_dealer_action(1)
        self.on_player_action(1)
        self.on_dealer_action(1)
        self.on_player_action(1)
        self.dbg.print_hands(self)
        while True:
            dealer_action = self.dealer.action(self.dealer_cards, self.player_cards[0])
            self.on_dealer_action(dealer_action)
            player_action = self.player.action(self.player_cards, self.dealer_cards[0])
            self.on_player_action(player_action)
            self.dbg.print_hands(self)
            dealer_busted, player_busted = self.check_bust()
            if player_busted and dealer_busted:
                return 0
            if player_busted:
                return -1
            if dealer_busted:
                return 1
            if dealer_action == 0 and player_action == 0:
                d_sum, _ = sum_hand(self.dealer_cards)
                p_sum, _ = sum_hand(self.player_cards)
                self.dbg.print_sum_of_hands(d_sum, p_sum)
                return self.compare_hands()
In [ ]:
# quick sanity check of the game driver.
dealer = Dealer()
policy = Policy()
player = PolicyPlayer(policy)
game = Game(dealer, player, 1)
r = game.play()
print 'result: ', r
In [ ]:
from IPython import display
class Learner:
    def __init__(self):
        self.player = PolicyPlayer(Policy())
    def play(self, num_of_training, debug_level):
        dealer = Dealer()
        mini_batch = 5 # We shuffle the cards after every 5 rounds of play.
        progress_bar_iters = 100 # Number of times we "report progress", e.g. refreshing dynamic plots.
        progress_bar_step_size = (num_of_training / mini_batch) / progress_bar_iters
        old_decision = np.zeros((9,10))
        old_q = np.zeros((9,10))
        epoch = []
        delta_q = []
        fig = plt.figure(figsize=(12, 6))
        j = 0
        for i in xrange(num_of_training / mini_batch):
            if i > 0 and progress_bar_step_size > 0 and i % progress_bar_step_size == 0:
                plt.clf()
                if i < num_of_training / mini_batch - 1:
                    display.clear_output(wait=True)
                epoch.append(j)
                j += 1
                new_q = self.player.policy.q[11:20,:,0,0]
                delta_q.append(np.max(np.abs(new_q - old_q)))
                old_q = np.copy(new_q)
                plt.subplot(211)
                plt.plot(epoch, delta_q, 'b-')
                plt.xlim(0, progress_bar_iters)
                plt.ylim(0, 1.1)
                plt.subplot(223)
                decision = (self.player.policy.pi[11:20,:,0,1] > self.player.policy.pi[11:20,:,0,0]).astype(int)
                plt.imshow(np.flipud(decision), cmap='gray', vmin=0., vmax=1.)
                plt.xlabel('Dealer Card')
                plt.xticks(np.arange(10), np.arange(1,11,1).astype('S2'))
                plt.ylabel('My Card')
                plt.yticks(np.arange(9), np.arange(20,11,-1).astype('S2'))
                ax = plt.gca()
                ax.spines['right'].set_color('none')
                ax.spines['top'].set_color('none')
                plt.colorbar()
                display.display(plt.gcf())
            if debug_level > 0:
                print '\n\n === Game {} === \n\n'.format(i+1)
            game = Game(dealer, self.player, debug_level)
            # Play `mini_batch` games with the same Game object, i.e. without reshuffling.
            for _ in xrange(mini_batch):
                r = game.play()
                self.player.update_policy(r)
                self.player.reset_states()
In [ ]:
learner = Learner()
learner.play(5000000, 0)
In [ ]:
learner.player.policy.visualize()
In [ ]:
# Debug
i,j,k = 16,6,1
policy = learner.player.policy
print policy.stats[i,j,k,:,:]
print policy.q[i,j,k,:]
print policy.pi[i,j,k]
policy.plot_beta((i,j,k))