作者: 阿布
阿布量化版权所有 未经允许 禁止转载
abu量化系统github地址 (欢迎+star)
上一节通过切割A股市场训练集测试集symbol,分别对切割的训练集和测试集做了回测,本节将示例A股ump主裁,边裁决策。
首先导入abupy中本节使用的模块:
In [1]:
# 基础库导入
from __future__ import print_function
from __future__ import division
import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import ipywidgets
%matplotlib inline
import os
import sys
# 使用insert 0即只使用github,避免交叉使用了pip安装的abupy,导致的版本不一致问题
sys.path.insert(0, os.path.abspath('../'))
import abupy
# 使用沙盒数据,目的是和书中一样的数据环境
abupy.env.enable_example_env_ipython()
In [2]:
from abupy import AbuFactorAtrNStop, AbuFactorPreAtrNStop, AbuFactorCloseAtrNStop, AbuFactorBuyBreak
from abupy import abu, EMarketTargetType, AbuMetricsBase, ABuMarketDrawing, ABuProgress, ABuSymbolPd
from abupy import EMarketTargetType, EDataCacheType, EMarketSourceType, EMarketDataFetchMode, EStoreAbu, AbuUmpMainMul
from abupy import AbuUmpMainDeg, AbuUmpMainJump, AbuUmpMainPrice, AbuUmpMainWave, feature, AbuFeatureDegExtend
from abupy import AbuUmpEdgeDeg, AbuUmpEdgePrice, AbuUmpEdgeWave, AbuUmpEdgeFull, AbuUmpEdgeMul, AbuUmpEegeDegExtend
from abupy import AbuUmpMainDegExtend, ump, Parallel, delayed, AbuMulPidProgress
# 关闭沙盒数据
abupy.env.disable_example_env_ipython()
下面读取上一节存储的训练集和测试集回测数据,如下所示:
In [3]:
abupy.env.g_market_target = EMarketTargetType.E_MARKET_TARGET_CN
abupy.env.g_data_fetch_mode = EMarketDataFetchMode.E_DATA_FETCH_FORCE_LOCAL
abu_result_tuple = abu.load_abu_result_tuple(n_folds=5, store_type=EStoreAbu.E_STORE_CUSTOM_NAME,
custom_name='train_cn')
abu_result_tuple_test = abu.load_abu_result_tuple(n_folds=5, store_type=EStoreAbu.E_STORE_CUSTOM_NAME,
custom_name='test_cn')
ABuProgress.clear_output()
print('训练集结果:')
metrics_train = AbuMetricsBase.show_general(*abu_result_tuple, returns_cmp=True ,only_info=True)
print('测试集结果:')
metrics_test = AbuMetricsBase.show_general(*abu_result_tuple_test, returns_cmp=True, only_info=True)
In [4]:
# 需要全局设置为A股市场,在ump会根据市场类型保存读取对应的ump
abupy.env.g_market_target = EMarketTargetType.E_MARKET_TARGET_CN
ump_deg=None
ump_mul=None
ump_price=None
ump_main_deg_extend=None
# 使用训练集交易数据训练主裁
orders_pd_train_cn = abu_result_tuple.orders_pd
def train_main_ump():
print('AbuUmpMainDeg begin...')
AbuUmpMainDeg.ump_main_clf_dump(orders_pd_train_cn, save_order=False, show_order=False)
print('AbuUmpMainPrice begin...')
AbuUmpMainPrice.ump_main_clf_dump(orders_pd_train_cn, save_order=False, show_order=False)
print('AbuUmpMainMul begin...')
AbuUmpMainMul.ump_main_clf_dump(orders_pd_train_cn, save_order=False, show_order=False)
print('AbuUmpMainDegExtend begin...')
AbuUmpMainDegExtend.ump_main_clf_dump(orders_pd_train_cn, save_order=False, show_order=False)
# 依然使用load_main_ump,避免下面多进程内存拷贝过大
load_main_ump()
def load_main_ump():
global ump_deg, ump_mul, ump_price, ump_main_deg_extend
ump_deg = AbuUmpMainDeg(predict=True)
ump_mul = AbuUmpMainMul(predict=True)
ump_price = AbuUmpMainPrice(predict=True)
ump_main_deg_extend = AbuUmpMainDegExtend(predict=True)
print('load main ump complete!')
def select(select):
if select == 'train main ump':
train_main_ump()
else:
load_main_ump()
_ = ipywidgets.interact_manual(select, select=['train main ump', 'load main ump'])
下面首先通过从测试集交易中筛选出来已经有交易结果的交易,如下:
In [5]:
# 选取有交易结果的数据order_has_result
order_has_result = abu_result_tuple_test.orders_pd[abu_result_tuple_test.orders_pd.result != 0]
order_has_result的交易单中记录了所买入时刻的交易特征,如下所示:
In [6]:
order_has_result.filter(regex='^buy(_deg_|_price_|_wave_|_jump)').head()
Out[6]:
可以通过一个一个迭代交易单,将交易单中的买入时刻特征传递给ump主裁决策器,让每一个主裁来决策是否进行拦截,这样可以统计每一个主裁的拦截成功率,以及整体拦截率等,如下所示:
备注:
In [18]:
def apply_ml_features_ump(order, predicter, progress, need_hit_cnt):
if not isinstance(order.ml_features, dict):
import ast
# 低版本pandas dict对象取出来会成为str
ml_features = ast.literal_eval(order.ml_features)
else:
ml_features = order.ml_features
progress.show()
# 将交易单中的买入时刻特征传递给ump主裁决策器,让每一个主裁来决策是否进行拦截
return predicter.predict_kwargs(need_hit_cnt=need_hit_cnt, **ml_features)
def pararllel_func(ump_object, ump_name):
with AbuMulPidProgress(len(order_has_result), '{} complete'.format(ump_name)) as progress:
# 启动多进程进度条,对order_has_result进行apply
ump_result = order_has_result.apply(apply_ml_features_ump, axis=1, args=(ump_object, progress, 2,))
return ump_name, ump_result
if sys.version_info > (3, 4, 0):
# python3.4以上并行处理4个主裁,每一个主裁启动一个进程进行拦截决策
parallel = Parallel(
n_jobs=4, verbose=0, pre_dispatch='2*n_jobs')
out = parallel(delayed(pararllel_func)(ump_object, ump_name)
for ump_object, ump_name in zip([ump_deg, ump_mul, ump_price, ump_main_deg_extend],
['ump_deg', 'ump_mul', 'ump_price', 'ump_main_deg_extend']))
else:
# 3.4下由于子进程中pickle ump的内部类会找不到,所以暂时只使用一个进程一个一个的处理
out = [pararllel_func(ump_object, ump_name) for ump_object, ump_name in zip([ump_deg, ump_mul, ump_price, ump_main_deg_extend],
['ump_deg', 'ump_mul', 'ump_price', 'ump_main_deg_extend'])]
# 将每一个进程中的裁判的拦截决策进行汇总
for sub_out in out:
order_has_result[sub_out[0]] = sub_out[1]
通过把所有主裁的决策进行相加, 如果有投票1的即会进行拦截,四个裁判整体拦截正确率统计:
In [19]:
block_pd = order_has_result.filter(regex='^ump_*')
# 把所有主裁的决策进行相加
block_pd['sum_bk'] = block_pd.sum(axis=1)
block_pd['result'] = order_has_result['result']
# 有投票1的即会进行拦截
block_pd = block_pd[block_pd.sum_bk > 0]
print('四个裁判整体拦截正确率{:.2f}%'.format(block_pd[block_pd.result == -1].result.count() / block_pd.result.count() * 100))
block_pd.tail()
Out[19]:
下面统计每一个主裁的拦截正确率:
In [20]:
from sklearn import metrics
def sub_ump_show(block_name):
sub_block_pd = block_pd[(block_pd[block_name] == 1)]
# 如果失败就正确 -1->1 1->0
sub_block_pd.result = np.where(sub_block_pd.result == -1, 1, 0)
return metrics.accuracy_score(sub_block_pd[block_name], sub_block_pd.result) * 100, sub_block_pd.result.count()
print('角度裁判拦截正确率{:.2f}%, 拦截交易数量{}'.format(*sub_ump_show('ump_deg')))
print('角度扩展裁判拦拦截正确率{:.2f}%, 拦截交易数量{}'.format(*sub_ump_show('ump_main_deg_extend')))
print('单混裁判拦截正确率{:.2f}%, 拦截交易数量{}'.format(*sub_ump_show('ump_mul')))
print('价格裁判拦截正确率{:.2f}%, 拦截交易数量{}'.format(*sub_ump_show('ump_price')))
In [15]:
# 需要全局设置为A股市场,在ump会根据市场类型保存读取对应的ump
abupy.env.g_market_target = EMarketTargetType.E_MARKET_TARGET_CN
print('AbuUmpEdgeDeg begin...')
AbuUmpEdgeDeg.ump_edge_clf_dump(orders_pd_train_cn)
edge_deg = AbuUmpEdgeDeg(predict=True)
print('AbuUmpEdgePrice begin...')
AbuUmpEdgePrice.ump_edge_clf_dump(orders_pd_train_cn)
edge_price = AbuUmpEdgePrice(predict=True)
print('AbuUmpEdgeMul begin...')
AbuUmpEdgeMul.ump_edge_clf_dump(orders_pd_train_cn)
edge_mul = AbuUmpEdgeMul(predict=True)
print('AbuUmpEegeDegExtend begin...')
AbuUmpEegeDegExtend.ump_edge_clf_dump(orders_pd_train_cn)
edge_deg_extend = AbuUmpEegeDegExtend(predict=True)
print('fit edge complete!')
使用与主裁类似的方式,一个一个迭代交易单,将交易单中的买入时刻特征传递给ump边裁决策器,让每一个边裁来决策是否进行拦截,统计每一个边裁的拦截成功率,以及整体拦截率等,如下所示:
备注:如下的代码使用abupy中再次封装joblib的多进程调度使用示例,以及abupy中封装的多进程进度条的使用示例
In [17]:
def apply_ml_features_edge(order, predicter, progress):
if not isinstance(order.ml_features, dict):
import ast
# 低版本pandas dict对象取出来会成为str
ml_features = ast.literal_eval(order.ml_features)
else:
ml_features = order.ml_features
# 边裁进行裁决
progress.show()
# 将交易单中的买入时刻特征传递给ump边裁决策器,让每一个边裁来决策是否进行拦截
edge = predicter.predict(**ml_features)
return edge.value
def edge_pararllel_func(edge, edge_name):
with AbuMulPidProgress(len(order_has_result), '{} complete'.format(edge_name)) as progress:
# # 启动多进程进度条,对order_has_result进行apply
edge_result = order_has_result.apply(apply_ml_features_edge, axis=1, args=(edge, progress,))
return edge_name, edge_result
if sys.version_info > (3, 4, 0):
# python3.4以上并行处理4个边裁的决策,每一个边裁启动一个进程进行拦截决策
parallel = Parallel(
n_jobs=4, verbose=0, pre_dispatch='2*n_jobs')
out = parallel(delayed(edge_pararllel_func)(edge, edge_name)
for edge, edge_name in zip([edge_deg, edge_price, edge_mul, edge_deg_extend],
['edge_deg', 'edge_price', 'edge_mul', 'edge_deg_extend']))
else:
# 3.4下由于子进程中pickle ump的内部类会找不到,所以暂时只使用一个进程一个一个的处理
out = [edge_pararllel_func(edge, edge_name) for edge, edge_name in zip([edge_deg, edge_price, edge_mul, edge_deg_extend],
['edge_deg', 'edge_price', 'edge_mul', 'edge_deg_extend'])]
# 将每一个进程中的裁判的拦截决策进行汇总
for sub_out in out:
order_has_result[sub_out[0]] = sub_out[1]
通过把所有边裁的决策进行统计, 如果有投票-1的结果即判定loss_top的拿出来和真实交易结果result组成结果集,统计四个边裁的整体拦截正确率以及拦截率,如下所示:
In [18]:
block_pd = order_has_result.filter(regex='^edge_*')
"""
由于predict返回的结果中1代表win top
但是我们只需要知道loss_top,所以只保留-1, 其他1转换为0。
"""
block_pd['edge_block'] = \
np.where(np.min(block_pd, axis=1) == -1, -1, 0)
# 拿出真实的交易结果
block_pd['result'] = order_has_result['result']
# 拿出-1的结果,即判定loss_top的
block_pd = block_pd[block_pd.edge_block == -1]
print('四个裁判整体拦截正确率{:.2f}%'.format(block_pd[block_pd.result == -1].result.count() /
block_pd.result.count() * 100))
print('四个边裁拦截交易总数{}, 拦截率{:.2f}%'.format(
block_pd.shape[0],
block_pd.shape[0] / order_has_result.shape[0] * 100))
block_pd.head()
Out[18]:
下面再统计每一个 边裁的拦截正确率:
In [19]:
from sklearn import metrics
def sub_edge_show(edge_name):
sub_edge_block_pd = order_has_result[(order_has_result[edge_name] == -1)]
return metrics.accuracy_score(sub_edge_block_pd[edge_name], sub_edge_block_pd.result) * 100, sub_edge_block_pd.shape[0]
print('角度边裁拦截正确率{0:.2f}%, 拦截交易数量{1:}'.format(*sub_edge_show('edge_deg')))
print('单混边裁拦截正确率{0:.2f}%, 拦截交易数量{1:}'.format(*sub_edge_show('edge_mul')))
print('价格边裁拦截正确率{0:.2f}%, 拦截交易数量{1:}'.format(*sub_edge_show('edge_price')))
print('角度扩展边裁拦截正确率{0:.2f}%, 拦截交易数量{1:}'.format(*sub_edge_show('edge_deg_extend')))
In [22]:
# 开启内置主裁
abupy.env.g_enable_ump_main_deg_block = True
abupy.env.g_enable_ump_main_price_block = True
# 开启内置边裁
abupy.env.g_enable_ump_edge_deg_block = True
abupy.env.g_enable_ump_edge_price_block = True
# 回测时需要开启特征生成,因为裁判开启需要生成特征做为输入
abupy.env.g_enable_ml_feature = True
# 回测时使用上一次切割好的测试集数据
abupy.env.g_enable_last_split_test = True
abupy.beta.atr.g_atr_pos_base = 0.05
用户自定义裁判的开启在‘第18节 自定义裁判决策交易‘ 也示例过,通过ump.manager.append_user_ump即可
注意下面还需要把10,30,50,90,120日走势拟合角度特征的AbuFeatureDegExtend,做为回测时的新的视角来录制比赛(记录回测特征),因为裁判里面有AbuUmpEegeDegExtend和AbuUmpMainDegExtend,它们需要生成带有10,30,50,90,120日走势拟合角度特征的回测交易单
代码如下所示:
In [20]:
feature.clear_user_feature()
# 10,30,50,90,120日走势拟合角度特征的AbuFeatureDegExtend,做为回测时的新的视角来录制比赛
feature.append_user_feature(AbuFeatureDegExtend)
# 打开使用用户自定义裁判开关
ump.manager.g_enable_user_ump = True
# 先clear一下
ump.manager.clear_user_ump()
# 把新的裁判AbuUmpMainDegExtend类名称使用append_user_ump添加到系统中
ump.manager.append_user_ump(AbuUmpEegeDegExtend)
# 把新的裁判AbuUmpMainDegExtend类名称使用append_user_ump添加到系统中
ump.manager.append_user_ump(AbuUmpMainDegExtend)
买入因子,卖出因子等依然使用相同的设置,如下所示:
In [25]:
# 初始化资金500万
read_cash = 5000000
# 买入因子依然延用向上突破因子
buy_factors = [{'xd': 60, 'class': AbuFactorBuyBreak},
{'xd': 42, 'class': AbuFactorBuyBreak}]
# 卖出因子继续使用上一节使用的因子
sell_factors = [
{'stop_loss_n': 1.0, 'stop_win_n': 3.0,
'class': AbuFactorAtrNStop},
{'class': AbuFactorPreAtrNStop, 'pre_atr_n': 1.5},
{'class': AbuFactorCloseAtrNStop, 'close_atr_n': 1.5}
]
abupy.env.g_market_target = EMarketTargetType.E_MARKET_TARGET_CN
abupy.env.g_data_fetch_mode = EMarketDataFetchMode.E_DATA_FETCH_FORCE_LOCAL
完成裁判组合的开启,即可开始回测,回测操作流程和之前的操作一样:
下面开始回测,第一次运行select:run loop back ump,然后点击run select_ump,如果已经回测过可select:load test ump data直接从缓存数据读取:
In [21]:
abupy.env.g_market_target = EMarketTargetType.E_MARKET_TARGET_CN
abupy.env.g_data_fetch_mode = EMarketDataFetchMode.E_DATA_FETCH_FORCE_LOCAL
abu_result_tuple_test_ump = None
def run_loop_back_ump():
global abu_result_tuple_test_ump
abu_result_tuple_test_ump, _ = abu.run_loop_back(read_cash,
buy_factors,
sell_factors,
choice_symbols=None,
start='2012-08-08', end='2017-08-08')
# 把运行的结果保存在本地,以便之后分析回测使用,保存回测结果数据代码如下所示
abu.store_abu_result_tuple(abu_result_tuple_test_ump, n_folds=5, store_type=EStoreAbu.E_STORE_CUSTOM_NAME,
custom_name='test_ump_cn')
ABuProgress.clear_output()
def run_load_ump():
global abu_result_tuple_test_ump
abu_result_tuple_test_ump = abu.load_abu_result_tuple(n_folds=5, store_type=EStoreAbu.E_STORE_CUSTOM_NAME,
custom_name='test_ump_cn')
def select_ump(select):
if select == 'run loop back ump':
run_loop_back_ump()
else:
run_load_ump()
_ = ipywidgets.interact_manual(select_ump, select=['run loop back ump', 'load test ump data'])
下面对比针对A股市场测试集交易开启主裁,边裁拦截和未开启主裁,边裁,结果可以看出拦截了接近一半的交易,胜率以及盈亏比都有大幅度提高:
In [23]:
AbuMetricsBase.show_general(*abu_result_tuple_test_ump, returns_cmp=True, only_info=True)
Out[23]:
In [24]:
AbuMetricsBase.show_general(*abu_result_tuple_test, returns_cmp=True, only_info=True)
Out[24]:
abu量化系统文档教程持续更新中,请关注公众号中的更新提醒。
更多关于量化交易相关请阅读《量化交易之路》
更多关于量化交易与机器学习相关请阅读《机器学习之路》
更多关于abu量化系统请关注微信公众号: abu_quant