In [1]:
# !pip install -e git+https://github.com/openai/baselines#egg=berater
In [0]:
cnt = 0
In [0]:
import gym
from gym.utils import seeding
from gym import spaces

def state_name_to_int(state):
    state_name_map = {
        'S/Z': 0,
        'A': 1,
        'B': 2,
        'C': 3,
    }
    return state_name_map[state]

def int_to_state_name(state_as_int):
    state_map = {
        0: 'S/Z',
        1: 'A',
        2: 'B',
        3: 'C'
    }
    return state_map[state_as_int]
class BeraterEnv(gym.Env):
    """
    The Berater Problem

    Actions:
    There are 3 discrete deterministic actions:
    - 0: First Direction
    - 1: Second Direction
    - 2: Third Direction / Go home
    """
    metadata = {'render.modes': ['ansi']}
    num_envs = 1

    def __init__(self):
        # Travel costs between the start 'S/Z' and the customers A, B, C
        self.map = {
            'S/Z': [('A', 100), ('B', 400), ('C', 200)],
            'A': [('B', 250), ('C', 400), ('S/Z', 100)],
            'B': [('A', 250), ('C', 250), ('S/Z', 400)],
            'C': [('A', 400), ('B', 250), ('S/Z', 200)]
        }
        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Discrete(1)
        self.reset()
        self.optimum = self.calculate_customers_reward()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def step(self, action):
        paths = self.map[self.state]
        destination, cost = paths[action]
        info = {"from": self.state, "to": destination}

        self.state = destination
        # Reward: customer value collected minus travel cost, normalized by the
        # total customer value so rewards stay roughly within [-1, 1]
        reward = (-cost + self.customer_reward[destination]) / self.optimum
        self.customer_visited(destination)
        done = destination == 'S/Z' and self.all_customers_visited()
        return state_name_to_int(self.state), reward, done, info

    def customer_visited(self, customer):
        self.customer_reward[customer] = 0

    def all_customers_visited(self):
        return self.calculate_customers_reward() == 0

    def calculate_customers_reward(self):
        total = 0
        for value in self.customer_reward.values():
            total += value
        return total

    def reset(self):
        reward_per_customer = 1000
        self.customer_reward = {
            'S/Z': 0,
            'A': reward_per_customer,
            'B': reward_per_customer,
            'C': reward_per_customer,
        }
        self.state = 'S/Z'
        return state_name_to_int(self.state)

    def render(self, mode='human'):
        print(self.state)
        print(self.customer_reward)

from gym.envs.registration import register

# Register under a fresh id each time this cell runs, because gym refuses to
# re-register an existing id
cnt += 1
id = "Berater-v{}".format(cnt)
register(
    id=id,
    entry_point=BeraterEnv
)
env = gym.make(id)
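As a quick sanity check (not part of the original training flow), the cell below walks the environment along the cheapest round trip S/Z → A → B → C → S/Z. With the costs above (100 + 250 + 250 + 200 = 800) and 3 × 1000 of customer reward, the episode should end with a total normalized reward of about 0.733, which is the best return a policy can reach on this map.
In [ ]:
# Follow the cheapest tour S/Z -> A -> B -> C -> S/Z and sum the rewards
env.reset()
total_reward = 0
for action in [0, 0, 1, 2]:
    observation, reward, done, info = env.step(action)
    print(info, reward)
    total_reward += reward
print("Total reward: {:.3f}, done: {}".format(total_reward, done))  # expect ~0.733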
In [3]:
# Run a demo of the environment with a random policy
observation = env.reset()
for t in range(1000):
    env.render()
    action = env.action_space.sample()
    print("Action: {}".format(action))
    observation, reward, done, info = env.step(action)
    print(reward)
    if done:
        print("Episode finished after {} timesteps".format(t+1))
        break
env.close()
In [4]:
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
from baselines.ppo2 import ppo2

# Wrap the env for baselines and train PPO2 with an MLP policy
wrapped_env = DummyVecEnv([lambda: gym.make(id)])
model = ppo2.learn(network='mlp', env=wrapped_env, total_timesteps=20000)
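If you want to keep the trained weights around, the Model object returned by ppo2.learn exposes a save() helper (a thin wrapper around TensorFlow variable saving in baselines); the file name below is just a placeholder.
In [ ]:
# Optionally persist the trained policy; the path is an arbitrary example
model.save('berater_ppo2_weights')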
In [5]:
import numpy as np

observation = wrapped_env.reset()

# Recurrent state and done mask for model.step(); the 'mlp' network ignores
# them, but passing them keeps the call uniform with recurrent policies
state = np.zeros((1, 2*128))
dones = np.zeros((1,))

for t in range(1000):
    wrapped_env.render()
    actions, _, state, _ = model.step(observation, S=state, M=dones)
    print("Action: {}".format(actions))
    observation, reward, done, info = wrapped_env.step(actions)
    print(reward)
    if done[0]:
        print("Episode finished after {} timesteps".format(t+1))
        break
wrapped_env.close()
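To see how close the trained agent comes to the optimum, the sketch below (not from the original notebook; it creates a fresh vectorized env named eval_env) replays one episode and accumulates the normalized return. A well-trained policy should approach the ~0.733 ceiling from the sanity check above.
In [ ]:
# Replay one episode with the trained policy and sum up the normalized rewards
eval_env = DummyVecEnv([lambda: gym.make(id)])
observation = eval_env.reset()
state = np.zeros((1, 2*128))
dones = np.zeros((1,))
total_reward = 0.0
for t in range(1000):
    actions, _, state, _ = model.step(observation, S=state, M=dones)
    observation, reward, done, info = eval_env.step(actions)
    total_reward += reward[0]
    if done[0]:
        break
eval_env.close()
print("Episode return: {:.3f} (optimum is about 0.733)".format(total_reward))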
In [0]: