The observation space consists of a single value: self.observation_space = spaces.Discrete(1). I assume this is meant to represent the current position on the graph (S, A, B, C). For that I would use spaces.Discrete(4).
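A minimal sketch of that change, assuming the observation really is just the current node:

    # one discrete value per node: S=0, A=1, B=2, C=3
    self.observation_space = spaces.Discrete(4)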
With this observation space the agent only "knows" its current position, but nothing else about the rest of the "playing field". I would give the agent the whole playing field here.
As the policy, the "default" "mlp" policy is used. As far as I can tell, that is a fully connected 2-layer NN with 64 neurons per layer.
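As a rough standalone sketch (not the baselines code itself; 2 x 64 with tanh is my reading of the defaults), that policy trunk amounts to something like:

    import tensorflow as tf

    # rough equivalent of the default 'mlp' policy trunk: 2 hidden layers, 64 units, tanh
    policy_trunk = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='tanh'),
        tf.keras.layers.Dense(64, activation='tanh'),
    ])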
I tried the "whole playing field" idea in the notebook. Attached is my updated version. Training seems to be done after <=60k steps and reaches an average total reward of ~0.73 each time. When I compute it by hand I get a similar value. Please take a look.
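For reference, the hand calculation based on the map and rewards in the attached notebook: the cheapest round trip S -> A -> B -> C -> S (or its reverse) costs 100 + 250 + 250 + 200 = 800 and collects 3 * 1000 in customer rewards, so the best possible total reward is (3000 - 800) / 3000 ≈ 0.73, which matches what the training converges to.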
Currently the costs on the edges of the graph are fixed. It could be interesting to pick them anew at random during each episode. After training, the agent might then be able to find a good solution for "arbitrary" costs.
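A minimal sketch of what that could look like, e.g. as a helper called from reset() that redraws symmetric edge costs (the helper name and the cost range are just assumptions on my part):

    # hypothetical helper: redraw symmetric edge costs at the start of every episode
    def randomize_costs(self):
        nodes = ['S', 'A', 'B', 'C']
        new_map = {node: [] for node in nodes}
        for i, a in enumerate(nodes):
            for b in nodes[i + 1:]:
                cost = numpy.random.randint(100, 500)  # assumed cost range
                new_map[a].append((b, cost))
                new_map[b].append((a, cost))
        self.map = new_map

Since the observation already contains reward - cost per edge, the agent would actually see the redrawn costs.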
I also tried out various policy architectures. These were all policies of the "mlp" type with 1-5 layers and 100-4500 neurons per layer. In the end I took the one that was "best" and "simplest": 1 layer, 500 neurons with tanh as the activation function.
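If I read the baselines API correctly, that variant can be selected by passing the mlp builder's keyword arguments through ppo2.learn (the parameter names num_layers / num_hidden / activation are my reading of the baselines source; wrapped_env is the DummyVecEnv from the training cell below):

    import tensorflow as tf

    # assumed: ppo2.learn forwards these kwargs to the 'mlp' network builder
    model = ppo2.learn(network='mlp', env=wrapped_env, total_timesteps=60000,
                       num_layers=1, num_hidden=500, activation=tf.nn.tanh)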
In [0]:
# !pip install -e git+https://github.com/openai/baselines#egg=berater
In [0]:
cnt=0
In [0]:
import numpy
import gym
from gym.utils import seeding
from gym import spaces
def state_name_to_int(state):
    state_name_map = {
        'S': 0,
        'A': 1,
        'B': 2,
        'C': 3,
    }
    return state_name_map[state]


def int_to_state_name(state_as_int):
    state_map = {
        0: 'S',
        1: 'A',
        2: 'B',
        3: 'C'
    }
    return state_map[state_as_int]
class BeraterEnv(gym.Env):
    """
    The Berater Problem

    Actions:
    There are 3 discrete deterministic actions:
    - 0: First Direction
    - 1: Second Direction
    - 2: Third Direction / Go home
    """
    metadata = {'render.modes': ['ansi']}

    num_envs = 1
    showStep = False
    showDone = True
    showRender = False
    envEpisodeModulo = 100

    def __init__(self):
        # travel costs between nodes
        self.map = {
            'S': [('A', 100), ('B', 400), ('C', 200)],
            'A': [('B', 250), ('C', 400), ('S', 100)],
            'B': [('A', 250), ('C', 250), ('S', 400)],
            'C': [('A', 400), ('B', 250), ('S', 200)]
        }
        self.action_space = spaces.Discrete(3)
        # observation: current position plus (reward - cost) for each of the 6 edges
        self.observation_space = spaces.Box(
            low=numpy.array([0, -1000, -1000, -1000, -1000, -1000, -1000]),
            high=numpy.array([3, 1000, 1000, 1000, 1000, 1000, 1000]),
            dtype=numpy.float32)
        self.totalReward = 0
        self.stepCount = 0
        self.isDone = False

        self.envReward = 0
        self.envEpisodeCount = 0
        self.envStepCount = 0

        self.reset()
        self.optimum = self.calculate_customers_reward()
    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def step(self, actionArg):
        paths = self.map[self.state]
        action = actionArg
        destination, cost = paths[action]
        lastState = self.state
        lastObState = state_name_to_int(lastState)
        customerReward = self.customer_reward[destination]
        info = {"from": self.state, "to": destination}

        self.state = destination
        # reward per step: collected customer reward minus travel cost, normalized by the total available customer reward
        reward = (-cost + self.customer_reward[destination]) / self.optimum
        self.customer_visited(destination)
        done = destination == 'S' and self.all_customers_visited()

        stateAsInt = state_name_to_int(self.state)
        self.totalReward += reward
        self.stepCount += 1
        self.envReward += reward
        self.envStepCount += 1

        if self.showStep:
            print("Episode: " + ("%4.0f " % self.envEpisodeCount) +
                  " Step: " + ("%4.0f " % self.stepCount) +
                  # lastState + ':' + str(lastObState) + ' --' + str(action) + '-> ' + self.state + ':' + str(stateAsInt) +
                  lastState + ' --' + str(action) + '-> ' + self.state +
                  ' R=' + ("% 2.2f" % reward) + ' totalR=' + ("% 3.2f" % self.totalReward) +
                  ' cost=' + ("%4.0f" % cost) + ' customerR=' + ("%4.0f" % customerReward) +
                  ' optimum=' + ("%4.0f" % self.optimum))

        if done and not self.isDone:
            self.envEpisodeCount += 1
            if BeraterEnv.showDone or (self.envEpisodeCount % BeraterEnv.envEpisodeModulo) == 0:
                episodes = BeraterEnv.envEpisodeModulo
                if (self.envEpisodeCount % BeraterEnv.envEpisodeModulo != 0):
                    episodes = self.envEpisodeCount % BeraterEnv.envEpisodeModulo
                print("Done: " +
                      ("episodes=%6.0f " % self.envEpisodeCount) +
                      ("avgSteps=%6.2f " % (self.envStepCount / episodes)) +
                      ("avgTotalReward=% 3.2f" % (self.envReward / episodes)))
            if (self.envEpisodeCount % BeraterEnv.envEpisodeModulo) == 0:
                self.envReward = 0
                self.envStepCount = 0

        self.isDone = done
        observation = self.getObservation(stateAsInt)

        return observation, reward, done, info
    def getObservation(self, position):
        result = numpy.array([position,
                              self.getEdgeObservation('S', 'A'),
                              self.getEdgeObservation('S', 'B'),
                              self.getEdgeObservation('S', 'C'),
                              self.getEdgeObservation('A', 'B'),
                              self.getEdgeObservation('A', 'C'),
                              self.getEdgeObservation('B', 'C')],
                             dtype=numpy.float32)
        return result

    def getEdgeObservation(self, source, target):
        # value of traveling this edge: remaining customer reward at the target minus the travel cost
        reward = self.customer_reward[target]
        cost = self.getCost(source, target)
        result = reward - cost
        return result
    def getCost(self, source, target):
        paths = self.map[source]
        for destination, cost in paths:
            if destination == target:
                result = cost
                break
        return result
    def customer_visited(self, customer):
        self.customer_reward[customer] = 0

    def all_customers_visited(self):
        return self.calculate_customers_reward() == 0

    def calculate_customers_reward(self):
        total = 0
        for value in self.customer_reward.values():
            total += value
        return total
    def reset(self):
        # print("Reset")
        self.totalReward = 0
        self.stepCount = 0
        self.isDone = False

        reward_per_customer = 1000
        self.customer_reward = {
            'S': 0,
            'A': reward_per_customer,
            'B': reward_per_customer,
            'C': reward_per_customer,
        }

        self.state = 'S'
        # return the full observation so it matches the Box observation_space
        return self.getObservation(state_name_to_int(self.state))
    def render(self, mode='human'):
        if BeraterEnv.showRender:
            print(("steps=%4.0f " % self.stepCount) + ' totalR=' + ("% 3.2f" % self.totalReward) + ' done=' + str(self.isDone))
In [4]:
from gym.envs.registration import register
cnt += 1
id = "Berater-v{}".format(cnt)
register(
    id=id,
    entry_point=BeraterEnv
)
print(id)
In [5]:
BeraterEnv.showStep = True
BeraterEnv.showDone = True
env = gym.make(id)
observation = env.reset()
print(env)
for t in range(1000):
    action = env.action_space.sample()
    observation, reward, done, info = env.step(action)
    if done:
        env.render()
        break

env.close()
In [6]:
from baselines.common.vec_env.dummy_vec_env import DummyVecEnv
from baselines.ppo2 import ppo2
BeraterEnv.showStep = False
BeraterEnv.showDone = False
wrapped_env = DummyVecEnv([lambda: gym.make(id)])
model = ppo2.learn(network='mlp', env=wrapped_env, total_timesteps=60000)
In [8]:
import numpy as np
observation = wrapped_env.reset()
state = np.zeros((1, 2*128))
dones = np.zeros((1))
BeraterEnv.showStep = True
BeraterEnv.showDone = False
for t in range(1000):
    actions, _, state, _ = model.step(observation, S=state, M=dones)
    observation, reward, done, info = wrapped_env.step(actions)
    if done:
        print("Episode finished after {} timesteps".format(t+1))
        break
wrapped_env.close()
In [0]: