构建说明


深度强化学习选股

上证50指数增强

简单的强化学习选股框架,在选股方面训练虚拟交易员选股调仓,实现SH50指数增强。

在每个交易日,Agent根据获取的观测数据[Batch, Length, Factor]计算出一个行为向量[Batch],对50只成份股进行调仓,先卖后买使用,使用开盘价成交,在每交易日结束,使用收盘价评估持仓获得reward。

Agent 推断架构为2层LSTM后接MLP输出。

在与训练环境交互的时候使用gather处理n step折现问题。

注:交互环境-账户Account使用向量的方法并行SH50成份股,所以交互环境中股票的顺序是绑定的,并且股票种类使用Batch固定为50。

trainable neural network


In [1]:
import tensorflow as tf
import numpy as np
from sonnet.python.modules.base import AbstractModule
from sonnet.python.modules.basic import Linear as sntLinear
from sonnet.python.modules.gated_rnn import LSTM as sntLSTM
from sonnet.python.modules.basic_rnn import DeepRNN as sntDeepRNN
from sonnet.python.modules.basic import BatchApply as sntBatchApply


def swich(inputs):
    return inputs * tf.nn.sigmoid(inputs)


def Linear(name, output_size):
    initializers = {"w": tf.truncated_normal_initializer(stddev=0.1),
                    "b": tf.constant_initializer(value=0.1)}
    regularizers = {"w": tf.contrib.layers.l2_regularizer(scale=0.1),
                    "b": tf.contrib.layers.l2_regularizer(scale=0.1)}
    return sntLinear(output_size,
                      initializers=initializers,
                      regularizers=regularizers,
                      name=name)

def build_common_network(inputs):
    """common network
    :param inputs: [Time, Batch, state_size]
    :return: [Time, Batch, hidden_size]
    """
    # build rnn
    batch_size = inputs.get_shape().as_list()[1]
    l1 = sntLSTM(64, name='rnn_first')
    l2 = sntLSTM(32, name='rnn_second')
    rnn = sntDeepRNN([l1, l2])
    initial_state = rnn.initial_state(batch_size)
    # looping
    output_sequence, final_state = tf.nn.dynamic_rnn(
        rnn, inputs, initial_state=initial_state, time_major=True)
    return output_sequence


class ActorNet(AbstractModule):
    """actor network
    """
    def __init__(self, name='Actor'):
        super().__init__(name=name)

    def _build(self, output_size, inputs):
        # loop net -> [Time, Batch, hidden_size]
        net = build_common_network(inputs)  # rnn output (-1, 1)
        # linear net
        net = sntBatchApply(Linear('input_layer', 64))(net)
        net = swich(net)
        net = sntBatchApply(Linear('output_layer', output_size))(net)
        return tf.nn.softmax(net)  # [Time, Batch, output_size]

    def get_regularization(self):
        return self.get_variables(tf.GraphKeys.REGULARIZATION_LOSSES)


class CriticNet(AbstractModule):
    """critic network
    """
    def __init__(self, name='critic'):
        super().__init__(name=name)

    def _build(self, inputs):
        # loop net -> [Time, Batch, hidden_size]
        net = build_common_network(inputs)  # range (-1, 1)
        # linear net
        net = sntBatchApply(Linear('input_layer', 64))(net)
        net = swich(net)
        net = sntBatchApply(Linear('output_layer', 1))(net)
        net = tf.squeeze(net, axis=2)
        # net = tf.nn.tanh(net)
        return tf.reduce_mean(net, axis=1)  # [Time]

    def get_regularization(self):
        return self.get_variables(tf.GraphKeys.REGULARIZATION_LOSSES)

Access


In [2]:
LEARNING_RATE = 1e-3
DECAY_RATE = 0.99


class Access(object):
    def __init__(self, batch_size, state_size, action_size):
        with tf.variable_scope('Access'):
            # placeholder
            self.inputs = tf.placeholder(tf.float32, [None, batch_size, state_size], 'inputs')
            # network interface
            self.actor = ActorNet('actor')
            self.critic = CriticNet('critic')

            self.policy = tf.nn.softmax(self.actor(action_size, self.inputs))
            self.value = self.critic(self.inputs)

        # global optimizer
        self.optimizer_actor = tf.train.RMSPropOptimizer(
            LEARNING_RATE, DECAY_RATE, name='optimizer_actor')
        self.optimizer_critic = tf.train.RMSPropOptimizer(
            LEARNING_RATE, DECAY_RATE, name='optimizer_critic')

        # saver
        var_list = self.get_trainable()
        var_list = list(var_list[0] + var_list[1])
        self.saver = tf.train.Saver(var_list=var_list)

    def get_trainable(self):
        return [self.actor.get_variables(), self.critic.get_variables()]

    def save(self, sess, path):
        self.saver.save(sess, path)

    def restore(self, sess, path):
        var_list = list(self.get_trainable()[0] + self.get_trainable()[1])
        saver = tf.train.Saver(var_list=var_list)
        saver.restore(sess, path)

Agent


In [3]:
CLIP_MIN = 0.01
CLIP_MAX = 0.98
ENTROPY_BETA = 0.01
MAX_GRAD_NORM = 50


def batch_choice(a, p):
    action_list = [np.random.choice(a, p=i) for i in p]
    return np.array(action_list)


# local network for advantage actor-critic which are also know as A2C
class Agent(object):
    def __init__(self, name, access, batch_size, state_size, action_size):
        self.Access = access
        self.action_size = action_size
        self.batch_size = batch_size

        with tf.variable_scope(name):
            # placeholder

            # [Time, Batch, N]
            self.inputs = tf.placeholder(
                tf.float32, [None, batch_size, state_size], 'inputs')

            # [T_MAX, Batch]
            self.actions = tf.placeholder(
                tf.int32, [None, batch_size], "actions")

            # [T_MAX]
            self.targets = tf.placeholder(tf.float32, [None], "discounted_rewards")
            self.gathers = tf.placeholder(tf.int32, [None], 'gather_list')

            # build network
            self.actor = ActorNet('actor')
            self.critic = CriticNet('critic')
            policy = self.actor(action_size, self.inputs)  # [Time, Batch, action_size]
            value = self.critic(self.inputs)  # [Time]

            # fix
            policy = tf.clip_by_value(policy, CLIP_MIN, CLIP_MAX, 'constraint')

            # interface
            self.policy = tf.gather(policy, self.gathers)  # [T_MAX, Batch, action_size]
            self.value = tf.gather(value, self.gathers)  # [T_MAX]
            self.policy_step = policy[-1]  # [Batch, action_size]
            self.value_step = value[-1]  # 1

            # build function
            self._build_losses()
            self._build_async()
            self._build_interface()
            print('graph %s' % (str(name)))

    def _build_losses(self):
        # value loss
        self.advantage = self.targets - self.value  # [T_MAX]
        value_loss = 0.5 * tf.square(self.advantage)

        # policy loss
        # [T_MAX, Batch, action_size] -> [T_MAX, Batch]
        policy_action = tf.reduce_sum(
            self.policy * tf.one_hot(self.actions, self.action_size), axis=2)
        # [T_MAX, Batch]
        policy_loss = -tf.log(policy_action) * tf.stop_gradient(
            tf.expand_dims(self.advantage, axis=1))
        # entropy loss [T_MAX, Batch]
        entropy_loss = tf.reduce_sum(self.policy * tf.log(self.policy), axis=2)

        # total loss
        self.critic_loss = tf.reduce_mean(value_loss)
        self.actor_loss = tf.reduce_mean(policy_loss + entropy_loss * ENTROPY_BETA)

        # interface
        self.a_entropy_loss = tf.reduce_mean(entropy_loss)
        self.a_policy_loss = tf.reduce_mean(policy_loss)
        self.a_value_loss = tf.reduce_mean(value_loss)
        self.a_critic_loss = self.critic_loss
        self.a_actor_loss = self.actor_loss
        self.a_advantage = tf.reduce_mean(self.advantage)
        self.a_value_mean = tf.reduce_mean(self.value)
        self.a_policy_mean = tf.reduce_mean(self.policy)

    def _build_async(self):
        global_actor_params, global_critic_params = self.Access.get_trainable()
        local_actor_params, local_critic_params = self.get_trainable()
        actor_grads = tf.gradients(self.actor_loss, list(local_actor_params))
        critic_grads = tf.gradients(self.critic_loss, list(local_critic_params))

        # Set up optimizer with global norm clipping.
        actor_grads, self.a_actor_grad = tf.clip_by_global_norm(actor_grads, MAX_GRAD_NORM)
        critic_grads, self.a_critic_grad = tf.clip_by_global_norm(critic_grads, MAX_GRAD_NORM)

        # update Access
        actor_apply = self.Access.optimizer_actor.apply_gradients(
            zip(list(actor_grads), list(global_actor_params)))
        critic_apply = self.Access.optimizer_critic.apply_gradients(
            zip(list(critic_grads), list(global_critic_params)))
        self.update_global = [actor_apply, critic_apply]

        # update ACNet
        assign_list = []
        for gv, lv in zip(global_actor_params, local_actor_params):
            assign_list.append(tf.assign(lv, gv))
        for gv, lv in zip(global_critic_params, local_critic_params):
            assign_list.append(tf.assign(lv, gv))
        self.update_local = assign_list

    def _build_interface(self):
        self.a_interface = [self.a_actor_loss,
                            self.a_actor_grad,
                            self.a_policy_mean,
                            self.a_policy_loss,
                            self.a_entropy_loss,
                            self.a_critic_loss,
                            self.a_critic_grad,
                            self.a_value_loss,
                            self.a_value_mean,
                            self.a_advantage]

    def get_trainable(self):
        return [self.actor.get_variables(), self.critic.get_variables()]

    def init_or_update_local(self, sess):
        """
        init or update local network
        :param sess:
        :return:
        """
        sess.run(self.update_local)

    def get_step_policy(self, sess, inputs):
        return sess.run(self.policy_step, {self.inputs: inputs})

    def get_step_value(self, sess, inputs):
        return sess.run(self.value_step, {self.inputs: inputs})

    def get_losses(self, sess, inputs, actions, targets, gather_list):
        """
        get all loss functions of network
        :param sess:
        :param inputs:
        :param actions:
        :param targets:
        :return:
        """
        feed_dict = {self.inputs: inputs,
                     self.actions: actions,
                     self.targets: targets,
                     self.gathers: gather_list}
        return sess.run(self.a_interface, feed_dict)

    def train_step(self, sess, inputs, actions, targets, gathers):
        feed_dict = {self.inputs: inputs,
                     self.actions: actions,
                     self.targets: targets,
                     self.gathers: gathers}
        sess.run(self.update_global, feed_dict)

    # get stochastic action for train
    def get_stochastic_action(self, sess, inputs, epsilon=0.9):
        if np.random.uniform() < epsilon:
            policy = sess.run(self.policy_step, {self.inputs: inputs})
            return batch_choice(self.action_size, policy)
        else:
            return np.random.randint(self.action_size, size=self.batch_size)

    # get deterministic action for test
    def get_deterministic_policy_action(self, sess, inputs):
        policy_step = sess.run(self.policy_step, {self.inputs: inputs})
        return np.argmax(policy_step, axis=1)

Framework


In [4]:
from agent.actor_critic import Agent


MAX_EPISODE_LENGTH = 200
GAMMA = 0.9


def batch_stack(inputs):
    # gather index
    gather_list = 63 + 16 * np.arange(len(inputs))
    # stack
    a = [inputs[0][:-16]]
    b = [i[-16:] for i in inputs]
    return np.vstack(a + b), gather_list


class Framework(object):
    def __init__(self, name, access, batch_size, state_size, action_size):
        self.Access = access
        self.AC = Agent(name, self.Access, batch_size, state_size, action_size)
        self.env = Account()
        self.name = name

    def run(self, sess, max_episodes, t_max=8):
        buffer_score = []
        buffer_loss = []
        episode = 0
        while episode < max_episodes:
            episode += 1
            episode_score, outputs = self.run_episode(sess, t_max)
            buffer_score.append(episode_score)
            buffer_loss.append(outputs)
        return buffer_score, buffer_loss

    def run_episode(self, sess, t_max=8):
        t_start = t = 0
        episode_score = 1
        buffer_state = []
        buffer_action = []
        buffer_reward = []

        self.AC.init_or_update_local(sess)
        state = self.env.reset()
        while True:
            t += 1
            action = self.AC.get_stochastic_action(sess, state)
            next_state, reward, done = self.env.step(action)

            # buffer for loop
            episode_score *= (1 + reward / 100)
            buffer_state.append(state)
            buffer_action.append(action)
            buffer_reward.append(reward)
            state = next_state

            if t - t_start == t_max or done:
                t_start = t
                terminal = self.get_bootstrap(sess, next_state, done)

                buffer_target = []
                for r in buffer_reward[::-1]:
                    terminal = r + GAMMA * terminal
                    buffer_target.append(terminal)
                buffer_target.reverse()

                # stack
                inputs, gather_list = batch_stack(buffer_state)
                actions = np.vstack(buffer_action)
                targets = np.squeeze(np.vstack(buffer_target), axis=1)

                # empty buffer
                buffer_state = []
                buffer_action = []
                buffer_reward = []

                # update Access gradients
                self.AC.train_step(sess, inputs, actions, targets, gather_list)

                # update local network
                self.AC.init_or_update_local(sess)

            if done or t > MAX_EPISODE_LENGTH:
                outputs = self.get_losses(sess, inputs, actions, targets, gather_list)
                outputs = tuple(outputs)
                if self.name == 'W0':
                    print('actor: %f, actor_grad: %f, policy mean: %f, policy: %f, entropy: %f, '
                          'critic: %f, critic_grad: %f, value: %f, value_mean: %f, advantage: %f'
                          % outputs)
                return episode_score, outputs

    def get_bootstrap(self, sess, next_state, done):
        if done:
            terminal = 0
        else:
            terminal = self.AC.get_step_value(sess, next_state)
        return terminal

    def get_losses(self, sess, inputs, actions, targets, gather_list):
        return self.AC.get_losses(sess, inputs, actions, targets, gather_list)

main


In [5]:
import multiprocessing
import threading
from env.env_main import Account

NUMS_CPU = multiprocessing.cpu_count()
state_size = 58
batch_size = 50
action_size = 3
max_episodes = 1

In [6]:
GD = {}
class Worker(Framework):

    def __init__(self, name, access, batch_size, state_size, action_size):
        super().__init__(name, access, batch_size, state_size, action_size)

    def run(self, sess, max_episodes, t_max=8):
        episode_score_list = []
        episode = 0
        while episode < max_episodes:
            episode += 1
            episode_socre, _ = self.run_episode(sess, t_max)
            episode_score_list.append(episode_socre)
            GD[str(self.name)] = episode_score_list
            if self.name == 'W0':
                print('Episode: %f, score: %f' % (episode, episode_socre))
                print('\n')

In [7]:
with tf.Session() as sess:
    with tf.device("/cpu:0"):
        A = Access(batch_size, state_size, action_size)
        F_list = []
        for i in range(NUMS_CPU):
            F_list.append(Worker('W%i' % i, A, batch_size, state_size, action_size))
        COORD = tf.train.Coordinator()
        sess.run(tf.global_variables_initializer())
        sess.graph.finalize()

        threads_list = []
        for ac in F_list:
            job = lambda: ac.run(sess, max_episodes)
            t = threading.Thread(target=job)
            t.start()
            threads_list.append(t)
        COORD.join(threads_list)
        A.save(sess, 'model/saver_1.ckpt')


WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
E:\software\Anaconda363\lib\site-packages\tensorflow\python\ops\gradients_impl.py:96: UserWarning: Converting sparse IndexedSlices to a dense Tensor of unknown shape. This may consume a large amount of memory.
  "Converting sparse IndexedSlices to a dense Tensor of unknown shape. "
graph W0
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
graph W1
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
graph W2
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
graph W3
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
graph W4
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
graph W5
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
graph W6
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
graph W7
actor: -0.676055, actor_grad: 0.073945, policy mean: 0.333333, policy: -0.665087, entropy: -1.096820, critic: 0.243507, critic_grad: 0.849599, value: 0.243507, value_mean: 0.189615, advantage: -0.606338
Episode: 1.000000, score: 0.998724


test


In [12]:
tf.reset_default_graph()
import pandas as pd
import seaborn as sns
sns.set_style('whitegrid')
%matplotlib inline

In [13]:
state_size = 58
batch_size = 50
action_size = 3


config = tf.ConfigProto()
config.gpu_options.allow_growth = True
with tf.Session(config=config) as sess:
    with tf.device("/cpu:0"):
        A = Access(batch_size, state_size, action_size)
        W = Agent('W0', A, batch_size, state_size, action_size)
        A.restore(sess,'model/saver_1.ckpt')
        W.init_or_update_local(sess)
        env = Account()
        state = env.reset()
        for _ in range(200):
            action = W.get_deterministic_policy_action(sess, state)
            state, reward, done = env.step(action)


WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
WARNING:tensorflow:The `skip_connections` argument will be deprecated. Please use snt.SkipConnectionCore instead.
E:\software\Anaconda363\lib\site-packages\tensorflow\python\ops\gradients_impl.py:96: UserWarning: Converting sparse IndexedSlices to a dense Tensor of unknown shape. This may consume a large amount of memory.
  "Converting sparse IndexedSlices to a dense Tensor of unknown shape. "
graph W0
INFO:tensorflow:Restoring parameters from model/saver_1.ckpt

In [14]:
value, reward = env.plot_data()

pd.Series(value).plot(figsize=(16,6))


Out[14]:
<matplotlib.axes._subplots.AxesSubplot at 0x20cb120dc88>

In [15]:
pd.Series(reward).plot(figsize=(16,6))
pd.Series(np.zeros_like(reward)).plot(figsize=(16,6), color='r')


Out[15]:
<matplotlib.axes._subplots.AxesSubplot at 0x20cbf1ef128>

In [ ]: