In [ ]:
import random
import numpy as np
import tensorflow as tf
from collections import deque
In [ ]:
class DQNCore(object):
    def __init__(self, observation, num_actions, learning_rate=1e-3,
                 memory_size=1024, batch_size=32, gamma=.9, name='DQNCore'):
        self.num_actions = num_actions
        self.memory_size = memory_size
        self.gamma = gamma  # discount factor for expected returns
        self.batch_size = batch_size
        # placeholders for sampled replay experience
        shape = [None] + list(observation.shape[1:])
        self.inputs = tf.placeholder(tf.float32, shape, 'inputs')
        self.targets = tf.placeholder(tf.float32, [None], 'targets')  # y_j
        self.actions = tf.placeholder(tf.int32, [None], 'actions')
        self.rewards = tf.placeholder(tf.float32, [None], 'rewards')
        self.Q = self._build_QNetwork('Qeval', True)  # online network Q(s, .)
        self.next_Q = self._build_QNetwork('next_eval', False)  # target network Q(s', .)
        # select Q(s, a) of the taken actions via a one-hot mask
        one_hot = tf.one_hot(self.actions, self.num_actions, 1., 0.)
        Q_selected = tf.reduce_sum(self.Q * one_hot, axis=1)
        self._loss = tf.reduce_mean(tf.squared_difference(Q_selected, self.targets))
        self._train_op = tf.train.RMSPropOptimizer(learning_rate).minimize(self._loss)
        # session
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())
    def init(self):
        self.step_counter = 0
        self.cache = deque(maxlen=self.memory_size)  # replay experience
    def _build_QNetwork(self, name, trainable):
        with tf.variable_scope(name):
            # input layer
            network = tf.layers.conv2d(self.inputs, 16, [8, 8], [4, 4], 'same',
                                       activation=tf.nn.relu, trainable=trainable,
                                       name='input_layer')
            # hidden layer
            network = tf.layers.conv2d(network, 32, [4, 4], [2, 2], 'same',
                                       activation=tf.nn.relu, trainable=trainable,
                                       name='hidden_layer')
            # final layer
            network = tf.contrib.layers.flatten(network)
            network = tf.layers.dense(network, 64, tf.nn.relu,
                                      trainable=trainable, name='final_layer')
            # output layer
            network = tf.layers.dense(network, self.num_actions, None,
                                      trainable=trainable, name='output_layer')
        return network
    def update_nextQ_network(self):
        # copy the online network's weights into the target network
        next_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,
                                        scope='next_eval')
        Q_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,
                                     scope='Qeval')
        # when the lists have unequal length, zip stops at the shorter one
        self.sess.run([tf.assign(n, q) for n, q in zip(next_params, Q_params)])
    def update_cache(self, state, action, reward, next_state, done):
        # update the replay experience pool
        self.cache.append((state, action, reward, next_state, done))
    def _get_minibatch(self):
        # draw samples from the replay experience pool
        minibatch = random.sample(self.cache, self.batch_size)
        state = np.vstack([i[0] for i in minibatch])
        action = np.squeeze(np.vstack([i[1] for i in minibatch]))
        reward = np.squeeze(np.vstack([i[2] for i in minibatch]))
        next_state = np.vstack([i[3] for i in minibatch])
        done = [i[4] for i in minibatch]
        return state, action, reward, next_state, done
    def step_learning(self):
        # sample from the replay experience pool
        state, action, reward, next_state, done = self._get_minibatch()
        next_Q = self.sess.run(self.next_Q, feed_dict={self.inputs: next_state})
        # done mask: 1.0 if the episode terminated, else 0.0
        mask = np.array(done).astype('float')
        # y_j = r_j for terminal transitions, otherwise r_j + gamma * max_a' Q(s'_j, a')
        target = mask * reward + (1 - mask) * \
            (reward + self.gamma * np.max(next_Q, axis=1))
        # one gradient descent step
        self.sess.run(self._train_op,
                      feed_dict={self.inputs: state,
                                 self.actions: action,
                                 self.targets: target})
    def greedy_policy(self, observation):
        # note: the varepsilon-greedy policy is only used while fitting the
        # function approximator's parameters; use this greedy policy for evaluation
        action_value = self.sess.run(
            self.Q, feed_dict={self.inputs: observation})
        return np.argmax(action_value, axis=1)[0]
    def varepsilon_greedy_policy(self, observation, varepsilon=0.9):
        # act greedily with probability varepsilon, otherwise explore randomly
        if np.random.uniform() < varepsilon:
            action = self.greedy_policy(observation)
        else:
            action = np.random.randint(self.num_actions)
        return action
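For reference, a minimal training-loop sketch showing how the class above could be driven. It assumes a Gym-style image environment (here `Breakout-v0`) with the classic four-value `env.step` API; the episode count and target-network sync interval are illustrative and not from the original notebook.
In [ ]:
# minimal usage sketch; assumes gym's classic API (reset -> obs, step -> 4-tuple)
import gym

env = gym.make('Breakout-v0')
obs = env.reset()[np.newaxis, :]            # add a batch dimension for the conv net
agent = DQNCore(obs, env.action_space.n)
agent.init()

for episode in range(100):                  # illustrative episode count
    state = env.reset()[np.newaxis, :]
    done = False
    while not done:
        action = agent.varepsilon_greedy_policy(state)
        next_state, reward, done, _ = env.step(action)
        next_state = next_state[np.newaxis, :]
        agent.update_cache(state, action, reward, next_state, done)
        if len(agent.cache) >= agent.batch_size:
            agent.step_learning()
        state = next_state
    if episode % 10 == 0:                   # illustrative sync interval
        agent.update_nextQ_network()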
In [ ]: