In [1]:
"""
A simple example for Reinforcement Learning using table lookup Q-learning method.
An agent "o" is on the left of a 1 dimensional world, the treasure is on the rightmost location.
Run this program and to see how the agent will improve its strategy of finding the treasure.
View more on my tutorial page: https://morvanzhou.github.io/tutorials/
"""
import numpy as np
import pandas as pd
import time

"""
给随机生成器设置seed的目的是每次运行程序得到的随机数的值相同,这样方便测试。
numpy.random.seed()不是线程安全的,如果程序中有多个线程最好使用
numpy.random.RandomState实例对象来创建或者使用random.seed()来设置相同的随机数种子。
"""
np.random.seed(2)  # reproducible

In [2]:
N_STATES = 11  # the length of the 1 dimensional world
ACTIONS = ['left', 'right']     # available actions
EPSILON = 0.9   # greedy police 贪心策略
ALPHA = 0.1     # learning rate
GAMMA = 0.9    # discount factor
MAX_EPISODES = 30   # maximum episodes 最大的回合数
FRESH_TIME = 0.3    # fresh time for one move

In [3]:
def Q_table_initialization(num_states, actions):
    """
    Q_Table 初始化
    基础版本 行-状态 列-行为
    """
    Q_Table = pd.DataFrame(
        np.zeros((num_states, len(actions))), # 初始化Q 表格 行-状态 列-行为
        columns = actions) # 列名-行为
    return Q_Table

In [4]:
# test for Q_Table_Initialization

test = Q_table_initialization(N_STATES, ACTIONS)
test


Out[4]:
left right
0 0.0 0.0
1 0.0 0.0
2 0.0 0.0
3 0.0 0.0
4 0.0 0.0
5 0.0 0.0
6 0.0 0.0
7 0.0 0.0
8 0.0 0.0
9 0.0 0.0
10 0.0 0.0

In [5]:
def action_choice(state, Q_Table):
    """
    在状态state_i 选择行为action
    """
    # 获取状态所有行为
    state_actions = Q_Table.iloc[state, :] 
    # act non-greedy or state-action have no value
    if (np.random.uniform() > EPSILON) or (state_actions.all()==0):
        action_name = np.random.choice(ACTIONS)
    else:
        # 获取Q-Tables 最大价值奖励的行为
        action_name = state_actions.argmax()
    return action_name

In [6]:
# test for choose_action
test_2 = action_choice(state=3, Q_Table=test)
test_2


Out[6]:
'right'

In [7]:
# test for choose_action
test = pd.DataFrame(np.random.uniform(size=(6,2)), columns=ACTIONS)
action_choice(2,test)


Out[7]:
'right'

In [8]:
def get_environment_feedback(state, action):
    """
    这是agent如何与环境的互动
    This is how agent will interact with the environment
    """
    if action == 'right':
        if state == N_STATES - 2: # terminate 
            next_state = 'terminal' #终止
            reward = 1
        else:
            next_state = state + 1
            reward = 0

    elif action == 'left':    
        reward = 0
        if state == 0:
            next_state = state
        else:
            next_state = state - 1
    return next_state, reward

In [9]:
#test for get_environment_feedback
next_state, reward = get_environment_feedback(3, 'right')
print (next_state)
print (reward)


4
0

In [10]:
def update_environment(state, episode, step_counter):
    """
    agent 如何与环境 environment 交互
    This is how environment be updated
    """ 
    # 环境表现-列表形式
    environment_list = ['-']*(N_STATES-1) + ['T']
    
    if state == 'terminal': # 终止
        interaction = 'episode %s: total_steps = %s' % (episode+1, step_counter)
        print ('\r{}'.format(interaction), end='')
        # Python time sleep() 函数推迟调用线程的运行,可通过参数secs指秒数,表示进程挂起的时间。
        # t -- 推迟执行的秒数。
        time.sleep(5)
        # \r 代表回车,也就是打印头归位,回到某一行的开头。
        # \n代表换行,就是走纸,下一行。
        # \r真正实现了其回车的功能(回到某行开头,把前面的输出覆盖了,
        # 其实在PyCharm中是把前面的内容抹去了,不管前面的内容有多长都会被全部抹去)
        print('\r                                ', end='')
        
    else:
        environment_list[state] = 'o'
        # Python join() 方法用于将序列中的元素以指定的字符连接生成一个新的字符串。
        interaction = ''.join(environment_list)
        print ('\r{}'.format(interaction), end='')
        time.sleep(FRESH_TIME)

In [11]:
# test for update_environment
state = 'terminal'
episode = 9
step_counter = 11
update_environment(state, episode, step_counter)


                                

In [12]:
# test for update_environment
state = 2
episode = 9
step_counter = 11
update_environment(state, episode, step_counter)


--o-------T

In [13]:
def reinforcement_learning():
    # Q Learning 表格初始化
    q_table = Q_table_initialization(N_STATES, ACTIONS)
    # episode 一局游戏的一个回合
    for episode in range(MAX_EPISODES):
        step_counter = 0 # 计数器初始化
        state = 0 # 回合初始化
        is_terminated = False # 是否回合结束
        update_environment(state, episode, step_counter)
        while not is_terminated:
            action = action_choice(state, q_table) # 行为选择
            next_state, reward = get_environment_feedback(state, action) # agent 行为与环境交互
            q_values_eval = q_table.loc[state, action]
            if next_state != 'terminal':
                q_targets = reward + GAMMA * q_table.iloc[next_state, :].max()
            else:
                q_targets = reward
                is_terminated = True
            
            q_table.loc[state, action] += ALPHA * (q_targets - q_values_eval) # q_table 更新
            state = next_state # 探索者移动到下一个 state
            step_counter += 1
            update_environment(state, episode, step_counter) # 环境更新
            
    return q_table

In [14]:
q_table = reinforcement_learning()


                                

In [15]:
q_table


Out[15]:
left right
0 1.652261e-09 0.000254
1 5.941277e-10 0.001134
2 1.553630e-05 0.004548
3 2.952575e-09 0.016046
4 4.860350e-05 0.047977
5 2.944280e-04 0.120605
6 6.059347e-03 0.258852
7 2.368521e-05 0.478073
8 1.385100e-04 0.735098
9 8.100000e-04 0.957609
10 0.000000e+00 0.000000