In [ ]:
import gym
import pylab
import random
import numpy as np
from collections import deque
import tflearn

Task: fill in the empty parts of the following agent code


In [ ]:
class DeepQAgent:
    def __init__(self, state_size, action_size, render=True):
        # Tip: if you are training this on AWS, the best approach is to turn off rendering
        # here and render later by loading the serialized model
        self.render = render
        self.state_size = state_size
        self.action_size = action_size

        self.discount_factor = 0.99
        self.learning_rate = 0.001
        self.epsilon = 1.0
        self.epsilon_min = 0.005
        self.epsilon_decay = (self.epsilon - self.epsilon_min) / 50000
        self.batch_size = 64
        self.train_start = 1000
        # replay memory
        self.memory = deque(maxlen=10000)

        self.model = self.build_model()
        self.target_model = self.build_model()
        self.update_target_model()

    def build_model(self):
        # Use tflearn to build a simple NN for deep Q-learning
        # Spoiler alert: a couple of fully connected hidden layers should be enough
        # Output layer should have the same dimensionality as the action space
        # TODO (one possible implementation of the three TODO methods is sketched after this cell)
        pass

    def update_target_model(self):
        """Update your target model to the model you are currently learning at regular time intervals"""
        self.target_model.set_weights(self.model.get_weights())

    def get_action(self, state):
        """The choice of action uses the epsilon-greedy policy for the current network."""
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        else:
            q_value = self.model.predict(state)
            return np.argmax(q_value[0])

    def replay_memory(self, state, action, reward, next_state, done):
        """Save <s, a, r, s'> to replay_memory"""
        if action == 2:
            action = 1
        self.memory.append((state, action, reward, next_state, done))
        if self.epsilon > self.epsilon_min:
            self.epsilon -= self.epsilon_decay
            # print(len(self.memory))

    def train_replay(self):
        """Random sampling of batch_size samples from replay memory"""
        if len(self.memory) < self.train_start:
            return
        batch_size = min(self.batch_size, len(self.memory))
        mini_batch = random.sample(self.memory, batch_size)

        update_input = np.zeros((batch_size, self.state_size))
        update_target = np.zeros((batch_size, self.action_size))

        for i in range(batch_size):
            state, action, reward, next_state, done = mini_batch[i]
            target = self.model.predict(state)[0]

            # As in Q-learning, take the maximum Q-value at s', but read it from the target model
            if done:
                target[action] = reward
            else:
                target[action] = reward + self.discount_factor * \
                                          np.amax(self.target_model.predict(next_state)[0])
            update_input[i] = state
            update_target[i] = target

        # Fit the model on the minibatch: states as inputs, corrected Q-values as targets
        self.model.fit(update_input, update_target, batch_size=batch_size, epochs=1, verbose=0)

    def load_model(self, name):
        # TODO (see the sketch after this cell)
        pass

    def save_model(self, name):
        # TODO (see the sketch after this cell)
        pass

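One possible way to fill the three TODO methods is sketched below. This is only a sketch: it assumes tf.keras is available (the get_weights/set_weights/predict/fit(..., epochs=...) calls in the skeleton match the Keras model API; the same network could also be built with tflearn), and the layer sizes (24, 24) are just an example. The last lines attach the methods to DeepQAgent so the rest of the notebook runs unchanged.

In [ ]:
# Sketch implementations of the TODO methods (assumes tf.keras; adapt as needed)
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

def build_model(self):
    # Two fully connected hidden layers; linear output with one unit per action
    model = Sequential()
    model.add(Dense(24, input_dim=self.state_size, activation='relu'))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(self.action_size, activation='linear'))
    # Adam's first positional argument is the learning rate in both old and new Keras
    model.compile(loss='mse', optimizer=Adam(self.learning_rate))
    return model

def load_model(self, name):
    # Depending on the Keras version, the filename may need an .h5 suffix
    self.model.load_weights(name)
    self.update_target_model()

def save_model(self, name):
    self.model.save_weights(name)

# Attach the sketches to the class so the cells below can use them
DeepQAgent.build_model = build_model
DeepQAgent.load_model = load_model
DeepQAgent.save_model = save_model
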
In [ ]:
env = gym.make('MountainCar-v0')
state_size = env.observation_space.shape[0] # should equal 2 (car position and velocity)
ACTION_SIZE = 2 # the agent only chooses between 'push left' and 'push right'
agent = DeepQAgent(state_size, ACTION_SIZE)
# agent.load_model("./save_model/<your_saved_model_name>")
scores, episodes = [], []
N_EPISODES = 4000

In [ ]:
for e in range(N_EPISODES):
    done = False
    score = 0
    state = env.reset()
    state = np.reshape(state, [1, state_size])
    print(state)

    # Env actions: 0 (push left), 1 (do nothing), 2 (push right);
    # fake_action holds the env action so that 'do nothing' is never taken
    fake_action = 0

    # Counter so that each chosen action is repeated for 4 consecutive steps
    action_count = 0

    while not done:
        if agent.render:
            env.render()

        # Choose a new action only every 4th time step; otherwise repeat the previous one
        action_count = action_count + 1

        if action_count == 4:
            action = agent.get_action(state)
            action_count = 0

            if action == 0:
                fake_action = 0
            elif action == 1:
                fake_action = 2

        # Take 1 step with the selected action
        next_state, reward, done, info = env.step(fake_action)
        next_state = np.reshape(next_state, [1, state_size])
        # Optional: penalize the action that ends an episode by -100 (left commented out,
        # since in MountainCar the episode also ends when the goal is reached)
        # reward = reward if not done else -100

        # Save <s, a, r, s'> to replay memory
        agent.replay_memory(state, fake_action, reward, next_state, done)
        # Continue to learn every time step
        agent.train_replay()
        score += reward
        state = next_state

        if done:
            env.reset()
            # At the end of each episode, copy the weights of the learning model into the target model
            agent.update_target_model()

            # Record the score of each episode so the learning curve can be plotted later
            scores.append(score)
            episodes.append(e)
            print("episode:", e, "  score:", score, "  memory length:", len(agent.memory),
                  "  epsilon:", agent.epsilon)

    # Save the model every 50 episodes
    if e % 50 == 0:
        agent.save_model("./save_model/<your_saved_model_name>")
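
The scores and episodes lists collected above (together with the pylab import from the first cell) can be used to plot the learning curve once training finishes, for example:

In [ ]:
# Plot the episode scores collected during training
pylab.plot(episodes, scores, 'b')
pylab.xlabel('episode')
pylab.ylabel('score')
pylab.show()
# pylab.savefig("./save_graph/mountaincar_dqn.png")  # optional; the path is just an example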
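
Following the tip in the constructor (train without rendering, then load the serialized model later), an evaluation run could look roughly like the sketch below. It assumes save_model/load_model were filled in with the weight-based sketch above; the model path is the same placeholder used earlier, and epsilon is dropped to its minimum so the agent acts (almost) greedily.

In [ ]:
# Sketch: reload a trained agent and watch one greedy episode
eval_agent = DeepQAgent(state_size, ACTION_SIZE, render=True)
eval_agent.load_model("./save_model/<your_saved_model_name>")
eval_agent.epsilon = eval_agent.epsilon_min

state = env.reset()
state = np.reshape(state, [1, state_size])
done = False
while not done:
    env.render()
    action = eval_agent.get_action(state)   # network action: 0 or 1
    fake_action = 0 if action == 0 else 2   # map back to env actions 0 (left) / 2 (right)
    state, reward, done, info = env.step(fake_action)
    state = np.reshape(state, [1, state_size])
env.close()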