In [ ]:
import gym
import pylab
import random
import numpy as np
from collections import deque
import tflearn
In [ ]:
class DeepQAgent:
    def __init__(self, state_size, action_size, render=True):
        # Tip: if you are training this on AWS, the best approach is to turn off rendering
        # and load the serialized model later for visualization
        self.render = render
        self.state_size = state_size
        self.action_size = action_size
        self.discount_factor = 0.99
        self.learning_rate = 0.001
        self.epsilon = 1.0
        self.epsilon_min = 0.005
        self.epsilon_decay = (self.epsilon - self.epsilon_min) / 50000
        self.batch_size = 64
        self.train_start = 1000
        # replay memory
        self.memory = deque(maxlen=10000)
        self.model = self.build_model()
        self.target_model = self.build_model()
        self.update_target_model()
    def build_model(self):
        # Use tflearn to build a simple NN for deep Q-learning
        # Spoiler alert: a couple of fully connected hidden layers should be enough
        # The output layer should have the same dimensionality as the action space
        # TODO
        pass

    def update_target_model(self):
        """Update the target model to the model currently being learned, at regular time intervals."""
        self.target_model.set_weights(self.model.get_weights())
    def get_action(self, state):
        """Choose an action with the epsilon-greedy policy for the current network."""
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        else:
            q_value = self.model.predict(state)
            return np.argmax(q_value[0])

    def replay_memory(self, state, action, reward, next_state, done):
        """Save <s, a, r, s'> to replay memory."""
        # The training loop passes env actions {0, 2}; map env action 2 back to network output index 1
        if action == 2:
            action = 1
        self.memory.append((state, action, reward, next_state, done))
        if self.epsilon > self.epsilon_min:
            self.epsilon -= self.epsilon_decay
        # print(len(self.memory))
    def train_replay(self):
        """Randomly sample batch_size transitions from replay memory and train on them."""
        if len(self.memory) < self.train_start:
            return
        batch_size = min(self.batch_size, len(self.memory))
        mini_batch = random.sample(self.memory, batch_size)
        update_input = np.zeros((batch_size, self.state_size))
        update_target = np.zeros((batch_size, self.action_size))
        for i in range(batch_size):
            state, action, reward, next_state, done = mini_batch[i]
            target = self.model.predict(state)[0]
            # As in Q-learning, take the maximum Q-value at s' -- but read it from the target model
            if done:
                target[action] = reward
            else:
                target[action] = reward + self.discount_factor * \
                    np.amax(self.target_model.predict(next_state)[0])
            update_input[i] = state
            update_target[i] = target
        # Fit the current model on the minibatch of states and their corrected target Q-values
        self.model.fit(update_input, update_target, batch_size=batch_size, epochs=1, verbose=0)
    def load_model(self, name):
        # TODO
        pass

    def save_model(self, name):
        # TODO
        pass
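The three TODO methods are left as an exercise; below is a minimal sketch of one possible implementation, assuming a tf.keras model (the rest of the class already relies on a Keras-style predict/fit/get_weights/set_weights API, even though tflearn is imported above). The layer sizes and the weights-only save format are illustrative choices, not requirements.
In [ ]:
# Hedged sketch: drop-in implementations for the TODO methods, assuming tf.keras is available.
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

def build_model(self):
    # Two small fully connected hidden layers; the output layer has one unit per action.
    model = Sequential()
    model.add(Dense(24, input_dim=self.state_size, activation='relu'))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(self.action_size, activation='linear'))
    model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))  # use lr= on older Keras
    return model

def save_model(self, name):
    # Persist only the weights; the architecture is recreated by build_model.
    self.model.save_weights(name)

def load_model(self, name):
    self.model.load_weights(name)

# Attach the sketches to the class so the cells below can use them unchanged.
DeepQAgent.build_model = build_model
DeepQAgent.save_model = save_model
DeepQAgent.load_model = load_model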
In [ ]:
env = gym.make('MountainCar-v0')
state_size = env.observation_space.shape[0]  # should equal 2: (position, velocity)
# Reduced action space: push left / push right (the native 'do nothing' action is skipped)
ACTION_SIZE = 2
agent = DeepQAgent(state_size, ACTION_SIZE)
# agent.load_model("./save_model/<your_saved_model_name>")
scores, episodes = [], []
N_EPISODES = 4000
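As a quick sanity check (an optional sketch, not part of the original flow): MountainCar-v0 natively exposes three discrete actions (0 = push left, 1 = do nothing, 2 = push right) and a two-dimensional observation (position, velocity); the training loop below maps the reduced two-action space back onto env actions 0 and 2.
In [ ]:
print(env.observation_space.shape)  # (2,) -> position, velocity
print(env.action_space.n)           # 3 native actions; we train on a reduced space of 2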
In [ ]:
for e in range(N_EPISODES):
    done = False
    score = 0
    state = env.reset()
    state = np.reshape(state, [1, state_size])
    print(state)
    # Env actions: 0 (push left), 1 (do nothing), 2 (push right).
    # fake_action maps the agent's reduced action space {0, 1} onto env actions {0, 2},
    # so the 'do nothing' action is never taken.
    fake_action = 0
    # Counter so that a new action is only selected every 4 steps
    action_count = 0
    while not done:
        if agent.render:
            env.render()

        # Select an action in the current state (every 4th step) and proceed with it
        action_count = action_count + 1
        if action_count == 4:
            action = agent.get_action(state)
            action_count = 0
            if action == 0:
                fake_action = 0
            elif action == 1:
                fake_action = 2

        # Take one step with the selected action
        next_state, reward, done, info = env.step(fake_action)
        next_state = np.reshape(next_state, [1, state_size])
        # Give a penalty of -100 for actions that end an episode
        # reward = reward if not done else -100

        # Save <s, a, r, s'> to replay memory
        agent.replay_memory(state, fake_action, reward, next_state, done)
        # Continue to learn at every time step
        agent.train_replay()
        score += reward
        state = next_state
        if done:
            env.reset()
            # Copy the learning model to the target model at the end of each episode
            agent.update_target_model()
            # Record the score for each episode so training progress can be plotted later
            scores.append(score)
            episodes.append(e)
            print("episode:", e, " score:", score, " memory length:", len(agent.memory),
                  " epsilon:", agent.epsilon)

    # Save the model every 50 episodes
    if e % 50 == 0:
        agent.save_model("./save_model/<your_saved_model_name>")
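pylab is imported at the top and scores/episodes are accumulated every episode, so a short plotting cell such as the following sketch can be used to inspect training progress (the output filename is a hypothetical choice).
In [ ]:
# Plot the per-episode score to see whether the agent is improving over time.
pylab.plot(episodes, scores, 'b')
pylab.xlabel('episode')
pylab.ylabel('score')
pylab.savefig('mountaincar_dqn_scores.png')  # hypothetical output filename
pylab.show()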