In [1]:
# Sutton's book, Chapter 5: Monte Carlo with Exploring Starts
# Blackjack as in https://www.youtube.com/watch?v=SWdPf21v5Ak
import random
import numpy as np
from scipy.stats import beta
import matplotlib.pyplot as plt
DECK_OF_CARDS = [1,2,3,4,5,6,7,8,9,10,10,10,10] * 4 #J, Q, K count as 10.
DEALER_STICK_THRESHOLD = 17 # dealer sticks on 17 or more, same as Sutton's book and standard Vegas rules
def sum_hand(hand):
min_s = sum(hand)
max_s = sum(hand)
if 1 in hand:
max_s = min_s + 10
if max_s <= 21 and max_s > min_s:
## Has a usable ACE.
return (max_s, 1)
else:
return (min_s, 0)
def busted(hand):
(s, _) = sum_hand(hand)
return s > 21
def convert_cards_to_state(my_cards, dealer_card):
(s, usable_ace) = sum_hand(my_cards)
# state is 0-based, for easy indexing with ndarray.
return (s-1, dealer_card-1, usable_ace)
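A quick sanity check of the helper functions above (the hands are arbitrary examples): an ace counts as 11 whenever that does not bust the hand, and states are returned 0-based.
# Hypothetical example: an ace and a 6, with a dealer 10 showing.
print(sum_hand([1, 6]))                    # the ace counts as 11 -> (17, 1)
print(sum_hand([1, 6, 10]))                # the ace must count as 1 -> (17, 0)
print(convert_cards_to_state([1, 6], 10))  # 0-based state -> (16, 9, 1)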
In [2]:
class BasePlayer(object):
# Return 1 if hit, 0 if stick.
def action(self, cards, rival_card):
raise NotImplementedError("action() is not implemented.")
class Dealer(BasePlayer):
def __init__(self):
        self.stick_threshold = DEALER_STICK_THRESHOLD
def action(self, cards):
(s, _) = sum_hand(cards) #s>=1
return int(self.stick_threshold > s) #stick for 17 or greater, same as Sutton's
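A small illustration (example hands only, not part of the training code): the dealer hits below 17 and sticks on 17 or more, regardless of the player's hand.
dealer = Dealer()
print(dealer.action([10, 6]))  # sum 16 -> 1 (hit)
print(dealer.action([10, 7]))  # sum 17 -> 0 (stick)
print(dealer.action([1, 6]))   # usable ace, sum 17 -> 0 (stick)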
In [3]:
class Policy(object):
def __init__(self):
# stats is a 5D tensor.
# The first 3 dimensions describe state. The 4th dimension describes
# action (0 for stick, 1 for hit). The 5th dimension describes the
# reward: {-1, 0, 1}.
# self.states[i][j][k][a][r] saves counts of how many times we have
# reward `r` when we take action `a` for state s = (i, j, k) (s is 0-based).
self.stats = np.zeros((21, 10, 2, 2, 3), dtype='int64')
# q is a 4D tensor that represents q(s, a).
self.q = np.zeros((21, 10, 2, 2), dtype='float64')
# pi is a 4D tensor that represents pi(a|s), last dimension is the action
self.pi = np.zeros((21, 10, 2, 2), dtype='float64')
        # pi[:,:,:,1] is the probability of hit and pi[:,:,:,0] the probability of stick; actions are deterministic here.
        self.pi[0:11,:,:,:] = [0., 1.]  # Always hit if the cards sum to 11 or less; this part of pi never gets updated.
        self.pi[11:19,:,:,:] = [0., 1.] # Sutton's initial policy: hit on sums 12-19,
        self.pi[19:,:,:,:] = [1., 0.]   # stick on 20 and 21.
def update(self, state_action_seq, final_reward):
        # Update q(s,a) and pi(a|s) using the episode (i.e., state_action_seq) and its final reward.
        # For blackjack, the states within one episode are all distinct, so first-visit MC and every-visit MC coincide.
for state_action in state_action_seq:
            (i,j,k,a) = state_action # i = sum(player_cards) - 1, 0<=i<=20; j = dealer_card - 1, 0<=j<=9; k = 0 or 1
# reward is in {-1, 0, 1}
self.stats[i,j,k,a,final_reward + 1] += 1
# q stores the return estimate E[R|(s,a)] so far.
self.q[i,j,k,a] = (self.stats[i,j,k,a,2] - self.stats[i,j,k,a,0]) * 1.0 / np.sum(self.stats[i,j,k,a,:])
#greedy update pi(a|s) based on estimated q(s,a)
if i <= 10:
# No update: we always hit.
continue
if i == 20:
# No update: we always stick.
continue
            # If q[i,j,k,0] > q[i,j,k,1], argsort returns [1,0], i.e. pi becomes "always stick".
self.pi[i,j,k,:] = np.argsort(self.q[i,j,k,:])
def action(self, state):#greedy
return np.argmax(self.pi[state])
# Visualize matrix as a grayscale image, assuming entries are in [0,1].
def imshow(self, ax, matrix, title):
ax.set_title(title)
ax.imshow(matrix, cmap='gray',vmin=0., vmax=1., interpolation="none")
ax.set_xlabel('Dealer Card')
ax.set_xticks(np.arange(10))
ax.set_xticklabels(np.arange(1,11,1).astype('S2'))
ax.set_ylabel('Player Card')
ax.set_yticks(np.arange(11))
ax.set_yticklabels(np.arange(21,10,-1).astype('S2'))
ax.spines['right'].set_color('none')
ax.spines['top'].set_color('none')
# Visualize the hard decision matrix
def visualize(self):
fig = plt.figure(figsize=(12, 9), dpi=80)
usable_ace = 0
ax = plt.subplot(221)
decision = (self.pi[10:,:,usable_ace,1] > self.pi[10:,:,usable_ace,0]).astype(int)
self.imshow(ax, np.flipud(decision), 'Decision (No usable Ace)')
usable_ace = 1
ax = plt.subplot(223)
decision = (self.pi[10:,:,usable_ace,1] > self.pi[10:,:,usable_ace,0]).astype(int)
self.imshow(ax, np.flipud(decision), 'Decision (with usable Ace)')
plt.show()
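The greedy improvement step above leans on np.argsort over the two action values: sorting q(s, stick) and q(s, hit) in ascending order puts the index of the better action last, so the result can be stored directly as the deterministic pi(a|s) = (P(stick), P(hit)). A minimal sketch with made-up q values:
q_example = np.array([-0.2, 0.1])  # q(s, stick) = -0.2, q(s, hit) = 0.1
print(np.argsort(q_example))       # [0 1] -> pi = (0, 1): always hit
q_example = np.array([0.3, -0.4])  # sticking is better here
print(np.argsort(q_example))       # [1 0] -> pi = (1, 0): always stick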
In [4]:
class PolicyPlayer(BasePlayer):
def __init__(self, policy):
self.policy = policy
self.state_action_pairs = []
def action(self, cards, rival_card,a=None):
s = convert_cards_to_state(cards, rival_card) #s is 0-based
if a is None:
a = self.policy.action(s)
(i,j,k) = s #
self.state_action_pairs.append((i,j,k,a))
return a
def reset_states(self):
self.state_action_pairs = []
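A brief illustration of the bookkeeping (the hand and dealer card are arbitrary examples): each call to action() appends one 0-based (i, j, k, a) tuple, and reset_states() clears the log between episodes.
p = PolicyPlayer(Policy())
p.action([10, 5], 7)         # player sum 15 vs dealer 7; the initial policy hits
print(p.state_action_pairs)  # [(14, 6, 0, 1)]
p.reset_states()
print(p.state_action_pairs)  # []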
In [5]:
# A simple debugger (to avoid clutter in main code.)
class Dbg(object):
# level can be 0 or 1. (0 means silent.)
def __init__(self, level):
self.level = level
def print_hands(self, game):
if self.level == 0:
return
print 'dealer cards: ', game.dealer_cards
print 'player cards: ', game.player_cards
def print_hand(self,cards):
if self.level == 0:
return
print 'cards: ', cards
def on_dealer_action(self, action):
if self.level == 0:
return
print 'dealer action: ', action
def on_player_action(self, action):
if self.level == 0:
return
print 'player action: ', action
def print_bust_status(self, dealer, player):
if self.level == 0:
return
if player and dealer:
print 'both busted'
elif player:
print 'player busted'
elif dealer:
print 'dealer busted'
else:
pass
def print_sum_of_hands(self, dealer_sum, player_sum):
if self.level == 0:
return
print 'dealer sum: {}, player sum: {}'.format(dealer_sum, player_sum)
In [6]:
class Game(object):
"""
Blackjack shoe game as in https://www.youtube.com/watch?v=SWdPf21v5Ak
"""
def __init__(self, dealer, player, debug_level=0):
self.dealer = dealer
self.player = player
self.cards = DECK_OF_CARDS[:]
#random.shuffle(self.cards) #Sutton assumes infinite cards. Use sampling with replacement instead.
self.dbg = Dbg(debug_level)
def on_player_action(self, action):
if action == 1:
#Sutton assumes infinite cards.
self.player_cards.extend(np.random.choice(self.cards,size=1))
self.dbg.on_player_action(action)
def on_dealer_action(self, action):
if action == 1:
self.dealer_cards.extend(np.random.choice(self.cards,size=1))
self.dbg.on_dealer_action(action)
def play(self,dealer_init_cards, player_init_cards, a):
"""
        Each call to play() generates one episode starting from the given initial cards and first action.
        dealer_init_cards, player_init_cards: List[int] of size 2; must not be busted.
        a: player's first action, 1 for hit, 0 for stick.
        return: 1 if the player wins, -1 if the player loses, 0 for a draw.
"""
self.player.reset_states() #reset the player's memory of state_action pair
self.dealer_cards = list(dealer_init_cards)
self.player_cards = list(player_init_cards)
assert not busted(dealer_init_cards) and not busted(player_init_cards), "Error: Invalid initial state!"
#player's turn
self.player.action(self.player_cards,self.dealer_cards[0],a)
while a:
self.on_player_action(a) # update the states
if busted(self.player_cards):
return -1
a = self.player.action(self.player_cards, self.dealer_cards[0])
self.on_player_action(a)
#dealer's turn
a = self.dealer.action(self.dealer_cards)
while a:
self.on_dealer_action(a) # update the states
if busted(self.dealer_cards):
return 1
a = self.dealer.action(self.dealer_cards) #dealer sticks to the same rule regardless of the player's cards
#both dealer and player stick
d_sum, _ = sum_hand(self.dealer_cards)
p_sum, _ = sum_hand(self.player_cards)
return np.sign(p_sum - d_sum)
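With the classes above in place, a single exploring-starts episode can be played by hand. The initial hands and the forced first action below are arbitrary examples, not part of the training loop:
demo_policy = Policy()
demo_player = PolicyPlayer(demo_policy)
demo_game = Game(Dealer(), demo_player, debug_level=1)
# Force the first action (hit) from player sum 13 vs a dealer 2 showing.
reward = demo_game.play([2, 10], [10, 3], a=1)
print(reward)                          # +1 / 0 / -1 from the player's perspective
print(demo_player.state_action_pairs)  # the (state, action) pairs visited this episode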
In [7]:
class Learner:
def __init__(self):
self.player_policy = Policy()
self.player = PolicyPlayer(self.player_policy)
self.dealer = Dealer()
self.game = Game(self.dealer,self.player)
def train(self):
"""
learn the optimal policy using MC with exploring starts
return: optimal policy pi(a|s)
"""
np.random.seed(4)
        max_iter = int(1e6) # reducing max_iter to 1e6 gives the correct policy except for the case player sum=12, dealer card=4, usable_ace=0
tol = 1e-2
max_epoch =100
for i_epoch in range(max_epoch):
q_old = np.array(self.player_policy.q)
for i in range(max_iter):
dealer_init_cards = np.random.choice(self.game.cards,size=2,replace=True)
player_init_cards = np.random.choice(self.game.cards,size=2, replace=True)
for a in [0,1]:
rt = self.game.play(dealer_init_cards, player_init_cards,a)
self.player_policy.update(self.player.state_action_pairs,rt)
q_delta = np.max(np.abs(q_old-self.player_policy.q))
print "epoch=%d, q_delta=%.6f" % (i_epoch,q_delta)
if q_delta<tol:
break
else:
print "Warning: not converge for max_epoch(%d), tol=%.4f" %(max_epoch,tol)
return self.player_policy
In [8]:
learner = Learner()
optimal_policy = learner.train()
In [9]:
optimal_policy.pi.shape
Out[9]:
In [10]:
optimal_policy.pi[11:,:,0,1] #usable=0
Out[10]:
In [11]:
optimal_policy.pi[11:,:,1,1] #usable=1
Out[11]:
In [240]:
#export the result generated with max_iter=1e8 as the true solution
#np.save("optimal_policy_pi.npy",optimal_policy.pi)
#np.save("optimal_policy_q.npy",optimal_policy.q)
#np.save("optimal_policy_stats.npy",optimal_policy.stats)
In [12]:
#player sum=12, dealer up card=4 (j=3), usable_ace=False is a difficult case because the two action values are very close
i,j,k = 11,3,0
optimal_policy.pi[i,j,k,0], optimal_policy.pi[i,j,k,1], optimal_policy.q[i,j,k]
Out[12]:
In [13]:
optimal_policy.v = optimal_policy.q.max(axis=3)
optimal_policy.v.shape
Out[13]:
In [14]:
optimal_policy.v[:,:,0] #usable=False, the first three rows are 0 because these states won't occur.
Out[14]:
In [15]:
optimal_policy.v[:,:,1] #usable=True, the first 11 rows are 0 because these states won't occur.
Out[15]:
In [16]:
optimal_policy.visualize()
In [17]:
#Use MC to estimate Prob[i,j,k], the probability of each initial state (i,j,k)
prob = np.zeros((21,10,2))
np.random.seed(4)
max_iter = int(1e5) # number of simulated deals used to estimate the start-state distribution
for _ in range(max_iter):
dealer_init_cards = np.random.choice(learner.game.cards,size=2,replace=True)
player_init_cards = np.random.choice(learner.game.cards,size=2, replace=True)
i,j,k = convert_cards_to_state(player_init_cards, dealer_init_cards[0])
prob[i,j,k] = prob[i,j,k] + 1.0
prob = prob/max_iter
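As a sanity check on this estimate (a small sketch, not in the original notebook): because cards are drawn with replacement, the start-state distribution can also be computed exactly from the card-value probabilities; the deviation from the MC estimate should be on the order of the Monte Carlo error.
from collections import Counter
card_probs = {c: n / 52.0 for c, n in Counter(DECK_OF_CARDS).items()}  # P(card value)
prob_exact = np.zeros((21, 10, 2))
for c1, p1 in card_probs.items():
    for c2, p2 in card_probs.items():
        for d, pd in card_probs.items():  # dealer's up card
            i, j, k = convert_cards_to_state([c1, c2], d)
            prob_exact[i, j, k] += p1 * p2 * pd
print(np.max(np.abs(prob_exact - prob)))  # small, shrinking as max_iter grows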
In [18]:
prob.sum()
Out[18]:
In [19]:
# E[Return] = sum_{i,j,k} Prob[i,j,k] * E[Return|i,j,k], with E[Return|i,j,k] = optimal_policy.v[i,j,k]; the house edge is the negative of this player return.
print "House edge: ", np.sum(prob*optimal_policy.v)
In [ ]: