In [19]:
#!/Tsan/bin/python
# -*- coding: utf-8 -*-
In [20]:
#"""此note book为股票回测框架的示例"""
In [21]:
# Libraries to use
from __future__ import division
import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
In [22]:
# Import My own library for factor testing
from SingleFactorTest import factorFilterFunctions as ff
from BackTestingEngine import backTestingEngine as bte
#from config import *
In [23]:
# Data location exposed by the factor-filter helper module; it is handed to
# the back-testing engine below (presumably the folder holding the .h5
# files -- confirm against the library).
path = ff.data_path
In [24]:
# Every date in this cell is written as a compact YYYYMMDD string.
_DATE_FMT = '%Y%m%d'

# Window that is passed to setBackTestingPeriod().
startTime = datetime.strptime('20160504', _DATE_FMT)
endTime = datetime.strptime('20170228', _DATE_FMT)

# Wider window that is passed to setInitialPeriod(): it opens two months
# before the back-test window and closes one month after it.
initstartTime = datetime.strptime('20160304', _DATE_FMT)
initendTime = datetime.strptime('20170328', _DATE_FMT)
In [25]:
# Names of the HDF5 inputs that are registered with the engine through
# addData(): adjusted open, adjusted close and traded volume (the exact
# contents are inferred from the file names -- confirm against the data).
filenameAdjOpen = 'OwnfactorAdjustedOpen.h5'
filenameAdjClose = 'OwnfactorAdjustedClose.h5'
filenameVolume = 'LZ_CN_STKA_QUOTE_TVOLUME.h5'
In [26]:
# Initialise the back-testing engine, pointing it at the data location.
test = bte.StkBacktesting(path)
In [27]:
# Set the starting capital to 50,000,000 (currency unit is not shown here).
test.setInitCap(50000000)
# Bare expression: the notebook displays the engine's available cash.
test.availableCashNow
Out[27]:
In [28]:
# Add the data: each file is registered under the key that later cells use
# to look it up (e.g. test.dataDict['adjClose']).
_dataSources = (
    ('adjOpen', filenameAdjOpen),
    ('adjClose', filenameAdjClose),
    ('volume', filenameVolume),
)
for _dataKey, _dataFile in _dataSources:
    test.addData(_dataKey, _dataFile)
In [29]:
# Restrict the engine to the back-test window and to the initialisation
# window defined earlier.
test.setBackTestingPeriod(startTime,endTime)
test.setInitialPeriod(initstartTime,initendTime)
In [30]:
# Dual moving-average screen for the stocks to buy and to sell.
_adjClose = test.dataDict['adjClose']
ma5 = _adjClose.rolling(window=5, min_periods=5).mean()
ma20 = _adjClose.rolling(window=20, min_periods=20).mean()

# Element-wise comparisons of the fast and the slow average.
_fastAbove = ma5 > ma20
_fastBelow = ma5 < ma20

# A cross is "on one side today, on the other side the day before".
_crossUp = _fastAbove & _fastBelow.shift(1)
_crossDown = _fastBelow & _fastAbove.shift(1)

# The extra shift(1) moves each signal to the following row, so a cross
# seen on one day is acted on the next; then keep only back-test dates.
buyStk = _crossUp.shift(1).loc[test.backTestingDateList.tolist()]
sellStk = _crossDown.shift(1).loc[test.backTestingDateList]
In [31]:
# Display the engine's full position record before the daily loop runs.
test.getAllPosition()
Out[31]:
In [32]:
# Loop over the back-test one day at a time: build the day's orders from
# the precomputed signals, store them on the engine and cross them.
for date in test.backTestingDateList:
    # Slice out the stocks flagged for selling / buying on this date.
    # The explicit ``== True`` is deliberate: the signal frames are built
    # with shift(), so a row may hold NaN next to booleans.
    sellslice = sellStk.loc[date]
    sellList = sellslice.loc[sellslice == True].index.tolist()
    buyslice = buyStk.loc[date]
    buyList = buyslice.loc[buyslice == True].index.tolist()

    # Buy orders.  The trailing makeOrder arguments are 100, 20 and 1;
    # presumably volume, price and direction (the sell order below passes
    # the held volume and -1 in the same slots) -- confirm against the
    # engine before changing them.
    buyOrders = [test.makeOrder(date, stkID, 100, 20, 1) for stkID in buyList]

    # Fetch the holdings once per day instead of once per sell candidate.
    currentPosition = test.getCurrentPosition()

    # Selling is only possible when something is held, and sell orders
    # always go in front of buy orders.
    if currentPosition:
        # Single-argument print call: same text as the old Python 2 print
        # statement, but also valid syntax on Python 3.
        print('%s Current position Num %s' % (date, len(currentPosition)))
        # Only stocks that are actually held can be sold.
        sellList = list(set(sellList) & set(currentPosition.keys()))
        sellOrders = [
            test.makeOrder(date, stkID, currentPosition[stkID].volume, 20, -1)
            for stkID in sellList
        ]
        totalOrders = sellOrders + buyOrders
    else:
        totalOrders = buyOrders

    test.allOrdersDict[date] = totalOrders  # save the day's orders on the engine
    test.crossOrder(date)  # match (cross) the orders for this date
In [33]:
#for i,j in test._allCurrentPositionDict.iteritems():
# print i,j.values
In [34]:
# Turn the engine's full position record into a table; orient='index' makes
# the outer keys of that mapping the rows (presumably one row per date --
# confirm against getAllPosition()).
positiondf = pd.DataFrame.from_dict(test.getAllPosition(), orient='index')
In [35]:
def converToVolume(holdingclass):
    """Return the ``volume`` attribute of a holding, or NaN if it has none.

    Applied element-wise to the position table, whose cells are presumably
    either position objects exposing ``.volume`` or placeholders (such as
    NaN) for stocks that are not held.

    Parameters
    ----------
    holdingclass : object
        Any cell value.

    Returns
    -------
    ``holdingclass.volume`` when the attribute exists, otherwise ``np.nan``.
    """
    # getattr with a default handles only the missing-attribute case, unlike
    # the former bare ``except:`` which also hid KeyboardInterrupt and real
    # errors.  ``np.nan`` replaces the ``np.NaN`` alias removed in NumPy 2.0.
    return getattr(holdingclass, 'volume', np.nan)
In [37]:
# Replace every cell by its held volume, count cells without a holding as 0,
# and display the last 10 rows.
positiondf.applymap(converToVolume).fillna(0).tail(10)
Out[37]:
In [38]:
# Ask the engine to present the back-test result (the output format is
# defined by the engine, not shown here).
test.showBackTestingResult()