by Jerry He, 2016.12, 讨论:https://zhuanlan.zhihu.com/p/24662087
近来,量化交易平台vnpy因其开源、功能强大、开发容易、可定制性强的特点,目前已经被广泛应用在量化交易中。 行情数据落地是量化交易平台必须解决的一个基础问题,它有两个方面的作用:一是供策略开发时进行分析、回测;二是为实盘程序时提供近期的历史数据。前者可以通过传统效率更高的实现方式(比如我们有基于C++和leveldb实现的行情数据接收、转发、历史数据获取程序)实现,也可以通过向数据提供方购买获取。但是对于后者,直接基于vnpy落地近期的数据是更为简易的方式。
vnpy包含行情落地模块dataRecorder,已经实现了tick数据、分钟bar数据保存功能。 本工作主要包括:
以下所有性能测试时间单位均为毫秒。
测试基于windows 7, i7 3.4GHz.
In [1]:
from datetime import datetime, time
import time as gtime
import pymongo
from dateutil.parser import parse
In [2]:
TICK_DB_NAME='Test'
EMPTY_STRING = ''
EMPTY_UNICODE = u''
EMPTY_INT = 0
EMPTY_FLOAT = 0.0
class DrTickData(object):
"""Tick数据"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.vtSymbol = EMPTY_STRING # vt系统代码
self.symbol = EMPTY_STRING # 合约代码
self.exchange = EMPTY_STRING # 交易所代码
# 成交数据
self.lastPrice = EMPTY_FLOAT # 最新成交价
self.volume = EMPTY_INT # 最新成交量
self.openInterest = EMPTY_INT # 持仓量
self.upperLimit = EMPTY_FLOAT # 涨停价
self.lowerLimit = EMPTY_FLOAT # 跌停价
# tick的时间
self.date = EMPTY_STRING # 日期
self.time = EMPTY_STRING # 时间
self.datetime = None # python的datetime时间对象
# 五档行情
self.bidPrice1 = EMPTY_FLOAT
self.bidPrice2 = EMPTY_FLOAT
self.bidPrice3 = EMPTY_FLOAT
self.bidPrice4 = EMPTY_FLOAT
self.bidPrice5 = EMPTY_FLOAT
self.askPrice1 = EMPTY_FLOAT
self.askPrice2 = EMPTY_FLOAT
self.askPrice3 = EMPTY_FLOAT
self.askPrice4 = EMPTY_FLOAT
self.askPrice5 = EMPTY_FLOAT
self.bidVolume1 = EMPTY_INT
self.bidVolume2 = EMPTY_INT
self.bidVolume3 = EMPTY_INT
self.bidVolume4 = EMPTY_INT
self.bidVolume5 = EMPTY_INT
self.askVolume1 = EMPTY_INT
self.askVolume2 = EMPTY_INT
self.askVolume3 = EMPTY_INT
self.askVolume4 = EMPTY_INT
self.askVolume5 = EMPTY_INT
def insertData(db,collection,data):
client[db][collection].insert(data.__dict__)
def procecssTickEvent(tick, insertDB=False):
"""处理行情推送"""
vtSymbol = tick.vtSymbol
# 转化Tick格式
drTick = DrTickData()
d = drTick.__dict__
for key in d.keys():
if key != 'datetime':
d[key] = tick.__dict__[key]
drTick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')
# 更新Tick数据
if insertDB:
insertData(TICK_DB_NAME, vtSymbol, drTick)
In [3]:
client=pymongo.MongoClient()
data=client['VnTrader_Tick_Db']['rb1705'].find_one({})
del data['_id']
class InputTick: pass
tick=InputTick()
tick.__dict__.update(data)
print tick.__dict__
In [4]:
def profiling(count,func=None):
if func==None: func=lambda: procecssTickEvent(tick)
t0=gtime.time()
for i in range(count):
func()
total_time=(gtime.time()-t0)
return total_time*1000/count
test_count=10000
In [5]:
original_nodb=profiling(test_count)
client.drop_database(TICK_DB_NAME)
original_db=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))
print '原版不保存数据到mongodb单次耗时:%.4f' %original_nodb
print '原版含保存数据到mongodb单次耗时:%.4f' %original_db
In [6]:
#过滤掉的时间区间,注意集合竞价tick被过滤了。
invalid_sections=[(time(2,30,59),time(9,0,0)),
(time(11,30,59),time(13,0,0)),
(time(15,15,0),time(21,0,0))]
#本地时间在此区间时对收到的Tick数据不处理,避免有时期货公司会抽风把数据重推一次。
invalid_local_section=(time(5,0,0),time(8,30,0))
def procecssTickEvent(tick, insertDB=False):
"""处理行情推送"""
# 1. 本地时间检查
local_datetime=datetime.now()
local_time=local_datetime.time()
if local_time>invalid_local_section[0] and local_time<invalid_local_section[1]:
return
# 2. 转化Tick格式
drTick = DrTickData()
d = drTick.__dict__
for key in d.keys():
if key != 'datetime':
d[key] = tick.__dict__[key]
#防御时间格式变为 ”9:00:00.5"
if tick.time[2] != ':':
tick.time = '0' + tick.time
tick_hour = int(tick.time[0:2])
local_hour = local_time.hour
real_date=local_datetime
if tick_hour == 23 and local_hour == 0:#行情时间慢于系统时间
real_date+=timedelta(-1)
elif tick_hour == 0 and local_hour == 23:#系统时间慢于行情时间
real_date+=timedelta(1)
tick.time = tick.time.ljust(12,'0')
drTick.datetime = datetime(real_date.year,real_date.month,real_date.day,
int(tick.time[0:2]), int(tick.time[3:5]), int(tick.time[6:8]), int(tick.time[9:12])*1000)
tmpTime=drTick.datetime.time()
for sec in invalid_sections:
if tmpTime>sec[0] and tmpTime<sec[1]:
return
# 3. 更新Tick数据
if insertDB:
insertData(TICK_DB_NAME, tick.vtSymbol, drTick)
procecssTickEvent(tick)
In [7]:
new_nodb=profiling(test_count)
client.drop_database(TICK_DB_NAME)
new_db=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))
print '新版不保存数据到mongodb单次耗时:%.4f' %original_nodb
print '新版含保存数据到mongodb单次耗时:%.4f' %original_db
In [8]:
def insertData(db,collection,data):
for key in data.__dict__:
fout.write(str(data.__dict__[key])+',')
In [9]:
fout=open('D:/test.txt','w')
new_db_text=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))
print '新版含保存数据到text file单次耗时:%.4f' %original_db
fout.close()
In [10]:
time_convert1=profiling(10000,lambda:parse('20161212 21:21:21.5'))
time_convert2=profiling(10000,lambda:datetime.strptime('20161212 21:21:21.5', '%Y%m%d %H:%M:%S.%f'))
def customized_parse(s):
s=s.ljust(21,'0')
return datetime(int(s[0:4]),int(s[4:6]),int(s[6:8]),int(s[9:11]), int(s[12:14]), int(s[15:17]), int(s[18:21])*1000 )
time_convert3=profiling(10000,lambda:customized_parse('20161212 21:21:21.5'))
print '转化方法1耗时:%.4f' %time_convert1
print '转化方法2耗时:%.4f' %time_convert2
print '转化方法3耗时:%.4f' %time_convert3
In [11]:
import pandas as pd
df=pd.DataFrame([{u'无数据写入':original_nodb,u'mongodb写入':original_db},
{u'无数据写入': new_nodb, u'mongodb写入': new_db, u'text文件写入':new_db_text}
],index=['原版','新版'])
df
Out[11]:
总的来看,行情数据落地原版与新版一次落地耗时都为0.2ms左右。函数中,耗时主要来源于mongodb的插入,占约为90%的耗时。通过尝试简单的text写入作为数据存储方式,耗时得到了大幅降低,获得了单次0.04ms耗时的效果,采取其它更高效的格式有望进一步降低耗时。但考虑到无数据写入时的耗时为约0.02ms,所以期望的最优耗时也就在0.02ms左右。
总的来说,基于mongodb的方案能够实时存储的条目数在每秒几百条量级;进一步优化可能达到几千条量级。此水平应该己能满足绝大多数的需求。