In [69]:
from __future__ import print_function
from __future__ import division
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
sns.set_context(rc={'figure.figsize': (14, 7) } )
figzize_me = figsize =(14, 7)
import os
import sys
# Insert at position 0 so that only the GitHub checkout in the parent directory is used;
# this avoids mixing it with a pip-installed abupy and the version mismatches that causes.
sys.path.insert(0, os.path.abspath('../'))
import abupy
abu量化系统github地址 (您的star是我的动力!)
In [70]:
from abupy import AbuFactorAtrNStop, AbuFactorPreAtrNStop, AbuFactorCloseAtrNStop, AbuFactorBuyBreak
from abupy import abu, EMarketTargetType, AbuMetricsBase, ABuMarketDrawing, ABuProgress, ABuSymbolPd
from abupy import EMarketTargetType, EDataCacheType, EMarketSourceType, EMarketDataFetchMode, EStoreAbu, AbuUmpMainMul
from abupy import AbuUmpMainDeg, AbuUmpMainJump, AbuUmpMainPrice, AbuUmpMainWave, feature, AbuFeatureDegExtend
from abupy import AbuUmpEdgeDeg, AbuUmpEdgePrice, AbuUmpEdgeWave, AbuUmpEdgeFull, AbuUmpEdgeMul, AbuUmpEegeDegExtend
from abupy import AbuUmpMainDegExtend, ump, Parallel, delayed, AbuMulPidProgress, AbuProgress
# Turn off the sandbox (example) data.
# A stray line containing only "," used to sit above this comment; it is a SyntaxError
# in Python and has been removed.
abupy.env.disable_example_env_ipython()
In [13]:
# Select the CN market target and the force-local data fetch mode before loading results
abupy.env.g_market_target = EMarketTargetType.E_MARKET_TARGET_CN
abupy.env.g_data_fetch_mode = EMarketDataFetchMode.E_DATA_FETCH_FORCE_LOCAL

# Both stored result tuples are loaded with identical options; only the custom name differs
_load_options = dict(n_folds=5, store_type=EStoreAbu.E_STORE_CUSTOM_NAME)
abu_result_tuple_train = abu.load_abu_result_tuple(custom_name='train_cn', **_load_options)
abu_result_tuple_test = abu.load_abu_result_tuple(custom_name='test_cn', **_load_options)
ABuProgress.clear_output()

# Returns-only summary for the training set, then for the test set
print('训练集结果:')
metrics_train = AbuMetricsBase.show_general(
    *abu_result_tuple_train, only_show_returns=True)
print('测试集结果:')
metrics_test = AbuMetricsBase.show_general(
    *abu_result_tuple_test, only_show_returns=True)
In [16]:
# Orders table of the training-set result tuple; the judge-training cells below all read it
orders_pd_train = abu_result_tuple_train.orders_pd
In [ ]:
# Take the first 20 losing trades (profit_cg < 0) and draw a trade snapshot for each.
# This is only an example; in practice pick trades as needed, by rank or any other way.
_is_losing_trade = orders_pd_train.profit_cg < 0
plot_simple = orders_pd_train[_is_losing_trade].iloc[:20]
# save=True stores the images locally, under ~/abu/data/save_png/
ABuMarketDrawing.plot_candle_from_order(plot_simple, save=True)
In [37]:
from abupy import AbuUmpMainDeg
# The constructor argument is the training orders_pd
ump_deg = AbuUmpMainDeg(orders_pd_train)
# df is the DataFrame-like table generated earlier by ump_main_make_xy (table 11-1 in the book)
ump_deg.fiter.df.head()
Out[37]:
耗时操作,快的电脑大概几分钟,具体根据电脑性能,cpu数量,启动多进程进行训练:
In [19]:
# Fit the angle main judge; the return value is bound to `_` so the cell displays nothing
_ = ump_deg.fit(brust_min=False)
In [20]:
# Display the judge's cprs table (the cells below look rows up by label and read its lrs column)
ump_deg.cprs
Out[20]:
In [21]:
# Label of the cluster with the highest lrs value.
# idxmax() returns the index *label*; argmax() returns an integer position on current
# pandas, which is not a valid key for the label-based .loc lookup below.
max_failed_label = ump_deg.cprs.lrs.idxmax()
max_failed_cluster = ump_deg.cprs.loc[max_failed_label]
print('失败概率最大的分类簇{0}, 失败率为{1:.2f}%, 簇交易总数{2}, '
      '簇平均交易获利{3:.2f}%'.format(max_failed_label,
                                      max_failed_cluster.lrs * 100,
                                      max_failed_cluster.lcs,
                                      max_failed_cluster.lms * 100))
In [22]:
# The cluster label is split on '_' and its first part, as an int, is the key into rts.
# That needs the index label, so use idxmax(); argmax() yields a position on current
# pandas and has no .split().
cpt = int(ump_deg.cprs.lrs.idxmax().split('_')[0])
print(cpt)
ump_deg.show_parse_rt(ump_deg.rts[cpt])
In [23]:
# nts is looked up by cluster label, so take the label with idxmax()
# (argmax() returns an integer position on current pandas).
max_failed_cluster_orders = ump_deg.nts[ump_deg.cprs.lrs.idxmax()]
# Shown as table 11-3 in the book
max_failed_cluster_orders
Out[23]:
由于不是同一份沙盒数据,所以下面结果内容与书中分析内容不符,需要按照实际情况分析:
比如下面的特征即是42日和60日的deg格外大,21和252相对训练集平均值也很大:
In [25]:
from abupy import ml

ml.show_orders_hist(max_failed_cluster_orders, ['buy_deg_ang21', 'buy_deg_ang42', 'buy_deg_ang60','buy_deg_ang252'])
# (message template, feature column) pairs, printed in this order
for _template, _column in (
        ('分类簇中deg_ang60平均值为{0:.2f}', 'buy_deg_ang60'),
        ('分类簇中deg_ang21平均值为{0:.2f}', 'buy_deg_ang21'),
        ('分类簇中deg_ang42平均值为{0:.2f}', 'buy_deg_ang42'),
        ('分类簇中deg_ang252平均值为{0:.2f}', 'buy_deg_ang252')):
    print(_template.format(max_failed_cluster_orders[_column].mean()))
In [26]:
ml.show_orders_hist(orders_pd_train, ['buy_deg_ang21', 'buy_deg_ang42', 'buy_deg_ang60', 'buy_deg_ang252'])
# Same four angle features, averaged over the whole training set for comparison
for _template, _column in (
        ('训练数据集中deg_ang60平均值为{0:.2f}', 'buy_deg_ang60'),
        ('训练数据集中deg_ang21平均值为{0:.2f}', 'buy_deg_ang21'),
        ('训练数据集中deg_ang42平均值为{0:.2f}', 'buy_deg_ang42'),
        ('训练数据集中deg_ang252平均值为{0:.2f}', 'buy_deg_ang252')):
    print(_template.format(orders_pd_train[_column].mean()))
In [31]:
_snap_total = len(max_failed_cluster_orders)
progress = AbuProgress(_snap_total, 0, label='plot snap')
for ind in range(_snap_total):
    progress.show(ind)
    # The cluster row's `ind` value is a position into the judge's order_has_ret table
    order_ind = int(max_failed_cluster_orders.iloc[ind].ind)
    # Snapshot files are saved under ~/abu/data/save_png/
    ABuMarketDrawing.plot_candle_from_order(
        ump_deg.fiter.order_has_ret.iloc[order_ind], save=True)
交易快照文件保存在~/abu/data/save_png/中, 下面打开对应目录:save_png
In [32]:
if abupy.env.g_is_mac_os:
!open $abupy.env.g_project_data_dir
else:
!echo $abupy.env.g_project_data_dir
In [33]:
# Result of the judge's brust_min(); the next cell indexes it as [0], [1], [2] and uses
# those as thresholds for the lps, lms and lrs columns respectively
brust_min = ump_deg.brust_min()
brust_min
Out[33]:
In [34]:
# Keep the clusters whose lps and lms are at or below, and whose lrs is at or above,
# the corresponding brust_min thresholds
_cprs = ump_deg.cprs
_meets_thresholds = (
    (_cprs['lps'] <= brust_min[0])
    & (_cprs['lms'] <= brust_min[1])
    & (_cprs['lrs'] >= brust_min[2])
)
llps = _cprs[_meets_thresholds]
llps
Out[34]:
In [35]:
# Pass the filtered clusters (llps) to the judge's choose_cprs_component
ump_deg.choose_cprs_component(llps)
In [36]:
# Dump the classifier for the filtered clusters (llps)
# NOTE(review): presumably this is what the predict=True judges load later; confirm in abupy
ump_deg.dump_clf(llps)
In [ ]:
from abupy import AbuUmpMainJump
# Slow operation: takes roughly ten-plus minutes, depending on machine performance and CPU
ump_jump = AbuUmpMainJump.ump_main_clf_dump(orders_pd_train, save_order=False)
In [39]:
# Preview the first rows of the jump judge's feature table
ump_jump.fiter.df.head()
Out[39]:
下面这个拦截特征比较明显,两天前才发生向上跳空的交易:
In [40]:
# idxmax() gives the index label of the cluster with the highest lrs; argmax() returns an
# integer position on current pandas, which is not the key nts is looked up by.
max_failed_label = ump_jump.cprs.lrs.idxmax()
print('失败概率最大的分类簇{0}'.format(max_failed_label))
# Take the jump-feature cluster with the highest failure probability
max_failed_cluster_orders = ump_jump.nts[max_failed_label]
# Display that cluster (table 11-6 in the book)
max_failed_cluster_orders
Out[40]:
In [49]:
ml.show_orders_hist(max_failed_cluster_orders, feature_columns=['buy_diff_up_days', 'buy_jump_up_power',
                                                                'buy_diff_down_days', 'buy_jump_down_power'])
# (message template, orders table, power column, days column), printed in this order:
# cluster up/down first, then the whole training set up/down
for _template, _orders, _power_col, _days_col in (
        ('分类簇中jump_up_power平均值为{0:.2f}, 向上跳空平均天数{1:.2f}',
         max_failed_cluster_orders, 'buy_jump_up_power', 'buy_diff_up_days'),
        ('分类簇中jump_down_power平均值为{0:.2f}, 向下跳空平均天数{1:.2f}',
         max_failed_cluster_orders, 'buy_jump_down_power', 'buy_diff_down_days'),
        ('训练数据集中jump_up_power平均值为{0:.2f},向上跳空平均天数{1:.2f}',
         orders_pd_train, 'buy_jump_up_power', 'buy_diff_up_days'),
        ('训练数据集中jump_down_power平均值为{0:.2f}, 向下跳空平均天数{1:.2f}',
         orders_pd_train, 'buy_jump_down_power', 'buy_diff_down_days')):
    print(_template.format(_orders[_power_col].mean(), _orders[_days_col].mean()))
In [50]:
from abupy import AbuUmpMainPrice
# Build the price main judge from the training orders via ump_main_clf_dump
# (presumably fits and dumps the classifier in one step — confirm against abupy)
ump_price = AbuUmpMainPrice.ump_main_clf_dump(orders_pd_train, save_order=False)
In [51]:
# Preview the first rows of the price judge's feature table
ump_price.fiter.df.head()
Out[51]:
In [52]:
# idxmax() gives the index label of the cluster with the highest lrs; argmax() returns an
# integer position on current pandas, which is not the key nts is looked up by.
max_failed_label = ump_price.cprs.lrs.idxmax()
print('失败概率最大的分类簇{0}'.format(max_failed_label))
# Take the price-feature cluster with the highest failure probability
max_failed_cluster_orders = ump_price.nts[max_failed_label]
# Shown as table 11-8 in the book
max_failed_cluster_orders
Out[52]:
In [53]:
from abupy import AbuUmpMainWave
# Build the wave main judge from the training orders via ump_main_clf_dump
# (presumably fits and dumps the classifier in one step — confirm against abupy)
ump_wave = AbuUmpMainWave.ump_main_clf_dump(orders_pd_train, save_order=False)
In [54]:
# Preview the first rows of the wave judge's feature table
ump_wave.fiter.df.head()
Out[54]:
In [55]:
# idxmax() gives the index label of the cluster with the highest lrs; argmax() returns an
# integer position on current pandas, which is not the key nts is looked up by.
max_failed_label = ump_wave.cprs.lrs.idxmax()
print('失败概率最大的分类簇{0}'.format(max_failed_label))
# Take the wave-feature cluster with the highest failure probability
max_failed_cluster_orders = ump_wave.nts[max_failed_label]
# Shown as table 11-10 in the book
max_failed_cluster_orders
Out[55]:
In [60]:
# Wave-score features inside the highest-failure cluster
ml.show_orders_hist(max_failed_cluster_orders, feature_columns=['buy_wave_score1', 'buy_wave_score3'])
print('分类簇中wave_score1平均值为{0:.2f}'.format(
    max_failed_cluster_orders.buy_wave_score1.mean()))
print('分类簇中wave_score3平均值为{0:.2f}'.format(
    max_failed_cluster_orders.buy_wave_score3.mean()))

# Same two features over the whole training set, for comparison.
# Fixed: the histogram listed buy_wave_score1 twice, and the line labelled wave_score3
# printed the buy_wave_score1 mean.
ml.show_orders_hist(orders_pd_train, feature_columns=['buy_wave_score1', 'buy_wave_score3'])
print('训练数据集中wave_score1平均值为{0:.2f}'.format(
    orders_pd_train.buy_wave_score1.mean()))
print('训练数据集中wave_score3平均值为{0:.2f}'.format(
    orders_pd_train.buy_wave_score3.mean()))
In [61]:
# Keep only the test-set orders that have a trade result (result != 0).
# An explicit copy is taken because judge-decision columns are assigned onto this frame
# further down; assigning onto a filtered selection of orders_pd triggers pandas'
# chained-assignment warning.
order_has_result = abu_result_tuple_test.orders_pd[abu_result_tuple_test.orders_pd.result != 0].copy()
In [62]:
# Call best_hit_cnt_info on the wave judge, passing the judge's own llps attribute
ump_wave.best_hit_cnt_info(ump_wave.llps)
In [63]:
from abupy import AbuUmpMainDeg, AbuUmpMainJump, AbuUmpMainPrice, AbuUmpMainWave
# Rebind the four judge names to instances constructed with predict=True; from here on the
# trained objects built above are no longer reachable through these names.
# NOTE(review): presumably predict=True loads the classifiers dumped during training — confirm
ump_deg = AbuUmpMainDeg(predict=True)
ump_jump = AbuUmpMainJump(predict=True)
ump_price = AbuUmpMainPrice(predict=True)
ump_wave = AbuUmpMainWave(predict=True)
In [65]:
def apply_ml_features_ump(order, predicter, progress, need_hit_cnt):
    """Ask one main judge for its interception decision on a single order.

    :param order: one order row; its ``ml_features`` is a dict, or the string form of one
    :param predicter: judge object exposing ``predict_kwargs``
    :param progress: progress reporter; ``show()`` is called once per order
    :param need_hit_cnt: forwarded unchanged to ``predict_kwargs``
    :return: whatever ``predicter.predict_kwargs`` returns
    """
    features = order.ml_features
    if not isinstance(features, dict):
        # With older pandas versions the stored dict comes back as a str, so parse it
        import ast
        features = ast.literal_eval(features)
    progress.show()
    # Hand the buy-time features to the judge and let it decide whether to intercept
    return predicter.predict_kwargs(need_hit_cnt=need_hit_cnt, **features)
def pararllel_func(ump, ump_name):
    """Apply one judge to every row of the module-level ``order_has_result``.

    :param ump: judge object handed to ``apply_ml_features_ump`` as the predicter
    :param ump_name: name used in the progress label and returned with the result
    :return: tuple ``(ump_name, ump_result)`` where ``ump_result`` is the row-wise apply output
    """
    progress_label = '{} complete'.format(ump_name)
    order_total = len(order_has_result)
    # Multi-process progress bar wrapped around the row-wise apply; need_hit_cnt is fixed at 2
    with AbuMulPidProgress(order_total, progress_label) as progress:
        ump_result = order_has_result.apply(
            apply_ml_features_ump, axis=1, args=(ump, progress, 2,))
    return ump_name, ump_result
# Run the four main judges in parallel, one job per judge
parallel = Parallel(n_jobs=4, verbose=0, pre_dispatch='2*n_jobs')
out = parallel(
    delayed(pararllel_func)(judge, judge_name)
    for judge, judge_name in zip(
        [ump_deg, ump_jump, ump_price, ump_wave],
        ['ump_deg', 'ump_jump', 'ump_price', 'ump_wave']))

# Collect each judge's decisions into a column named after the judge
for judge_name, judge_result in out:
    order_has_result[judge_name] = judge_result
In [66]:
# Select the judge decision columns. The pattern is '^ump_' (literal underscore);
# the previous '^ump_*' made the underscore optional and matched any column starting
# with "ump". An explicit copy is taken because new columns are added below.
block_pd = order_has_result.filter(regex='^ump_').copy()
# Sum the decisions of all main judges (computed before `result` is added, so only
# the judge columns are summed)
block_pd['sum_bk'] = block_pd.sum(axis=1)
block_pd['result'] = order_has_result['result']
# Any order with at least one vote of 1 is intercepted
block_pd = block_pd[block_pd.sum_bk > 0]
print('四个裁判整体拦截正确率{:.2f}%'.format(block_pd[block_pd.result == -1].result.count() / block_pd.result.count() * 100))
block_pd.tail()
Out[66]:
In [68]:
def sub_ump_show(block_name):
    """Return (interception accuracy in percent, number of intercepted orders) for one judge.

    NOTE(review): no definition of sub_ump_show is visible in this notebook (execution
    count 67 is missing), so the cell failed on a fresh kernel. This version uses the same
    rule as the overall rate computed from block_pd: an interception counts as correct
    when the intercepted order's result is -1.

    :param block_name: judge column in the module-level ``block_pd``, e.g. 'ump_deg'
    :return: tuple ``(accuracy_percent, intercepted_count)``; ``(0.0, 0)`` when the judge
             intercepted nothing
    """
    intercepted = block_pd[block_pd[block_name] == 1]
    intercepted_count = intercepted.result.count()
    if intercepted_count == 0:
        # Nothing intercepted: avoid a 0/0 division
        return 0.0, 0
    correct_count = (intercepted.result == -1).sum()
    return float(correct_count) / intercepted_count * 100, intercepted_count


# Labels now match the judge actually queried: ump_jump is the jump judge and ump_wave the
# wave judge (they were labelled as the angle-extension and mixed judges before).
print('角度裁判拦截正确率{:.2f}%, 拦截交易数量{}'.format(*sub_ump_show('ump_deg')))
print('跳空裁判拦截正确率{:.2f}%, 拦截交易数量{}'.format(*sub_ump_show('ump_jump')))
print('波动裁判拦截正确率{:.2f}%, 拦截交易数量{}'.format(*sub_ump_show('ump_wave')))
print('价格裁判拦截正确率{:.2f}%, 拦截交易数量{}'.format(*sub_ump_show('ump_price')))
In [ ]: