In [450]:
#!/Tsan/bin/python
# -*- coding: utf-8 -*-
In [451]:
# Libraries to use
from __future__ import division
import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
from sklearn.cluster import KMeans
import talib as tb
from scipy import stats
import copy
In [452]:
%matplotlib inline
%load_ext line_profiler
In [453]:
path = 'C:/Users/LZJF_02/Desktop/BacktestRA/'
In [454]:
filenameKKTest = 'KkRatioStrategy20170601-20171201.csv'
filenameKKTrain = 'KkRatioStrategy20160601-20170601.csv'
In [455]:
# multi
filenameMultiCycleTest = 'MultiCycleStrategy20170601-20171201.csv'
filenameMultiCycleTrain = 'MultiCycleStrategy20150601-20170601.csv'
In [456]:
def calPosPeriod(df, barBin=5):
    # Parse entry/exit timestamps and compute the holding period in bars of `barBin` minutes
    df.entryDt = df.entryDt.apply(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'))
    df.exitDt = df.exitDt.apply(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'))
    df['posPeriod'] = (df.exitDt - df.entryDt)
    df['posPeriod'] = df['posPeriod'].apply(lambda x: x.total_seconds() / 60 / barBin)
    return df
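In [ ]:
# Illustrative sketch (my addition, not from the original run): sanity-check calPosPeriod's bar
# arithmetic on a tiny hypothetical DataFrame -- a 25-minute holding with barBin=5 should give 5 bars.
demoTrades = pd.DataFrame({'entryDt': ['2017-06-01 09:30:00'],
                           'exitDt': ['2017-06-01 09:55:00'],
                           'pnl': [12.0], 'volume': [1]})
print calPosPeriod(demoTrades).posPeriod.values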
In [457]:
def varDiff(df1, df2, iterNum=10000):
    """ Test whether there is a significant difference between two population variances with unequal sample sizes.
    Parameters:
        df1: DataFrame. The output of the calPosPeriod function.
        df2: DataFrame. The output of the calPosPeriod function.
        iterNum: Int. The number of resampling iterations.
    Return: Series. The Series of p-values.
    """
    shape1 = df1.shape[0]
    shape2 = df2.shape[0]
    pValueList = []
    if shape1 > shape2:
        for i in xrange(iterNum):
            pValueList.append(stats.ttest_rel(np.random.choice(df1.pnl.values, shape2),
                                              df2.pnl.values).pvalue)
    else:
        for i in xrange(iterNum):
            pValueList.append(stats.ttest_rel(np.random.choice(df2.pnl.values, shape1),
                                              df1.pnl.values).pvalue)
    return pd.Series(pValueList)
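In [ ]:
# Illustrative sketch (my addition): a quick sanity check of varDiff on synthetic data -- when both
# pnl columns come from the same distribution, the resampled p-values should be roughly uniform,
# so only about 5% of them should fall below 0.05. dfA/dfB are hypothetical stand-ins.
np.random.seed(0)
dfA = pd.DataFrame({'pnl': np.random.normal(0, 100, 300)})
dfB = pd.DataFrame({'pnl': np.random.normal(0, 100, 250)})
pNull = varDiff(dfA, dfB, iterNum=2000)
print 'Fraction of p-values below 0.05:', (pNull <= 0.05).mean()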
In [458]:
kkTest = pd.read_csv(path+filenameKKTest,infer_datetime_format=True,parse_dates=[0])
kkTrain = pd.read_csv(path+filenameKKTrain,infer_datetime_format=True,parse_dates=[0])
kkTest = calPosPeriod(kkTest)
kkTrain = calPosPeriod(kkTrain)
In [459]:
MultiCycleTest = pd.read_csv(path+filenameMultiCycleTest,infer_datetime_format=True,parse_dates=[0])
MultiCycleTrain = pd.read_csv(path+filenameMultiCycleTrain,infer_datetime_format=True,parse_dates=[0])
MultiCycleTest = calPosPeriod(MultiCycleTest)
MultiCycleTrain = calPosPeriod(MultiCycleTrain)
In [460]:
P_MultiCycle = varDiff(MultiCycleTest,MultiCycleTrain)
In [461]:
print 'The 5% quantile is' + " " + str(P_MultiCycle.quantile(0.05))
print 'The fraction of p-values below 0.05 is' + " " + str(P_MultiCycle[P_MultiCycle <= 0.05].shape[0] / P_MultiCycle.shape[0])
P_MultiCycle.describe()
Out[461]:
In [462]:
MultiCycleTest[MultiCycleTest.volume==-1].describe()
Out[462]:
In [463]:
MultiCycleTest[MultiCycleTest.volume==1].describe()
Out[463]:
In [464]:
MultiCycleTrain[MultiCycleTrain.volume==-1].describe()
Out[464]:
In [465]:
MultiCycleTest.pnl.std()**2 / MultiCycleTrain.pnl.std()**2
Out[465]:
In [466]:
# The Levene test tests the null hypothesis that all input samples are from populations with equal variances.
# Levene's test is an alternative to Bartlett's test in cases where there are significant deviations from normality.
# A p-value greater than 0.05 means we accept the null hypothesis, i.e. the PnL variance is stable across the two samples.
stats.levene(MultiCycleTest.pnl.values, MultiCycleTrain.pnl.values)
Out[466]:
In [467]:
stats.levene(MultiCycleTest[MultiCycleTest.volume ==-1].pnl.values, MultiCycleTrain[MultiCycleTrain.volume==-1].pnl.values)
Out[467]:
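In [ ]:
# Illustrative helper (my addition; equalVarAt5pct is a hypothetical name): wrap scipy's Levene test
# so the train/test variance comparison above can be repeated in one line.
def equalVarAt5pct(x, y):
    stat, p = stats.levene(x, y)
    # p > 0.05: cannot reject the equal-variance null hypothesis at the 5% level
    return p, p > 0.05
print equalVarAt5pct(MultiCycleTest.pnl.values, MultiCycleTrain.pnl.values)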
In [468]:
# F-test to check whether the two sample variances are equal. (Note that the F-test is extremely sensitive to
# non-normality of X and Y, so you're probably better off with a more robust test such as Levene's or Bartlett's
# unless you're reasonably sure that X and Y are normally distributed.)
stats.f.cdf(MultiCycleTest.pnl.std()**2 / MultiCycleTrain.pnl.std()**2, MultiCycleTest.shape[0]-1,MultiCycleTrain.shape[0]-1)
Out[468]:
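In [ ]:
# Illustrative sketch (my addition): stats.f.cdf above gives only one tail of the F distribution;
# a two-sided p-value for the variance ratio can be formed from twice the smaller tail.
ratio = MultiCycleTest.pnl.std() ** 2 / MultiCycleTrain.pnl.std() ** 2
dfNum = MultiCycleTest.shape[0] - 1
dfDen = MultiCycleTrain.shape[0] - 1
tail = stats.f.cdf(ratio, dfNum, dfDen)
print 'Two-sided F-test p-value:', 2 * min(tail, 1 - tail)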
In [469]:
# Statistical analysis of long-side PnL
MultiCycleTestLong = MultiCycleTest[MultiCycleTest.volume ==1]
MultiCycleTestLong.describe()
Out[469]:
In [470]:
MultiCycleTestLong[MultiCycleTestLong.pnl > 0].describe()
Out[470]:
In [471]:
MultiCycleTestLong[MultiCycleTestLong.pnl < 0].describe()
Out[471]:
In [472]:
# Statistical analysis of short-side PnL
MultiCycleTestShort= MultiCycleTest[MultiCycleTest.volume ==-1]
MultiCycleTestShort.describe()
Out[472]:
In [473]:
MultiCycleTestShort[MultiCycleTestShort.pnl>0].describe()
Out[473]:
In [474]:
MultiCycleTestShort[MultiCycleTestShort.pnl<0].describe()
Out[474]:
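In [ ]:
# Illustrative helper (my addition; summarizePnl is a hypothetical name): condense the long/short
# describe() calls above into win rate, average win and average loss per side.
def summarizePnl(trades, label):
    wins = trades[trades.pnl > 0].pnl
    losses = trades[trades.pnl < 0].pnl
    print '%s: trades=%d, win rate=%.2f%%, avg win=%.2f, avg loss=%.2f' % (
        label, trades.shape[0], 100.0 * wins.shape[0] / trades.shape[0],
        wins.mean(), losses.mean())
summarizePnl(MultiCycleTestLong, 'Long')
summarizePnl(MultiCycleTestShort, 'Short')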
In [475]:
pValueList = []
for i in xrange(10000):
    pValueList.append(stats.ttest_rel(np.random.choice(kkTrain.pnl.values, min(kkTest.shape[0], kkTrain.shape[0])), kkTest.pnl.values).pvalue)
pValueSeries = pd.Series(pValueList)
pValueSeries.describe()
Out[475]:
In [476]:
PValue = varDiff(kkTrain,kkTest)
In [477]:
PValue.describe()
Out[477]:
In [ ]:
In [478]:
pValueSeries[pValueSeries <= 0.05].shape[0] / pValueSeries.shape[0]
Out[478]:
In [479]:
pValueSeries.quantile(0.05)
Out[479]:
In [480]:
# Judging from the mean, median and 5% quantile of the p-values, the p-value is with high probability well above 0.05,
# so we accept the null hypothesis that the two groups of PnL have the same variance, i.e. the returns are stable.
In [ ]:
In [481]:
np.sqrt(2)
Out[481]:
In [482]:
# int
buffSize = 20
buffCount = 0
LLPeriod = 20
LSPeriod = 10
peakRatio = 0.991
# Array
closeArray = np.zeros(buffSize)
HPArray = np.zeros(buffSize)
FiltArray = np.zeros(buffSize)
IPeakArray = np.zeros(buffSize)
RealArray = np.zeros(buffSize)
QuadArray = np.zeros(buffSize)
QPeakArray = np.ones(buffSize)
ImagArray = np.ones(buffSize)
alpha1 = (np.cos(0.707*360*np.pi/(180 * 48)) + np.sin(0.707*360 * np.pi / (180*48)) - 1) / np.cos(0.707*360 * np.pi/ (180 * 48))
a1Long = np.exp(-1.414 * np.pi / LLPeriod)
b1Long= 2 * a1Long *np.cos(1.414 * 180 * np.pi / (180 * LLPeriod) )
c2Long = b1Long
c3Long = - a1Long ** 2
c1Long = 1 - c2Long - c3Long
a1Short = np.exp(1.414 * np.pi / LSPeriod)
b1Short = 2 * a1Short *np.cos(1.414 * 180 * np.pi / (180 * LSPeriod) )
c2Short = b1Short
c3Short = - a1Short ** 2
c1Short = 1 - c2Short - c3Short
# hibert trasnform
closeNew = MultiCycleTrain. exitPrice.values
for close in closeNew:
    # push in the latest close price
    closeArray[0:buffSize - 1] = closeArray[1:buffSize]
    closeArray[-1] = close
    # update HPArray (high-pass filter)
    HP = ((1 - alpha1 / 2) ** 2) * (closeArray[-1] - 2 * closeArray[-2] + closeArray[-3]) + \
         2 * (1 - alpha1) * HPArray[-1] - (1 - alpha1) ** 2 * HPArray[-2]
    HPArray[0:buffSize - 1] = HPArray[1:buffSize]
    HPArray[-1] = HP
    # update FiltArray (super smoother, long period)
    Filt = c1Long * (HPArray[-1] + HPArray[-2]) / 2 + c2Long * FiltArray[-1] + c3Long * FiltArray[-2]
    FiltArray[0:buffSize - 1] = FiltArray[1:buffSize]
    FiltArray[-1] = Filt
    # update IPeak (decaying peak of the filtered series)
    IPeak = peakRatio * IPeakArray[-1]
    if np.abs(Filt) > IPeak or IPeak == 0:
        IPeak = np.abs(Filt)
    IPeakArray[0:buffSize - 1] = IPeakArray[1:buffSize]
    IPeakArray[-1] = IPeak
    # update Real
    Real = Filt / IPeak
    RealArray[0:buffSize - 1] = RealArray[1:buffSize]
    RealArray[-1] = Real
    # update Quad (one-bar difference of Real)
    Quad = (RealArray[-1] - RealArray[-2])
    QuadArray[0:buffSize - 1] = QuadArray[1:buffSize]
    QuadArray[-1] = Quad
    # update QPeak (decaying peak of the quadrature component)
    QPeak = peakRatio * QPeakArray[-1]
    if np.abs(Quad) > QPeak:
        QPeak = np.abs(Quad)
    QPeakArray[0:buffSize - 1] = QPeakArray[1:buffSize]
    QPeakArray[-1] = QPeak
    # update Imag (super smoother, short period)
    Imag = c1Short * (QuadArray[-1] + QuadArray[-2]) / 2 + c2Short * ImagArray[-1] + c3Short * ImagArray[-2]
    ImagArray[0:buffSize - 1] = ImagArray[1:buffSize]
    ImagArray[-1] = Imag
    buffCount += 1
    #print buffCount
    if buffCount < buffSize:
        continue
    print ImagArray
    #print HPArray
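In [ ]:
# Illustrative sketch (my addition; pushValue is a hypothetical helper): the repeated
# "shift left, overwrite the last slot" pattern in the loop above can also be written with np.roll,
# which the next cell experiments with.
def pushValue(buf, value):
    buf[:] = np.roll(buf, -1)   # shift every element one slot to the left
    buf[-1] = value             # the oldest value falls off, the newest goes in last
    return buf
print pushValue(np.arange(5.0), 99.0)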
In [483]:
ImagArray
Out[483]:
In [485]:
np.roll(ImagArray,2)
Out[485]:
In [486]:
d = np.array(range(10))
d
Out[486]:
In [487]:
d[1:4] = [4,10,3]
d
Out[487]:
In [488]:
def burgerMethod(ts, iterNum=10):
    # Burg lattice recursion: f_/b_ are the forward/backward prediction errors,
    # k is the reflection coefficient of each stage.
    f_ = ts
    b_ = ts
    for i in range(0, iterNum):
        f = f_[1:]
        b = b_[:-1]
        k = -2 * np.dot(b, f) / (np.dot(f, f) + np.dot(b, b))
        if i == iterNum - 1:
            break
        f_ = f + k * b
        b_ = b + k * f
    return f_, b_
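In [ ]:
# Illustrative sketch (my addition): run burgerMethod on a synthetic sine -- for a predictable series
# the forward/backward prediction errors left after the lattice stages should carry far less energy
# than the raw series. sineWave/fErr/bErr are hypothetical names.
sineWave = np.sin(2 * np.pi * np.arange(60) / 12.0)
fErr, bErr = burgerMethod(sineWave, 5)
print 'input energy   :', (sineWave ** 2).mean()
print 'residual energy:', (fErr ** 2).mean(), (bErr ** 2).mean()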
In [489]:
burgerMethod(ImagArray,5)
Out[489]:
In [490]:
x = copy.deepcopy(np.array(range(12)))
ts = copy.copy(ImagArray)
ts
Out[490]:
In [491]:
ImagArray
Out[491]:
In [492]:
cc = np.array(range(10))
cc[1:3] = [5,6]
cc
Out[492]:
In [493]:
ts
Out[493]:
In [531]:
def burgMese(ts, iterNum=5):
    # Burg / maximum-entropy recursion: returns the prediction-error filter Ak (with Ak[0] == 1).
    f = copy.copy(ts)
    b = copy.copy(ts)
    N = len(ts) - 1
    Ak = np.zeros(iterNum + 1)
    Ak[0] = 1
    Dk = 2 * (f * b).sum() - f[0] ** 2 - b[N] ** 2
    for k in range(iterNum):
        # update mu (reflection coefficient)
        mu = -2 * (f[k+1:N+1] * b[:N-k]).sum() / Dk
        # update Ak
        t1 = Ak[:int((k+1)/2) + 1] + mu * Ak[k+1:k-int((k+1)/2):-1]
        t2 = Ak[k+1:k-int((k+1)/2):-1] + mu * Ak[:int((k+1)/2) + 1]
        Ak[:int((k+1)/2) + 1] = t1
        Ak[k+1:k-int((k+1)/2):-1] = t2
        # update f and b
        t3 = f[k+1:N+1] + mu * b[0:N-k]
        t4 = mu * f[k+1:N+1] + b[0:N-k]
        f[k+1:N+1] = t3
        b[0:N-k] = t4
        # update Dk
        Dk = (1 - mu**2) * Dk - f[k+1]**2 - b[N-k-1]**2
    return Ak
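In [ ]:
# Illustrative sketch (my addition): feed burgMese a synthetic AR(2) series
# x[n] = 1.5*x[n-1] - 0.7*x[n-2] + noise; the returned prediction-error filter Ak
# should then sit close to the theoretical [1, -1.5, 0.7]. xAr/noise are hypothetical names.
np.random.seed(1)
noise = np.random.normal(0, 1, 500)
xAr = np.zeros(500)
for n in range(2, 500):
    xAr[n] = 1.5 * xAr[n-1] - 0.7 * xAr[n-2] + noise[n]
print burgMese(xAr, 2)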
In [1004]:
#pd.Series(np.random.choice(MultiCycleTest.entryPrice.values.astype(float),40)).plot(figsize=(16,9))
a = burgMese(np.random.choice(MultiCycleTrain.entryPrice.values.astype(float),40))[1:]
a = burgMese(np.random.rand(35),20)[1:]
sigma = 1
TSpace = 28
I_num = sigma * sigma
Spectrum = np.zeros(TSpace)*np.nan
mArray = np.array(range(1,a.shape[0]+1))
for Period in range(3,TSpace):
    CT = (a * np.cos(2 * np.pi * mArray / Period)).sum()
    ST = (a * np.sin(2 * np.pi * mArray / Period)).sum()
    Spectrum[Period] = 1 / ((1 - CT) ** 2 + ST ** 2)
T = np.nanargmax(Spectrum)
print 'Cycle estimated by MESE is %d '%T
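In [ ]:
# Illustrative sketch (my addition): plot the MESE pseudo-spectrum computed above so the estimated
# dominant cycle T shows up as the peak of the curve.
pd.Series(Spectrum).plot(figsize=(16, 9), title='MESE pseudo-spectrum')
plt.axvline(T, linestyle='--')
plt.show()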
In [980]:
np.random.rand(35)
Out[980]:
In [898]:
a * (mArray +1)
Out[898]:
In [574]:
np.random.uniform(2,10,40)
Out[574]:
In [577]:
np.abs(np.random.normal(40))
Out[577]:
In [506]:
MultiCycleTest.entryPrice.values.astype(float)
Out[506]:
In [ ]:
In [ ]: