In [1]:
import sys
import datetime
print('Aktuální datum:' , datetime.datetime.now().date())
print('Python:' , sys.version)
import numpy as np
print('Numpy:', np.__version__)
import pandas as pd
print('Pandas:', pd.__version__)
import vhat
print('VHAT:', vhat.__version__)
import numba
print('Numba:', numba.__version__)
import matplotlib
print('Matplotlib:', matplotlib.__version__)
import seaborn as sns
print('Seaborn:', sns.__version__)
#import bokeh as bk
#import bokeh.plotting as bkplot
#print('Bokeh:', bk.__version__)
#bkplot.output_notebook() # show visualisation inline
In [2]:
# Square size and its multiplier; together they define the look-back length below.
SQUARE = 128
SQUARE_MULTIPLIER = 1.5
# Number of higher-timeframe bars used as the look-back window for the reference
# high/low. Builtin int() is used because the np.int alias was removed from NumPy.
BARS_BACK_TO_REFERENCE = int(np.ceil(SQUARE * SQUARE_MULTIPLIER))
# Higher timeframe (in minutes) used for computing the SquareMath levels.
MINUTES = 30 # range 0-59
PD_RESAMPLE_RULE = f'{MINUTES}Min'
# Shift of the period boundaries of PD_RESAMPLE_RULE. E.g. for PD_RESAMPLE_RULE == '30min':
# PD_GROUPER_BASE = 5 gives periods starting at 8:05:00, 8:35:00, 9:05:00, etc.
# PD_GROUPER_BASE = 0 gives periods starting at 8:00:00, 8:30:00, 9:00:00, etc.
PD_GROUPER_BASE = 0
In [3]:
# Tick size is half of 1/32; the string form is derived from the float
# (a Python float's default formatting round-trips exactly).
TICK_SIZE = 1 / 32 * 0.5
TICK_SIZE_STR = f'{TICK_SIZE}'
#SYMBOL = 'ZN'
TICK_SIZE_STR
Out[3]:
In [4]:
# Load the fine-grained price records from CSV.
DATA_FILE = '../../Data/ZN-1s.csv'
# Only these CSV columns are read; 'Last' is renamed to 'Close' below.
read_cols = ['Date', 'Time', 'Open', 'High', 'Low', 'Last']
# Columns 0 and 1 of the selection (Date, Time) are parsed together into one 'Datetime'
# column, which index_col=0 then turns into the index.
# NOTE(review): nested `parse_dates` dicts look deprecated in recent pandas - confirm before upgrading.
data = pd.read_csv(DATA_FILE, index_col=0, skipinitialspace=True, usecols=read_cols, parse_dates={'Datetime': [0, 1]})
data.rename(columns={"Last": "Close"}, inplace=True)
data.index.name = 'Datetime'
# Sequential row number of every record; later stored as the trade entry/exit index.
data['Idx'] = np.arange(data.shape[0])
# `df` is an alias of `data` (same object), not a copy.
df = data
df
Out[4]:
High
In [5]:
# For every fine-grained record compute the upper reference price 'SMLHighLimit':
# the running max High of the current higher-timeframe period combined with the
# highs of the preceding periods.
# NOTE(review): the `base=` argument of pd.Grouper appears to be deprecated in newer
# pandas in favour of `offset`/`origin` - confirm against the pandas version in use.
df_helper_gr = df[['High']].groupby(pd.Grouper(freq=PD_RESAMPLE_RULE, base=PD_GROUPER_BASE))
# Running max inside each period; the result is indexed by (period label, record timestamp).
df_helper = df_helper_gr.rolling(PD_RESAMPLE_RULE, min_periods=1).max().dropna() # cummax() with new index
# Helper column 'l' = max of the per-period highs over the previous
# BARS_BACK_TO_REFERENCE-1 non-empty periods (shift() excludes the current period),
# looked up for each record through its period label. The row-wise max with
# skipna=False stays NaN while that look-back window is not yet complete.
df_helper['bigCumMaxHigh'] = df_helper.assign(l=df_helper_gr.max().dropna().rolling(BARS_BACK_TO_REFERENCE-1).max().shift().loc[df_helper.index.get_level_values(0)].to_numpy()).max(axis=1, skipna=False)
df_helper.set_index(df_helper.index.get_level_values(1), inplace=True) # drop multiindex
# Index-aligned assignment back onto the fine-grained frame.
df['SMLHighLimit'] = df_helper.bigCumMaxHigh
df
Out[5]:
Low
In [6]:
# For every fine-grained record compute the lower reference price 'SMLLowLimit':
# the running min Low of the current higher-timeframe period combined with the
# lows of the preceding periods.
# NOTE(review): the `base=` argument of pd.Grouper appears to be deprecated in newer
# pandas in favour of `offset`/`origin` - confirm against the pandas version in use.
df_helper_gr = df[['Low']].groupby(pd.Grouper(freq=PD_RESAMPLE_RULE, base=PD_GROUPER_BASE))
# Running min inside each period; the result is indexed by (period label, record timestamp).
df_helper = df_helper_gr.rolling(PD_RESAMPLE_RULE, min_periods=1).min().dropna() # cummin() with new index
# Helper column 'l' = min of the per-period lows over the previous
# BARS_BACK_TO_REFERENCE-1 non-empty periods (shift() excludes the current period),
# looked up for each record through its period label. The row-wise min with
# skipna=False stays NaN while that look-back window is not yet complete.
df_helper['bigCumMinLow'] = df_helper.assign(l=df_helper_gr.min().dropna().rolling(BARS_BACK_TO_REFERENCE-1).min().shift().loc[df_helper.index.get_level_values(0)].to_numpy()).min(axis=1, skipna=False)
df_helper.set_index(df_helper.index.get_level_values(1), inplace=True) # drop multiindex
# Index-aligned assignment back onto the fine-grained frame.
df['SMLLowLimit'] = df_helper.bigCumMinLow
df
Out[6]:
Zahození nepotřebných prostředků a záznamů NaN, které nemůžu analyzovat
In [7]:
# Release the helper objects and discard records that contain NaN
# (they cannot be analysed).
del df_helper, df_helper_gr
df.dropna(inplace=True)
df
Out[7]:
In [8]:
from vhat.squaremath.funcs import calculate_octave
# Level indexes -2..10 in eighths of the frame, i.e. from -2/8 up to 8/8 + 2/8.
# Builtin int is used as dtype because the np.int alias was removed from NumPy.
SML_INDEXES = np.arange(-2, 10+1, dtype=int)
def round_to_tick_size(values, tick_size):
    """Snap ``values`` to the nearest multiple of ``tick_size``.

    ``values`` may be a scalar or an array-like; the result has the same shape.
    """
    whole_ticks = np.round(values / tick_size)
    return whole_ticks * tick_size
def get_smlines(r):
    """Build the SquareMath levels for one record.

    ``r`` must expose ``SMLLowLimit`` and ``SMLHighLimit``. Both are passed to
    ``calculate_octave``, whose result is unpacked as (zero line, frame size).

    Returns a list ``[levels, zero_line, frame_size, spread]`` where ``spread``
    is one eighth of the frame size and ``levels`` holds one tick-rounded price
    for every entry of ``SML_INDEXES``.
    """
    zero_line, frame_size = calculate_octave(r.SMLLowLimit, r.SMLHighLimit)
    eighth = frame_size * 0.125
    levels = round_to_tick_size(SML_INDEXES * eighth + zero_line, TICK_SIZE)
    return [levels, zero_line, frame_size, eighth]
In [9]:
# Compute the SquareMath levels row by row and attach them as new columns.
sml_frame = df.apply(get_smlines, axis=1, result_type='expand')
sml_frame.columns = ['SML', 'zeroLine', 'framSize', 'spread']
df = df.join(sml_frame)
del sml_frame
df
Out[9]:
Musím vypočítat dotyk předchozího průrazu kvůli frame-shift.
In [10]:
# Levels of the previous record; the first record has none and is dropped.
df['prevSML'] = df['SML'].shift(1)
df.dropna(inplace=True)
df
Out[10]:
In [11]:
# Boolean mask per record: which of the previous record's levels lie inside [Low, High].
df['SMLTouch'] = df.apply(lambda row: (row.Low <= row.prevSML) & (row.prevSML <= row.High), axis=1)
# Number of levels touched by the record.
df['SMLTouchCount'] = df['SMLTouch'].apply(lambda hits: sum(hits))
df
Out[11]:
In [12]:
from dataclasses import dataclass
from typing import List
@dataclass
class Trade:
    """State of a single simulated trade, from entry to exit.

    Field order matters: instances are created positionally and later
    flattened with ``astuple`` into DataFrame rows.
    """
    tId: int
    # entry data, known up front
    entry_idx: int
    entry_sml_number: int
    entry_sml_spread: float
    entry_price: float
    entry_lots: int # -1 short, 1 long
    profit_target: float
    stop_loss: float
    # market development while the trade is open
    max_running_profit_price: float
    max_running_loss_price: float
    # exit data, filled in only when the trade is closed
    exit_idx: int = -1
    exit_price: float = 0.0
    exit_sml_number: int = 9999
    # set when the trade ends in a way that makes it impossible to tell
    # which of profit target / stop loss was reached first
    unrecognizable_trade: bool = False
@dataclass
class TradeList:
    """Wrapper around a list of Trade objects so that ``astuple`` can flatten all of them at once."""
    trades: List[Trade]
In [13]:
def check_open_trades(v, finished_trades, r):
    """Update every open trade against record ``r`` and close the ones that finished.

    Parameters
    ----------
    v : dict
        Open trades keyed by trade id; closed trades are removed from it.
    finished_trades : list
        Receives the trades closed by this record.
    r : row-like
        Current record; ``High``, ``Low`` and ``Idx`` are read.

    Raises
    ------
    Exception
        If an open trade has ``entry_lots == 0``.
    """
    # TODO: finish the indexes of the current candle
    # Use the dict that was passed in; previously the body silently relied on a
    # notebook-level global named ``opened_trades`` and ignored this argument.
    opened_trades = v
    trades_to_close = []
    for tid, trade in opened_trades.items():
        if trade.entry_lots == 0:
            raise Exception('Něco jsem dojebal - open trades má entry lots == 0')
        long_trade = trade.entry_lots > 0
        # running statistics, clamped to the PT / SL prices
        if long_trade:
            trade.max_running_profit_price = max(min(r.High, trade.profit_target), trade.max_running_profit_price)
            trade.max_running_loss_price = min(max(r.Low, trade.stop_loss), trade.max_running_loss_price)
        else:  # short trade
            trade.max_running_profit_price = min(max(r.Low, trade.profit_target), trade.max_running_profit_price)
            trade.max_running_loss_price = max(min(r.High, trade.stop_loss), trade.max_running_loss_price)
        # was PT or SL reached inside this record's range?
        hit_pt = r.Low <= trade.profit_target <= r.High
        hit_sl = r.Low <= trade.stop_loss <= r.High
        direction = 1 if long_trade else -1
        if hit_pt and hit_sl:
            # ambiguous - cannot tell whether SL or PT was reached first
            trade.unrecognizable_trade = True
        elif hit_pt:
            trade.exit_idx = r.Idx
            trade.exit_price = trade.profit_target
            trade.exit_sml_number = trade.entry_sml_number + direction
        elif hit_sl:
            trade.exit_idx = r.Idx
            trade.exit_price = trade.stop_loss
            trade.exit_sml_number = trade.entry_sml_number - direction
        else:
            continue  # still open
        trades_to_close.append(tid)
    # Move closed trades out of the open book (done after the loop so the dict
    # is not mutated while it is being iterated).
    for tid in trades_to_close:
        finished_trades.append(opened_trades.pop(tid))
In [14]:
def entry_logic(opened_trades, finished_trades, r, prev_r, last_level, tick_size, rr_multiplier=1):
    """Open a new trade when record ``r`` touches exactly one previous SML level.

    Nothing happens when the touch count differs from one, or when an open
    trade already has its entry at the touched price. A level lying below
    ``last_level`` opens a long, otherwise a short. The stop loss sits one
    ``prev_r.spread`` away from the entry and the profit target
    ``rr_multiplier`` spreads away, both rounded to ``tick_size``. The new trade
    is stored in ``opened_trades`` under its id.
    """
    # TODO: this is not entirely true - if the open is below both broken
    # levels, the order of activation is clear; probably not that important.
    if r.SMLTouchCount != 1:
        return  # cannot tell which level was activated first

    # the activated level must be one of the previous record's levels
    touched_price = r.prevSML[r.SMLTouch][0]
    if any(open_trade.entry_price == touched_price for open_trade in opened_trades.values()):
        return  # a trade on this level is already open

    trade_id = len(opened_trades) + len(finished_trades) + 1
    touched_index = SML_INDEXES[r.SMLTouch][0]
    spread = prev_r.spread

    # open the trade on the broken level
    if touched_price < last_level:
        # touched from above == long
        lots = 1
        target = round_to_tick_size(touched_price + spread * rr_multiplier, tick_size)
        stop = round_to_tick_size(touched_price - spread, tick_size)
        best_price, worst_price = r.High, r.Low
    else:
        # short
        lots = -1
        target = round_to_tick_size(touched_price - spread * rr_multiplier, tick_size)
        stop = round_to_tick_size(touched_price + spread, tick_size)
        best_price, worst_price = r.Low, r.High

    opened_trades[trade_id] = Trade(trade_id, r.Idx, touched_index, spread, touched_price,
                                    lots, target, stop, best_price, worst_price)
# Run the backtest record by record.
last_level = None # price of the last touched SML; entry_logic compares the touched level against it
opened_trades = {}
finished_trades = []
for idxdt, r in df.iterrows():
    # Explicit None check: a truthiness test would also treat a 0.0 price as "not set yet".
    if last_level is None:
        # the first record only seeds the state
        last_level = r.Close
        prev_r = r
        continue
    check_open_trades(opened_trades, finished_trades, r)
    entry_logic(opened_trades, finished_trades, r, prev_r, last_level, TICK_SIZE)
    # remember the latest state for the calculation on the next candle
    prev_r = r
    if r.SMLTouchCount == 1:
        last_level = r.prevSML[r.SMLTouch][0]
    elif r.SMLTouchCount > 1:
        last_level = r.Close
In [15]:
from dataclasses import astuple
# Column names in the order of the Trade dataclass fields.
cols = ['id', 'entryIdx', 'entrySmLvl', 'entrySmlSpread', 'entryPrice', 'lots', 'pt', 'sl', 'runningProfit', 'runningRisk', 'exitIdx', 'exitPrice', 'exitSmLvl', 'unrecognizableTrade']
# Wrap both collections so that astuple() flattens every trade into a plain tuple.
finished_trades = TradeList(finished_trades)
opened_trades = TradeList(list(opened_trades.values()))
(opened_rows,) = astuple(opened_trades)
(finished_rows,) = astuple(finished_trades)
stats_opened = pd.DataFrame(opened_rows, columns=cols)
stats = pd.DataFrame(finished_rows, columns=cols)
stats
Out[15]:
Backtest základní info
In [16]:
# Basic facts about the backtested data set.
first_ts = df.index[0]
last_ts = df.index[-1]
print('Od:', first_ts)
print('Do', last_ts)
print('Časové období:', last_ts - first_ts)
# Daily resampling also produces rows for calendar days without any record;
# those all-NaN rows are dropped so that only days with data are counted.
print('Počet obchodních dnů:', df.Close.resample('1D').ohlc().dropna().shape[0])
print('Počet záznamů jemného tf:', df.shape[0])
Zjištění, zda je zvolený SQUARE na vyšším timeframu dostatečný pro backtest na tomto nízkém timeframu. Tzn. pokud mám Square=32 z vyššího timeframe='30min', mohu zjistit jestli jsou záznamy timeframe='1min' vhodné pro backtest.
Pokud by byla chyba rozlišení nízkého timeframe vysoká (např. nad 5 %), je třeba pro relevantní výsledky zvolit buď nižší timeframe pro backtest, např. `30s` příp. `1s`, nebo zvýšit SQUARE (např. na 64), nebo zvýšit vysoký timeframe pro výpočet SML (`1h`, `2h`, `4h`, `8h`, `1d`, ...).
In [17]:
# How often a record touches 0, 1, 2, ... SML levels.
record_count = df.shape[0]
multi_touch_count = (df.SMLTouchCount > 1).sum()
touchCounts = df.SMLTouchCount.value_counts().to_frame(name='Occurences')
touchCounts['Occ%'] = touchCounts['Occurences'] / record_count * 100
print(f'Počet protnutích více něž jedné SML v jednom záznamu: v {multi_touch_count} případech ({multi_touch_count/record_count*100:.3f}%) z {record_count} celkem\n')
touchCounts
Out[17]:
In [18]:
# Distribution of the SML spread (distance between neighbouring levels).
record_count = df.shape[0]
narrow_spread_count = (df.spread / TICK_SIZE < 2).sum()
spread_stats = df.spread.value_counts().to_frame(name='Occurences')
spread_stats['Occ%'] = spread_stats['Occurences'] / record_count * 100
# the spread values live in the index of the value_counts result
spread_stats['Ticks'] = spread_stats.index / TICK_SIZE
print(f'Počet spredu SML menších než 2 ticky v jednom záznamu: v {narrow_spread_count} případech ({narrow_spread_count/record_count*100:.3f}%) z {record_count} celkem\n')
spread_stats
Out[18]:
In [19]:
# Records that are unreliable at this resolution: spread narrower than two
# ticks or more than one level touched.
unreliable_mask = (df.spread / TICK_SIZE < 2) | (df.SMLTouchCount > 1)
chybovost = int(unreliable_mask.sum())
print(f'Celková chybovost v nízkém timeframe může být v {chybovost} případech ({chybovost/df.shape[0]*100:.3f}%) z {df.shape[0]} celkem')
In [20]:
finishedCount = len(stats)
print('Total finished trades:', finishedCount)
# If there are really many unrecognizable trades, the resolution of the
# SquareMath levels is too low (square too small).
unrec_trades = stats['unrecognizableTrade'].sum()
unrec_share = f'({unrec_trades/finishedCount *100:.3f}%)'
print('Unrecognizable trades:', unrec_trades, unrec_share)
print('Opened trades:', len(stats_opened))
Dál nebudu potřebovat unrecognized trades
In [21]:
# Remove, in place, every trade flagged as unrecognizable (PT and SL both inside one record).
stats.drop(stats[stats.unrecognizableTrade].index, inplace=True)
In [22]:
# Per-trade results expressed in ticks.
shorts_mask = stats['lots'] < 0
longs_mask = stats['lots'] > 0
short_rows = stats.loc[shorts_mask]
long_rows = stats.loc[longs_mask]
# Shorts earn when the exit is below the entry, longs when it is above.
stats.loc[shorts_mask, 'PnL'] = ((short_rows['entryPrice'] - short_rows['exitPrice']) / TICK_SIZE).round()
stats.loc[longs_mask, 'PnL'] = ((long_rows['exitPrice'] - long_rows['entryPrice']) / TICK_SIZE).round()
stats['PnL'] = stats['PnL'].astype(int)
entry_price = stats['entryPrice']
# Distances from the entry: favourable ones are positive, adverse ones negative.
stats['runPTicks'] = ((entry_price - stats['runningProfit']).abs() / TICK_SIZE).round().astype(int)
stats['runLTicks'] = ((entry_price - stats['runningRisk']).abs() * -1 / TICK_SIZE).round().astype(int)
stats['ptTicks'] = ((entry_price - stats['pt']).abs() / TICK_SIZE).round().astype(int)
stats['slTicks'] = ((entry_price - stats['sl']).abs() * -1 / TICK_SIZE).round().astype(int)
# Trade duration measured in records.
stats['tradeTime'] = stats['exitIdx'] - stats['entryIdx']
stats
Out[22]:
In [23]:
# masks
shorts_mask = stats.lots<0
longs_mask = stats.lots>0
profit_mask = stats.PnL>0
loss_mask = stats.PnL<0
breakeven_mask = stats.PnL==0
total_trades = stats.shape[0]
profit_trades_count = stats.PnL[profit_mask].shape[0]
loss_trades_count = stats.PnL[loss_mask].shape[0]
breakeven_trades_count = stats.PnL[breakeven_mask].shape[0]
print(f'Ziskových obchodů {profit_trades_count}({profit_trades_count/total_trades*100:.2f}%) z {total_trades} celkem')
print(f'Ztrátových obchodů {loss_trades_count}({loss_trades_count/total_trades*100:.2f}%) z {total_trades} celkem')
print(f'Break-even obchodů {breakeven_trades_count}({breakeven_trades_count/total_trades*100:.2f}%) z {total_trades} celkem')
print('---')
print(f'Počet Long obchodů = {stats[longs_mask].shape[0]} ({stats[longs_mask].shape[0]/stats.shape[0]*100:.2f}%) z {total_trades} celkem')
print(f'Počet Short obchodů = {stats[shorts_mask].shape[0]} ({stats[shorts_mask].shape[0]/stats.shape[0]*100:.2f}%) z {total_trades} celkem')
print('---')
print(f'Suma zisků = {stats.PnL[profit_mask].sum()} Ticks')
print(f'Suma ztrát = {stats.PnL[loss_mask].sum()} Ticks')
print(f'Celkem = {stats.PnL.sum()} Ticks')
In [24]:
# Losing trades only: how often each PnL value occurred.
selected_stats = stats[loss_mask]
selected_pnl_stats = selected_stats.PnL.value_counts().to_frame(name='PnLOccurences')
selected_pnl_stats['Occ%'] = selected_pnl_stats['PnLOccurences'] / selected_stats.shape[0] * 100
# PnL is already expressed in ticks, so it is copied as-is; dividing it by
# TICK_SIZE again (as the spread table does for prices) would inflate it.
selected_pnl_stats['Ticks'] = selected_pnl_stats.index
selected_pnl_stats
Out[24]:
In [25]:
# Losing trades (selected_stats from the preceding cell): distribution of the
# largest favourable move from the entry, in ticks.
# NOTE(review): distplot looks deprecated in newer seaborn - confirm before upgrading.
sns.distplot(selected_stats.runPTicks, color="g");
Poměrově pohyb v zisku k nastavenému PT u ztrátových obchodů.
In [26]:
# Losing trades: largest favourable move as a fraction of the profit-target distance.
sns.distplot(selected_stats.runPTicks/selected_stats.ptTicks, color="g");
In [27]:
# Losing trades: distribution of the largest adverse move from the entry, in (negative) ticks.
sns.distplot(selected_stats.runLTicks, color="r");
In [28]:
# Losing trades: largest adverse move as a fraction of the stop-loss distance
# (both operands are negative, so the ratio is positive).
sns.distplot(selected_stats.runLTicks/selected_stats.slTicks, color="r");
In [29]:
# Winning trades only: how often each PnL value occurred.
selected_stats = stats[profit_mask]
selected_pnl_stats = selected_stats.PnL.value_counts().to_frame(name='PnLOccurences')
selected_pnl_stats['Occ%'] = selected_pnl_stats['PnLOccurences'] / selected_stats.shape[0] * 100
# PnL is already expressed in ticks, so it is copied as-is; dividing it by
# TICK_SIZE again (as the spread table does for prices) would inflate it.
selected_pnl_stats['Ticks'] = selected_pnl_stats.index
selected_pnl_stats
Out[29]:
In [30]:
# Winning trades (selected_stats from the preceding cell): distribution of the
# largest favourable move from the entry, in ticks.
sns.distplot(selected_stats.runPTicks, color="g");
Poměrově pohyb v zisku k PT u ziskových obchodů.
In [31]:
# Winning trades: largest favourable move as a fraction of the profit-target distance.
sns.distplot(selected_stats.runPTicks/selected_stats.ptTicks, color="g");
In [32]:
# Winning trades: distribution of the largest adverse move from the entry, in (negative) ticks.
sns.distplot(selected_stats.runLTicks, color="r");
Poměr vývoje ztráty k zadanému SL v ziskových obchodech.
In [33]:
# Winning trades: largest adverse move as a fraction of the stop-loss distance
# (both operands are negative, so the ratio is positive).
sns.distplot(selected_stats.runLTicks/selected_stats.slTicks, color="r");
In [34]:
# Long trades only: counts, average win/loss and the PnL distribution.
selected_stats = stats[longs_mask]
print('Počet obchodů:', selected_stats.shape[0], f'({selected_stats.shape[0]/stats.shape[0]*100:.2f}%) z {stats.shape[0]}')
print('Počet win:', selected_stats[selected_stats.PnL>0].shape[0], f'({selected_stats[selected_stats.PnL>0].shape[0]/selected_stats.shape[0]*100:.2f}%) z {selected_stats.shape[0]}')
print('Počet loss:', selected_stats[selected_stats.PnL<0].shape[0], f'({selected_stats[selected_stats.PnL<0].shape[0]/selected_stats.shape[0]*100:.2f}%) z {selected_stats.shape[0]}')
print('Počet break-even:', selected_stats[selected_stats.PnL==0].shape[0], f'({selected_stats[selected_stats.PnL==0].shape[0]/selected_stats.shape[0]*100:.2f}%) z {selected_stats.shape[0]}')
print('---')
print(f'Průměrný zisk: {selected_stats.PnL[selected_stats.PnL>0].mean():.3f}')
print(f'Průměrná ztráta: {selected_stats.PnL[selected_stats.PnL<0].mean():.3f}')
print('---')
selected_pnl_stats = selected_stats.PnL.value_counts().to_frame(name='PnLOccurences')
selected_pnl_stats['Occ%'] = selected_pnl_stats['PnLOccurences'] / selected_stats.shape[0] * 100
# PnL is already expressed in ticks, so it is copied as-is; dividing it by
# TICK_SIZE again (as the spread table does for prices) would inflate it.
selected_pnl_stats['Ticks'] = selected_pnl_stats.index
selected_pnl_stats
Out[34]:
In [35]:
# Losing rows of selected_stats (the long trades from the preceding cell):
# largest favourable move from the entry, in ticks.
sns.distplot(selected_stats[selected_stats.PnL<0].runPTicks, color="g");
Poměrově pohyb v zisku k nastavenému PT u ztrátových obchodů.
In [36]:
# Losing rows of selected_stats: favourable move as a fraction of the profit-target distance.
sns.distplot(selected_stats[selected_stats.PnL<0].runPTicks/selected_stats[selected_stats.PnL<0].ptTicks, color="g");
In [37]:
# Losing rows of selected_stats: largest adverse move from the entry, in (negative) ticks.
sns.distplot(selected_stats[selected_stats.PnL<0].runLTicks, color="r");
In [38]:
# Losing rows of selected_stats: adverse move as a fraction of the stop-loss distance.
sns.distplot(selected_stats[selected_stats.PnL<0].runLTicks/selected_stats[selected_stats.PnL<0].slTicks, color="r"); # sanity check
In [39]:
# Winning rows of selected_stats (the long trades selected earlier):
# largest favourable move from the entry, in ticks.
sns.distplot(selected_stats[selected_stats.PnL>0].runPTicks, color="g");
Poměrově pohyb v zisku k nastavenému PT u ziskových obchodů.
In [40]:
# Winning rows of selected_stats: favourable move as a fraction of the profit-target distance.
sns.distplot(selected_stats[selected_stats.PnL>0].runPTicks/selected_stats[selected_stats.PnL>0].ptTicks, color="g");
In [41]:
# Winning rows of selected_stats: largest adverse move from the entry, in (negative) ticks.
sns.distplot(selected_stats[selected_stats.PnL>0].runLTicks, color="r");
In [42]:
# Winning rows of selected_stats: adverse move as a fraction of the stop-loss distance.
sns.distplot(selected_stats[selected_stats.PnL>0].runLTicks/selected_stats[selected_stats.PnL>0].slTicks, color="r"); # sanity check
In [43]:
# Number of trades opened on each SML level, most frequent first.
#smlvl_stats = stats.entrySmLvl.value_counts().to_frame(name='entrySmLvlOcc')
smlvl_stats = (
    stats[['entrySmLvl', 'lots']]
    .groupby(['entrySmLvl'])
    .count()
    .sort_values(by='lots', ascending=False)
    .rename(columns={'lots': 'entrySmLvlOcc'})
)
smlvl_stats['Occ%'] = smlvl_stats.entrySmLvlOcc / stats.shape[0] * 100
# Cumulative share of the N most frequent entry levels; a separator line
# follows the 5-level and the 11-level summary.
for top_n in (3, 5, 7, 9, 11):
    top_levels = smlvl_stats.iloc[:top_n]
    print(f'Vstup do obchodu z nejčastějších {top_n} levelů: {top_levels.index.to_list()} {top_levels["Occ%"].sum():.2f}%')
    if top_n in (5, 11):
        print('---')
smlvl_stats
Out[43]:
In [44]:
# Bar chart of the number of entries per SML level, ordered by level index.
sns.barplot(x=smlvl_stats.entrySmLvlOcc.sort_index().index, y=smlvl_stats.entrySmLvlOcc.sort_index());
In [45]:
# Relabel the trade direction for the grouped tables below. The result is
# assigned back to the column: an in-place replace on `stats.lots` is chained
# assignment and does not modify `stats` when pandas Copy-on-Write is active.
stats['lots'] = stats['lots'].replace({1: 'Long', -1: 'Short'})
In [46]:
# Number of trades per (entry SML level, direction).
smlvl_stats_buy_sell = (
    stats[['entrySmLvl', 'PnL', 'lots']]
    .groupby(['entrySmLvl', 'lots'])
    .count()
    .sort_index(ascending=False)
    .rename(columns={'PnL': 'LongShortCount'})
)
# Share of each (level, direction) pair in all trades.
smlvl_stats_buy_sell['LongShortTotal%'] = smlvl_stats_buy_sell.LongShortCount / smlvl_stats_buy_sell.LongShortCount.sum() * 100
# Share of longs vs shorts within one level. transform() returns a Series on
# the original index, so the assignment aligns regardless of how
# groupby.apply labels its result in the installed pandas version.
smlvl_stats_buy_sell['SMLlongOrShort%'] = (
    smlvl_stats_buy_sell['LongShortCount']
    .groupby(level=0)
    .transform(lambda counts: 100 * counts / counts.sum())
)
smlvl_stats_buy_sell
Out[46]:
In [47]:
# Flag winning trades (profit_mask is PnL > 0).
stats['Win']=profit_mask
In [48]:
# Keep the flag only on winning rows (others become NaN) so that the groupby
# below counts wins only - rows with a NaN key are left out of the groups.
stats['Win'] = stats['Win'].where(profit_mask)
win_counts = (
    stats[['entrySmLvl', 'PnL', 'lots', 'Win']]
    .groupby(['entrySmLvl', 'lots', 'Win'])
    .count()
    .droplevel(2)
)
smlvl_stats_buy_sell['WinCount'] = win_counts['PnL']
smlvl_stats_buy_sell['Win%'] = smlvl_stats_buy_sell['WinCount'] / smlvl_stats_buy_sell['LongShortCount'] * 100
smlvl_stats_buy_sell
Out[48]:
Jen pro kontrolu. Win == True, Loss == False
In [49]:
# stats['Win'] = profit_mask
# smlvl_stats_buy_sell2 = stats[['entrySmLvl', 'PnL', 'lots', 'Win']].groupby(['entrySmLvl', 'lots', 'Win']).sum()
# smlvl_stats_buy_sell2.sort_index(ascending=False, inplace=True)
# smlvl_stats_buy_sell2.rename(columns={'PnL':'WinLossCount'}, inplace=True)
# smlvl_stats_buy_sell2
Výsledky seřazené dle úspěšnosti:
In [50]:
# Display the table ordered by win rate, best first (returns a sorted copy; the table itself is not modified).
smlvl_stats_buy_sell.sort_values('Win%', ascending=False)
Out[50]:
In [ ]: