In [ ]:
%matplotlib inline
In [ ]:
from windy import WindyGridWorld
import numpy as np
from collections import deque
import pickle
import matplotlib.pyplot as plt
In [ ]:
def epsilon_greedy_policy(q_vector, epsilon):
    is_greedy_action = np.random.uniform() > epsilon
    if is_greedy_action:
        # Break ties randomly when at least two Q-values share the maximum
        # TODO: check what would happen if lookup_action_value() randomized the q_vector around 0
        max_q_args = np.argwhere(q_vector == np.amax(q_vector))
        if len(max_q_args) > 1:
            action = np.random.choice(max_q_args.ravel(), 1)[0] + 1
        else:
            action = np.argmax(q_vector) + 1
    else:
        action = np.random.randint(1, 5)
    return action
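A quick, illustrative sanity check of the policy (assuming the four actions are indexed 1..4, as above): with epsilon = 0 the greedy action is always chosen and ties between equal Q-values are broken at random, while epsilon = 1 gives a uniformly random action.
In [ ]:
# Illustrative check only; the Q-values are made up
q = [0.1, 0.5, 0.5, -0.2]
# Fully greedy: only the tied maxima (actions 2 and 3) ever appear
print({epsilon_greedy_policy(q, epsilon=0) for _ in range(100)})
# Fully random: any of the four actions can appear
print({epsilon_greedy_policy(q, epsilon=1) for _ in range(100)})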
In [ ]:
def td_error(c_a_v, n_a_v, r, g=0.95):
    # One-step SARSA TD error: r + gamma * Q(s', a') - Q(s, a)
    return r + g * n_a_v - c_a_v
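A small worked example of the TD error with made-up values, delta = r + gamma * Q(s', a') - Q(s, a):
In [ ]:
# -1 + 0.95 * 3.0 - 2.0 = -0.15
td_error(c_a_v=2.0, n_a_v=3.0, r=-1, g=0.95)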
In [ ]:
def action_value(feature_vector, weights):
    # Linear function approximation: Q(s, a) = x(s, a) . w
    return feature_vector @ weights.T
In [ ]:
def get_action_value_vector(state, weights):
    val = []
    for act in range(1, 5):
        val.append(get_action_value(state, weights, act))
    return val
In [ ]:
def calc_distance(state_vector, winner_tile):
    # Euclidean distance between the current state and the goal tile
    return np.linalg.norm(np.asarray(state_vector) - np.asarray(winner_tile))
In [ ]:
def feature_vector(state_vector, action):
    # Moves for actions 1..4: (-1, 0), (0, 1), (1, 0), (0, -1); the (1, 1) shift keeps the coordinates nonzero
    moves = (np.asarray([-1, 0]), np.asarray([0, 1]), np.asarray([1, 0]), np.asarray([0, -1]))
    return np.hstack((1 / (np.asarray(state_vector) + moves[action - 1] + np.array([1, 1])), 1 / np.asarray(state_vector)))
In [ ]:
def get_action_value(s, w, a):
    return action_value(feature_vector(s, a), w)
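An illustration of what the linear approximator sees (arbitrary weights, not learned ones): feature_vector stacks the reciprocal coordinates of the shifted candidate next state and of the current state, and get_action_value is the dot product of that feature vector with the weights.
In [ ]:
# Illustrative only: arbitrary weights, state (2, 3), action 2 (offset (0, 1))
w_demo = np.array([0.1, -0.2, 0.3, 0.4])
print(feature_vector((2, 3), 2))            # [1/3, 1/5, 1/2, 1/3]
print(get_action_value((2, 3), w_demo, 2))  # feature_vector((2, 3), 2) @ w_demo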
In [ ]:
def td_lambda_update(curr_state, next_state, action, w, e_trace, reward, epsilon, _lambda=0.9, gamma=0.95, alpha=0.15):
    # Get the current action value (for the current state and action)
    current_action_value = get_action_value(curr_state, w, action)
    # Get the action-value vector of the next state
    next_action_value_vector = get_action_value_vector(next_state, w)
    # Choose the next action from next_action_value_vector with the epsilon-greedy policy (current epsilon)
    next_action = epsilon_greedy_policy(next_action_value_vector, epsilon)
    # Get the next action value from next_action_value_vector according to the chosen action
    next_action_value = next_action_value_vector[next_action - 1]
    ## UPDATE THE WEIGHTS ##
    # Calculate the TD error
    delta = td_error(current_action_value, next_action_value, reward, gamma)
    # Update the eligibility trace (accumulating trace)
    new_e_trace = gamma * _lambda * e_trace + feature_vector(curr_state, action)
    # Weight increment
    delta_w = alpha * delta * new_e_trace
    return next_action, delta_w, new_e_trace
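A single hand-driven update with made-up states, a made-up reward, and zero weights (no WindyGridWorld instance involved), just to show what the function returns: the next epsilon-greedy action, the weight increment alpha * delta * e, and the updated eligibility trace.
In [ ]:
w_demo = np.zeros(4)
next_a, dW, e_new = td_lambda_update(curr_state=(1, 1), next_state=(1, 2), action=2,
                                     w=w_demo, e_trace=np.zeros(4), reward=-1, epsilon=0.1)
print(next_a)  # an action in 1..4
print(dW)      # alpha * delta * e_new, same shape as the weights
print(e_new)   # gamma * _lambda * old trace + feature_vector((1, 1), 2)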
In [ ]:
def try_table(w):
    # Run one near-greedy episode with the given weights and return the visited states and actions
    game = WindyGridWorld(GRID_SIZE, WINNER_TILE, WINDY_ARRAY, START_TILE)
    current_pos = game.current_pos()
    is_ended = False
    agent_positions = deque()
    actions_deque = deque()
    epsilon = 0.01
    while not is_ended:
        # Append the current agent position
        agent_positions.append(current_pos)
        # Look up the action values belonging to the current state
        action_value_vector = get_action_value_vector(current_pos, w)
        # Choose an action based on the current action-value vector and epsilon
        action = epsilon_greedy_policy(action_value_vector, epsilon)
        # Append the action
        actions_deque.append(action)
        # Get the next state, the reward, and the termination flag
        current_pos, reward, is_ended = game.step(action)
    return agent_positions, actions_deque, current_pos
In [ ]:
def show_moves(visited_states, grid_size):
    arr = np.asarray(visited_states).T
    range_x = (0.5, grid_size[1] + 0.5)
    range_y = (0.5, grid_size[0] + 0.5)
    ax = plt.gca()
    ax.scatter(arr[1], arr[0])
    # Arrows from each visited tile to the next one (column on x, row on y)
    ax.quiver(arr[1, :-1], arr[0, :-1], arr[1, 1:] - arr[1, :-1], arr[0, 1:] - arr[0, :-1],
              scale_units='xy', angles='xy', scale=1)
    ax.set_xticks(np.arange(*range_x), minor=True)
    ax.set_yticks(np.arange(*range_y), minor=True)
    ax.set_xlim(*range_x)
    ax.set_ylim(*range_y)
    ax.set_xlabel("Column")
    ax.invert_yaxis()
    ax.get_xaxis().set_tick_params(labeltop=True, labelbottom=False)
    ax.set_aspect("equal")
    plt.grid(which="minor")
    plt.show()
In [ ]:
GRID_SIZE = (6, 6)
WINNER_TILE = (3, 6)
# WINDY_ARRAY = (0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 0)
WINDY_ARRAY = np.zeros(6)
START_TILE = (1, 1)
In [ ]:
feature_vector_size = 4
W = np.zeros(feature_vector_size)
epsilon = 1
episode_number = 0
average = 0
alpha_0 = 0.15
episodes_number = 200
# Outer loop over the episodes
for curr_episode in range(episodes_number):
    print("-", end="")
    episode_number += 1
    # Linearly decay the learning rate over the episodes
    alpha = alpha_0 * (1 - curr_episode / episodes_number)
    if episode_number % 1000 == 0:
        print("Game " + str(episode_number))
        print("Number of steps: " + str(average / 1000))
        average = 0
    epsilon = 1 / (episode_number**(1/3))
    # Containers
    agent_positions = deque()
    states_reward_list = []
    actions_deque = deque()
    # End flag
    is_ended = False
    # Reset the eligibility trace to zero
    ELIGIBILITY_TRACE = np.zeros(feature_vector_size)
    # New game
    game = WindyGridWorld(GRID_SIZE, WINNER_TILE, WINDY_ARRAY, START_TILE)
    current_pos = game.current_pos()
    # TODO: check whether the current_pos type (tuple) is a problem or not
    # Get the action values belonging to the current state (through feature vectors)
    action_value_vector = get_action_value_vector(current_pos, W)
    # Choose an action based on the current action-value vector and epsilon
    action = epsilon_greedy_policy(action_value_vector, epsilon)
    # Inner loop over the steps of one episode
    while not is_ended:
        # Append the current agent position
        agent_positions.append(current_pos)
        # Append the action
        actions_deque.append(action)
        # Get the next state, the reward, and the termination flag
        current_pos, reward, is_ended = game.step(action)
        # Append the reward
        states_reward_list.append(reward)
        # TD(lambda) update, passing the decayed learning rate
        action, delta_W, ELIGIBILITY_TRACE = td_lambda_update(agent_positions[-1], current_pos, action, W,
                                                              ELIGIBILITY_TRACE, reward, epsilon, alpha=alpha)
        W += delta_W
    # With four-directional moves and no wind, the optimal episode length is the Manhattan distance to the goal
    vertical_distance = abs(WINNER_TILE[0] - agent_positions[0][0])
    horizontal_distance = abs(WINNER_TILE[1] - agent_positions[0][1])
    optimal = horizontal_distance + vertical_distance
    average += len(actions_deque) / optimal
In [ ]:
get_action_value_vector((2, 6), W)
In [ ]:
a, b, c = try_table(W)
# Append the final position so the goal tile is included in the plot
a.append(c)
show_moves(a, GRID_SIZE)
In [ ]:
count = 0
for _ in range(4000):
    a, b, c = try_table(W)
    # NOTE: 19 is a hard-coded reference step count used for normalization
    count += len(b) / 19
count / 4000