Async. MLDG with two parallel tasks per worker


In [ ]:
import warnings
warnings.filterwarnings("ignore") # suppress h5py deprecation warning

import os
import backtrader as bt
import numpy as np

from btgym import BTgymEnv, BTgymRandomDataDomain

from btgym.research.gps.strategy import GuidedStrategy_0_0, ExpertObserver

from btgym.research.mldg.aac import AMLDG
from btgym.research.mldg.policy import AacStackedMetaPolicy

from btgym.algorithms.launcher.meta import MetaLauncher

In [ ]:
# Backtesting engine: strategy, expert observer, broker account and sizer.

engine = bt.Cerebro()

engine.addstrategy(
    GuidedStrategy_0_0,
    drawdown_call=10,  # max % to lose, in percent of initial cash
    target_call=10,  # max % to win, same units
    skip_frame=10,
    gamma=0.99,
    state_ext_scale=np.linspace(4e3, 1e3, num=6),
    reward_scale=7,
    # Expert settings; see btgym.research.gps.oracle.Oracle class for details:
    expert_config=dict(
        time_threshold=5,
        pips_threshold=10,
        pips_scale=1e-4,
        kernel_size=10,
        kernel_stddev=1,
    ),
)

# Attach the expert actions observer:
engine.addobserver(ExpertObserver)

# Leveraged account; the commission is there to imitate the spread:
engine.broker.setcash(2000)
engine.broker.setcommission(commission=1e-4, leverage=10.0)
engine.addsizer(bt.sizers.SizerFix, stake=5000)

# Data var. 1: up to seven years of 1 minute bars.
# Only 2014..2016 are enabled; prepend 2010, 2011, 2012, 2013 to the tuple
# to include the earlier yearly files as well.
data_m1_7_year = [
    '../examples/data/DAT_ASCII_EURUSD_M1_{}.csv'.format(year)
    for year in (2014, 2015, 2016)
]

# Data var. 2: up to six months of 1 minute bars.
# Only 201701..201704 are enabled; append '201705', '201706' to the tuple
# to include the remaining monthly files.
data_m1_6_month = [
    '../examples/data/DAT_ASCII_EURUSD_M1_{}.csv'.format(month)
    for month in ('201701', '201702', '201703', '201704')
]

# Data domain to sample from. Keep exactly one `filename=` line uncommented:
dataset = BTgymRandomDataDomain(
    #filename=data_m1_7_year,
    filename=data_m1_6_month,
    #filename='../examples/data/DAT_ASCII_EURUSD_M1_2015.csv',
    #filename='../examples/data/DAT_ASCII_EURUSD_M1_2016.csv',
    #filename='../examples/data/DAT_ASCII_EURUSD_M1_201703.csv',
    #filename='../examples/data/DAT_ASCII_EURUSD_M1_201704.csv',
    #filename='../examples/data/DAT_ASCII_EURUSD_M1_201703_1_10.csv',  # ten days
    #filename='../examples/data/test_sine_1min_period256_delta0002.csv',  # simple sine
    #filename='../examples/data/test_bent_sine_1min_period1500_300_delta0002.csv',  # increasing sine freq
    #filename='../examples/data/test_bent_sine_1min_period_300>1500_delta0002.csv',  # decreasing sine freq
    # Trial-level sampling parameters (all seven weekdays allowed as start):
    trial_params={
        'start_weekdays': set(range(7)),
        'sample_duration': dict(days=8, hours=0),
        'start_00': False,
        'time_gap': dict(days=4, hours=0),
        'test_period': dict(days=2, hours=0),
    },
    # Episode-level sampling parameters:
    episode_params={
        'start_weekdays': set(range(7)),
        'sample_duration': dict(days=1, hours=23, minutes=0),
        'start_00': False,
        'time_gap': dict(days=1, hours=0),
    },
    target_period=dict(days=12, hours=0),
)

# Environment spec: class reference plus its keyword arguments.
env_config = {
    'class_ref': BTgymEnv,
    'kwargs': {
        'dataset': dataset,
        'engine': engine,
        # Rendering options:
        'render_modes': ['episode', 'human', 'external', 'internal'],
        'render_state_as_image': True,
        'render_ylabel': 'OHL_diff. / Internals',
        'render_size_episode': (12, 8),
        'render_size_human': (9, 4),
        'render_size_state': (11, 3),
        'render_dpi': 75,
        # Ports and timeout:
        'port': 5000,
        'data_port': 4999,
        'connect_timeout': 90,
        'verbose': 0,
    },
}

# Cluster spec: address, worker / parameter-server counts and log location.
cluster_config = {
    'host': '127.0.0.1',
    'port': 12230,
    'num_workers': 4,  # set according to the CPUs available or so
    'num_ps': 1,
    'num_envs': 1,
    'log_dir': os.path.expanduser('~/tmp/mldg_3'),
}

# Policy spec: class reference plus its keyword arguments.
policy_config = {
    'class_ref': AacStackedMetaPolicy,
    'kwargs': dict(
        lstm_layers=(256, 256),
        # Four convolution layers; each entry presumably reads as
        # (filters, kernel, stride) -- TODO confirm against the policy class.
        conv_2d_layer_config=(
            (32, (3, 1), (2, 1)),
            (32, (3, 1), (2, 1)),
            (64, (3, 1), (2, 1)),
            (64, (3, 1), (2, 1)),
        ),
        encode_internal_state=False,
    ),
}

# Trainer spec: class reference plus its keyword arguments.
trainer_config = {
    'class_ref': AMLDG,
    'kwargs': {
        'opt_learn_rate': 1e-4,  # scalar or random log-uniform
        'opt_end_learn_rate': 1e-5,
        'opt_decay_steps': 20 * 10 ** 6,
        'model_gamma': 0.99,
        'model_gae_lambda': 1.0,
        'model_beta': 0.01,  # entropy reg, scalar or random log-uniform
        'guided_lambda': 1.0,  # imitation loss weight
        'guided_decay_steps': 10 * 10 ** 6,  # annealing guided_lambda to zero
        'fast_opt_learn_rate': 0.1,
        'rollout_length': 20,
        'time_flat': True,
        'trial_source_target_cycle': (5, 1),
        'model_summary_freq': 20,
        'episode_summary_freq': 2,
        'env_render_freq': 5,
    },
}

In [ ]:
# Assemble the launcher from the four config dicts, then start the run.
launcher = MetaLauncher(
    cluster_config=cluster_config,
    env_config=env_config,
    trainer_config=trainer_config,
    policy_config=policy_config,
    max_env_steps=100 * 10 ** 6,
    root_random_seed=0,
    # Ask to override previously saved model and logs:
    purge_previous=1,
    verbose=0,
)

launcher.run()

In [ ]: