vnpy接收行情数据性能测试与改进优化

by Jerry He, 2016.12, 讨论:https://zhuanlan.zhihu.com/p/24662087

近来,量化交易平台vnpy因其开源、功能强大、开发容易、可定制性强的特点,目前已经被广泛应用在量化交易中。 行情数据落地是量化交易平台必须解决的一个基础问题,它有两个方面的作用:一是供策略开发时进行分析、回测;二是为实盘程序时提供近期的历史数据。前者可以通过传统效率更高的实现方式(比如我们有基于C++和leveldb实现的行情数据接收、转发、历史数据获取程序)实现,也可以通过向数据提供方购买获取。但是对于后者,直接基于vnpy落地近期的数据是更为简易的方式。

vnpy包含行情落地模块dataRecorder,已经实现了tick数据、分钟bar数据保存功能。 本工作主要包括:

  • vnpy原落地函数的性能考查
  • 针对CTP接口,原落地函数的修正与优化

以下所有性能测试时间单位均为毫秒。

测试基于windows 7, i7 3.4GHz.


In [1]:
from datetime import datetime, time
import time as gtime
import pymongo
from dateutil.parser import parse

重构vnpy接收行情数据代码,以用于测试


In [2]:
TICK_DB_NAME='Test'

EMPTY_STRING = ''
EMPTY_UNICODE = u''
EMPTY_INT = 0
EMPTY_FLOAT = 0.0

class DrTickData(object):
    """Tick数据"""

    #----------------------------------------------------------------------
    def __init__(self):
        """Constructor"""       
        self.vtSymbol = EMPTY_STRING            # vt系统代码
        self.symbol = EMPTY_STRING              # 合约代码
        self.exchange = EMPTY_STRING            # 交易所代码

        # 成交数据
        self.lastPrice = EMPTY_FLOAT            # 最新成交价
        self.volume = EMPTY_INT                 # 最新成交量
        self.openInterest = EMPTY_INT           # 持仓量
        
        self.upperLimit = EMPTY_FLOAT           # 涨停价
        self.lowerLimit = EMPTY_FLOAT           # 跌停价
        
        # tick的时间
        self.date = EMPTY_STRING            # 日期
        self.time = EMPTY_STRING            # 时间
        self.datetime = None                # python的datetime时间对象
        
        # 五档行情
        self.bidPrice1 = EMPTY_FLOAT
        self.bidPrice2 = EMPTY_FLOAT
        self.bidPrice3 = EMPTY_FLOAT
        self.bidPrice4 = EMPTY_FLOAT
        self.bidPrice5 = EMPTY_FLOAT
        
        self.askPrice1 = EMPTY_FLOAT
        self.askPrice2 = EMPTY_FLOAT
        self.askPrice3 = EMPTY_FLOAT
        self.askPrice4 = EMPTY_FLOAT
        self.askPrice5 = EMPTY_FLOAT        
        
        self.bidVolume1 = EMPTY_INT
        self.bidVolume2 = EMPTY_INT
        self.bidVolume3 = EMPTY_INT
        self.bidVolume4 = EMPTY_INT
        self.bidVolume5 = EMPTY_INT
        
        self.askVolume1 = EMPTY_INT
        self.askVolume2 = EMPTY_INT
        self.askVolume3 = EMPTY_INT
        self.askVolume4 = EMPTY_INT
        self.askVolume5 = EMPTY_INT    
        
def insertData(db,collection,data):
    client[db][collection].insert(data.__dict__)

def procecssTickEvent(tick, insertDB=False):
    """处理行情推送"""
    vtSymbol = tick.vtSymbol

    # 转化Tick格式
    drTick = DrTickData()
    d = drTick.__dict__
    for key in d.keys():
        if key != 'datetime':
            d[key] = tick.__dict__[key]
    drTick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')  
    
    # 更新Tick数据
    if insertDB:
        insertData(TICK_DB_NAME, vtSymbol, drTick)

创建一个用于测试的Tick数据


In [3]:
client=pymongo.MongoClient()
data=client['VnTrader_Tick_Db']['rb1705'].find_one({})
del data['_id']

class InputTick: pass
tick=InputTick()
tick.__dict__.update(data)
print tick.__dict__


{u'askPrice2': 0.0, u'lastPrice': 2977.0, u'exchange': u'UNKNOWN', u'bidVolume5': 0, u'bidVolume4': 0, u'bidVolume3': 0, u'bidVolume2': 0, u'bidVolume1': 1551, u'datetime': datetime.datetime(2016, 12, 28, 21, 27, 36, 500000), u'askVolume1': 120, u'askVolume3': 0, u'askVolume2': 0, u'askVolume5': 0, u'askVolume4': 0, u'date': u'20161228', u'askPrice5': 0.0, u'volume': 392068, u'lowerLimit': 2725.0, u'bidPrice5': 0.0, u'bidPrice4': 0.0, u'bidPrice1': 2976.0, u'bidPrice3': 0.0, u'bidPrice2': 0.0, u'vtSymbol': u'rb1705', u'time': u'21:27:36.5', u'openInterest': 2304294.0, u'askPrice4': 0.0, u'askPrice3': 0.0, u'symbol': u'rb1705', u'askPrice1': 2977.0, u'upperLimit': 3136.0}

测试原版函数性能


In [4]:
def profiling(count,func=None):
    if func==None: func=lambda: procecssTickEvent(tick)
    t0=gtime.time()
    for i in range(count):
        func()
    total_time=(gtime.time()-t0)
    return total_time*1000/count

test_count=10000

In [5]:
original_nodb=profiling(test_count)
client.drop_database(TICK_DB_NAME)
original_db=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))
print '原版不保存数据到mongodb单次耗时:%.4f' %original_nodb
print '原版含保存数据到mongodb单次耗时:%.4f' %original_db


原版不保存数据到mongodb单次耗时:0.0255
原版含保存数据到mongodb单次耗时:0.2334

改进版本

原版程序使用CTP接口保存期货数据时,存在几个问题:

  • 非交易时间收到的野数据没有被过滤掉
  • 当前各交易所提供的date字段混乱,有的使用真实日期,有的使用交易日,导致计算的datetime字段也是有问题的

针对以上问题的改进版本如下:


In [6]:
#过滤掉的时间区间,注意集合竞价tick被过滤了。
invalid_sections=[(time(2,30,59),time(9,0,0)),
          (time(11,30,59),time(13,0,0)),
          (time(15,15,0),time(21,0,0))]

#本地时间在此区间时对收到的Tick数据不处理,避免有时期货公司会抽风把数据重推一次。
invalid_local_section=(time(5,0,0),time(8,30,0))

def procecssTickEvent(tick, insertDB=False):
    """处理行情推送"""
    # 1. 本地时间检查
    local_datetime=datetime.now()
    local_time=local_datetime.time()
    if local_time>invalid_local_section[0] and local_time<invalid_local_section[1]:
        return

    # 2. 转化Tick格式
    drTick = DrTickData()
    d = drTick.__dict__
    for key in d.keys():
        if key != 'datetime':
            d[key] = tick.__dict__[key]

    #防御时间格式变为 ”9:00:00.5"
    if tick.time[2] != ':': 
        tick.time = '0' + tick.time
        
    tick_hour = int(tick.time[0:2])  
    local_hour = local_time.hour
    real_date=local_datetime
    if tick_hour == 23 and local_hour == 0:#行情时间慢于系统时间
        real_date+=timedelta(-1)
    elif tick_hour == 0 and local_hour == 23:#系统时间慢于行情时间
        real_date+=timedelta(1)

    tick.time = tick.time.ljust(12,'0')
    drTick.datetime = datetime(real_date.year,real_date.month,real_date.day,
        int(tick.time[0:2]), int(tick.time[3:5]), int(tick.time[6:8]), int(tick.time[9:12])*1000)

    tmpTime=drTick.datetime.time()
    for sec in invalid_sections:
        if tmpTime>sec[0] and tmpTime<sec[1]:
            return
    
    # 3. 更新Tick数据
    if insertDB:
        insertData(TICK_DB_NAME, tick.vtSymbol, drTick)        

procecssTickEvent(tick)

In [7]:
new_nodb=profiling(test_count)
client.drop_database(TICK_DB_NAME)
new_db=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))
print '新版不保存数据到mongodb单次耗时:%.4f' %original_nodb
print '新版含保存数据到mongodb单次耗时:%.4f' %original_db


新版不保存数据到mongodb单次耗时:0.0255
新版含保存数据到mongodb单次耗时:0.2334

保存为文本文件效率


In [8]:
def insertData(db,collection,data):
    for key in data.__dict__:
        fout.write(str(data.__dict__[key])+',')

In [9]:
fout=open('D:/test.txt','w')
new_db_text=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))
print '新版含保存数据到text file单次耗时:%.4f' %original_db
fout.close()


新版含保存数据到text file单次耗时:0.2334

时间类型转化效率

注意到不保存数据到数据的版本中,新版相比老版耗时反而降低了,这主要是由于时间转化函数的改写。

如下三种时间转化方法效率差别巨大:


In [10]:
time_convert1=profiling(10000,lambda:parse('20161212 21:21:21.5'))
time_convert2=profiling(10000,lambda:datetime.strptime('20161212 21:21:21.5', '%Y%m%d %H:%M:%S.%f'))
def customized_parse(s):
    s=s.ljust(21,'0')
    return datetime(int(s[0:4]),int(s[4:6]),int(s[6:8]),int(s[9:11]), int(s[12:14]), int(s[15:17]), int(s[18:21])*1000 )
time_convert3=profiling(10000,lambda:customized_parse('20161212 21:21:21.5'))      
print '转化方法1耗时:%.4f' %time_convert1
print '转化方法2耗时:%.4f' %time_convert2
print '转化方法3耗时:%.4f' %time_convert3


转化方法1耗时:0.0560
转化方法2耗时:0.0122
转化方法3耗时:0.0032

总结


In [11]:
import pandas as pd
df=pd.DataFrame([{u'无数据写入':original_nodb,u'mongodb写入':original_db},
                 {u'无数据写入': new_nodb, u'mongodb写入': new_db, u'text文件写入':new_db_text}
                ],index=['原版','新版'])
df


Out[11]:
mongodb写入 text文件写入 无数据写入
原版 0.2334 NaN 0.0255
新版 0.2174 0.0362 0.0160

总的来看,行情数据落地原版与新版一次落地耗时都为0.2ms左右。函数中,耗时主要来源于mongodb的插入,占约为90%的耗时。通过尝试简单的text写入作为数据存储方式,耗时得到了大幅降低,获得了单次0.04ms耗时的效果,采取其它更高效的格式有望进一步降低耗时。但考虑到无数据写入时的耗时为约0.02ms,所以期望的最优耗时也就在0.02ms左右。

总的来说,基于mongodb的方案能够实时存储的条目数在每秒几百条量级;进一步优化可能达到几千条量级。此水平应该己能满足绝大多数的需求。