In [1]:
!pip install -e git+https://github.com/openai/baselines#egg=baselines
In [0]:
cnt = 0  # version counter, used to register the environment under a fresh id on each run
In [0]:
import numpy
import gym
from gym.utils import seeding
from gym import spaces

def state_name_to_int(state):
    state_name_map = {
        'S': 0,
        'A': 1,
        'B': 2,
        'C': 3,
    }
    return state_name_map[state]

def int_to_state_name(state_as_int):
    state_map = {
        0: 'S',
        1: 'A',
        2: 'B',
        3: 'C'
    }
    return state_map[state_as_int]
class BeraterEnv(gym.Env):
    """
    The Berater Problem

    Actions:
    There are 3 discrete deterministic actions:
    - 0: First Direction
    - 1: Second Direction
    - 2: Third Direction / Go home
    """
    metadata = {'render.modes': ['ansi']}

    num_envs = 1
    showStep = False
    showDone = True
    showRender = False
    envEpisodeModulo = 100

    def __init__(self):
        # travel cost of each connection between the four locations
        self.map = {
            'S': [('A', 100), ('B', 400), ('C', 200)],
            'A': [('B', 250), ('C', 400), ('S', 100)],
            'B': [('A', 250), ('C', 250), ('S', 400)],
            'C': [('A', 400), ('B', 250), ('S', 200)]
        }
        self.action_space = spaces.Discrete(3)
        # observation: current position plus the net value (customer reward
        # minus travel cost) of the six edges S-A, S-B, S-C, A-B, A-C, B-C
        self.observation_space = spaces.Box(
            low=numpy.array([0, -1000, -1000, -1000, -1000, -1000, -1000]),
            high=numpy.array([3, 1000, 1000, 1000, 1000, 1000, 1000]),
            dtype=numpy.float32)

        self.totalReward = 0
        self.stepCount = 0
        self.isDone = False

        self.envReward = 0
        self.envEpisodeCount = 0
        self.envStepCount = 0

        self.reset()
        self.optimum = self.calculate_customers_reward()

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]
    def step(self, actionArg):
        paths = self.map[self.state]
        action = actionArg
        destination, cost = paths[action]
        lastState = self.state
        customerReward = self.customer_reward[destination]
        info = {"from": self.state, "to": destination}

        self.state = destination
        # normalize the reward so that a perfect episode scores close to 1
        reward = (-cost + self.customer_reward[destination]) / self.optimum
        self.customer_visited(destination)
        done = destination == 'S' and self.all_customers_visited()

        stateAsInt = state_name_to_int(self.state)
        self.totalReward += reward
        self.stepCount += 1
        self.envReward += reward
        self.envStepCount += 1

        if self.showStep:
            print("Episode: " + ("%4.0f " % self.envEpisodeCount) +
                  " Step: " + ("%4.0f " % self.stepCount) +
                  lastState + ' --' + str(action) + '-> ' + self.state +
                  ' R=' + ("% 2.2f" % reward) + ' totalR=' + ("% 3.2f" % self.totalReward) +
                  ' cost=' + ("%4.0f" % cost) + ' customerR=' + ("%4.0f" % customerReward) +
                  ' optimum=' + ("%4.0f" % self.optimum))

        if done and not self.isDone:
            self.envEpisodeCount += 1
            if BeraterEnv.showDone or (self.envEpisodeCount % BeraterEnv.envEpisodeModulo) == 0:
                # average over the episodes since the counters were last reset
                episodes = BeraterEnv.envEpisodeModulo
                if (self.envEpisodeCount % BeraterEnv.envEpisodeModulo != 0):
                    episodes = self.envEpisodeCount % BeraterEnv.envEpisodeModulo
                print("Done: " +
                      ("episodes=%6.0f " % self.envEpisodeCount) +
                      ("avgSteps=%6.2f " % (self.envStepCount / episodes)) +
                      ("avgTotalReward=% 3.2f" % (self.envReward / episodes)))
                if (self.envEpisodeCount % BeraterEnv.envEpisodeModulo) == 0:
                    self.envReward = 0
                    self.envStepCount = 0

        self.isDone = done
        observation = self.getObservation(stateAsInt)
        return observation, reward, done, info
    def getObservation(self, position):
        result = numpy.array([position,
                              self.getEdgeObservation('S', 'A'),
                              self.getEdgeObservation('S', 'B'),
                              self.getEdgeObservation('S', 'C'),
                              self.getEdgeObservation('A', 'B'),
                              self.getEdgeObservation('A', 'C'),
                              self.getEdgeObservation('B', 'C'),
                              ],
                             dtype=numpy.float32)
        return result

    def getEdgeObservation(self, source, target):
        # how much is travelling this edge worth right now?
        reward = self.customer_reward[target]
        cost = self.getCost(source, target)
        return reward - cost

    def getCost(self, source, target):
        paths = self.map[source]
        for destination, cost in paths:
            if destination == target:
                return cost
    def customer_visited(self, customer):
        self.customer_reward[customer] = 0

    def all_customers_visited(self):
        return self.calculate_customers_reward() == 0

    def calculate_customers_reward(self):
        total = 0
        for value in self.customer_reward.values():
            total += value
        return total
    def reset(self):
        self.totalReward = 0
        self.stepCount = 0
        self.isDone = False

        reward_per_customer = 1000
        self.customer_reward = {
            'S': 0,
            'A': reward_per_customer,
            'B': reward_per_customer,
            'C': reward_per_customer,
        }

        self.state = 'S'
        # return the full observation vector, matching what step() returns
        return self.getObservation(state_name_to_int(self.state))
    def render(self, mode='human'):
        if BeraterEnv.showRender:
            print(("steps=%4.0f " % self.stepCount) +
                  ' totalR=' + ("% 3.2f" % self.totalReward) +
                  ' done=' + str(self.isDone))
In [3]:
from gym.envs.registration import register

# gym refuses to register the same id twice, so bump the counter for a fresh id
cnt += 1
env_id = "Berater-v{}".format(cnt)
register(
    id=env_id,
    entry_point=BeraterEnv
)
print(env_id)
In [4]:
BeraterEnv.showStep = True
BeraterEnv.showDone = True

env = gym.make(env_id)
observation = env.reset()
print(env)

for t in range(1000):
    action = env.action_space.sample()  # random walk as a baseline
    observation, reward, done, info = env.step(action)
    if done:
        env.render()
        break
env.close()
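Before training, it helps to know what a perfect episode is worth. With 1000 reward per customer the optimum is 3000, and the cheapest round trip visiting all customers is S -> A -> B -> C -> S (or its mirror) at a cost of 100 + 250 + 250 + 200 = 800. The best achievable totalR per episode is therefore (3000 - 800) / 3000, roughly 0.73, which is the yardstick for both the random walk above and the trained agent below:
In [ ]:
best_cost = 100 + 250 + 250 + 200  # S->A, A->B, B->C, C->S from the map above
optimum = 3 * 1000                 # three customers at 1000 reward each
print((optimum - best_cost) / optimum)  # 0.7333...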
In [5]:
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
from baselines.ppo2 import ppo2

BeraterEnv.showStep = False
BeraterEnv.showDone = False

wrapped_env = DummyVecEnv([lambda: gym.make(env_id)])
model = ppo2.learn(network='mlp', env=wrapped_env, total_timesteps=60000)
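If you want to keep the trained weights, the model object returned by ppo2.learn also exposes save (and load) helpers, thin wrappers around baselines' variable saving; the file name below is only a placeholder:
In [ ]:
model.save('berater_ppo2_weights')  # placeholder path, pick your own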
In [6]:
import numpy as np
observation = wrapped_env.reset()
state = np.zeros((1, 2*128))
dones = np.zeros((1))
BeraterEnv.showStep = True
BeraterEnv.showDone = False
for t in range(1000):
actions, _, state, _ = model.step(observation, S=state, M=dones)
observation, reward, done, info = wrapped_env.step(actions)
if done:
print("Episode finished after {} timesteps".format(t+1))
break
env.close()
In [0]: