In [1]:
from zipfile import ZipFile
from datetime import datetime
from pytz import timezone
import numpy as np
from numpy.lib import recfunctions
import tables as tb
In [2]:
fname = '../local_data/EQY_US_ALL_BBO_20140206.zip'
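As a quick sanity check (a sketch; nothing below depends on the exact inner file name), we can list what the archive contains before committing to a full conversion:

with ZipFile(fname) as zfile:
    for info in zfile.filelist:
        # Each ZipInfo carries the member's name and uncompressed size
        print(info.filename, info.file_size)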
Fields of the daily quotes file (taqquote):
[0:8] time HHMMSSXXX (XXX = milliseconds)
[9] text exchange (N = NYSE, T/Q = NASDAQ)
[10:25] text symbol (6 root + 10 suffix)
[26:36] bid price (7 integer + 4 decimal digits)
[37:43] bid size (units)
[44:54] ask price (7 integer + 4 decimal digits)
[55:61] ask size
[62] text condition of quote
[63:66] market maker
[67] bid exchange
[68] ask exchange
[69:84] int sequence number
[85] int national BBO indicator
[86] int NASDAQ BBO indicator
[87] text cancel/correction
[88] text source of quote (C = CTA, N = UTP)
[89] text retail interest indicator
[...]
# Two more characters (\r\n) terminate each line
widths = [9, 1, 16, 11, 7, 11, 7, 1, 4, 1, 1, 16, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
w = np.array(widths)
w.cumsum()
Note that we'll have 98 bytes total with the \r\n on the end.
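As a quick check (a sketch), the cumulative widths should line up with the byte offsets in the field list above, and the total should come to 98 bytes once the trailing \r\n is counted:

offsets = np.concatenate(([0], w.cumsum()))
print(offsets[:-1])      # start position of each field within a line
print(offsets[-1] + 2)   # 96 data bytes + 2 for '\r\n' = 98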
In [3]:
BYTES_PER_LINE = 98
# Note - we're using object here (as pandas will do this anyway),
# and we'll need to convert back to fixed width strings later
# We can get the widths from the widths list above
old_dtype = [('Time', np.datetime64),
('Exchange', object), # |S1
('Symbol', object), # |S16, etc.
('Bid_Price', np.float64),
('Bid_Size', np.int32),
('Ask_Price', np.float64),
('Ask_Size', np.int32),
('Quote_Condition', object),
('Market_Maker', np.int64), # This is blank - want to skip?
('Bid_Exchange', object),
('Ask_Exchange', object),
('Sequence_Number', np.int64),
('National_BBO_Ind', np.int8), # These aren't really numbers
('NASDAQ_BBO_Ind', np.int8), # Maybe should be string?
('Quote_Cancel_Correction', object),
('Source_of_Quote', object),
('Retail_Interest_Indicator_RPI', object),
('Short_Sale_Restriction_Indicator', object),
('LULD_BBO_Indicator_CQS', object),
('LULD_BBO_Indicator_UTP', object),
('FINRA_ADF_MPID_Indicator', object),
('SIP_generated_Message_Identifier', object),
('National_BBO_LULD_Indicator', object)
] # Then there's two characters for newline
# This was for pandas' screwball approach to dtype
# names = [a for a,b in dtype]
# dtype = dict(dtype)
In [4]:
# Note that the '|' character means byte order doesn't matter,
# which it never will for "bytes" (which is what 'S' stands for)
initial_dtype = [('Time', 'S9'), # HHMMSSmmm, should be in Eastern Time (ET)
# ('hour', '|S2'),
# ('minute', '|S2'),
# ('second', '|S2'),
# ('msec', '|S3'),
('Exchange', 'S1'),
('Symbol', 'S16'), # Maybe should split into 6 root + 10 suffix
('Bid_Price', 'S11'), # 7.4 (fixed point)
('Bid_Size', 'S7'),
('Ask_Price', 'S11'), # 7.4
('Ask_Size', 'S7'),
('Quote_Condition', 'S1'),
('Market_Maker', 'S4'), # This ends up getting discarded, it should always be b' '
('Bid_Exchange', 'S1'),
('Ask_Exchange', 'S1'),
('Sequence_Number', 'S16'),
('National_BBO_Ind', 'S1'),
('NASDAQ_BBO_Ind', 'S1'),
('Quote_Cancel_Correction', 'S1'),
('Source_of_Quote', 'S1'),
('Retail_Interest_Indicator_RPI', 'S1'),
('Short_Sale_Restriction_Indicator', 'S1'),
('LULD_BBO_Indicator_CQS', 'S1'),
('LULD_BBO_Indicator_UTP', 'S1'),
('FINRA_ADF_MPID_Indicator', 'S1'),
('SIP_generated_Message_Identifier', 'S1'),
('National_BBO_LULD_Indicator', 'S1'),
('newline', 'S2')]
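To see how this dtype carves up a raw record, here is a minimal sketch using a fabricated 98-byte line (made-up values, not real TAQ data):

# A fabricated record: 96 data bytes plus '\r\n', purely for illustration
fake_line = (b'040000001' + b'T' + b'AAPL'.ljust(16) +
             b'00505000000' + b'0000100' + b'00505100000' + b'0000200' +
             b'R' + b'    ' + b'T' + b'T' + b'1'.rjust(16, b'0') +
             b' ' * 11 + b'\r\n')
assert len(fake_line) == BYTES_PER_LINE
parsed = np.frombuffer(fake_line, dtype=initial_dtype)
parsed['Symbol'], parsed['Bid_Price'], parsed['Ask_Size']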
In [5]:
# Justin and pandas (I think) use a 64-bit time representation, as does PyTables (Time64Col).
# We could use msec from the beginning of the day for now in an int32
# (maybe compare performance to datetime64? But dates should compress very well...)
time_col = 'Time'
convert_dtype = [
('Bid_Price', np.float64),
('Bid_Size', np.int32),
('Ask_Price', np.float64),
('Ask_Size', np.int32),
# ('Market_Maker', np.int8), # This is not currently used, and should always be b' '
('Sequence_Number', np.int64),
# ('National_BBO_Ind', np.int8), # The _Ind fields are actually categorical - leaving as strings
# ('NASDAQ_BBO_Ind', np.int8),
]
passthrough_strings = ['Exchange',
'Symbol',
'Quote_Condition',
'Bid_Exchange',
'Ask_Exchange',
'National_BBO_Ind', # The _Ind fields are actually categorical - leaving as strings
'NASDAQ_BBO_Ind',
'Quote_Cancel_Correction',
'Source_of_Quote',
'Retail_Interest_Indicator_RPI',
'Short_Sale_Restriction_Indicator',
'LULD_BBO_Indicator_CQS',
'LULD_BBO_Indicator_UTP',
'FINRA_ADF_MPID_Indicator',
'SIP_generated_Message_Identifier',
'National_BBO_LULD_Indicator']
In [6]:
# Lifted from blaze.pytables
def dtype_to_pytables(dtype):
""" Convert NumPy dtype to PyTable descriptor
Examples
--------
>>> from tables import Int32Col, StringCol, Time64Col
>>> dt = np.dtype([('name', 'S7'), ('amount', 'i4'), ('time', 'M8[us]')])
>>> dtype_to_pytables(dt) # doctest: +SKIP
{'amount': Int32Col(shape=(), dflt=0, pos=1),
'name': StringCol(itemsize=7, shape=(), dflt='', pos=0),
'time': Time64Col(shape=(), dflt=0.0, pos=2)}
"""
d = {}
for pos, name in enumerate(dtype.names):
dt, _ = dtype.fields[name]
if issubclass(dt.type, np.datetime64):
tdtype = tb.Description({name: tb.Time64Col(pos=pos)}),  # trailing comma makes a 1-tuple, matching descr_from_dtype's return shape
else:
tdtype = tb.descr_from_dtype(np.dtype([(name, dt)]))
el = tdtype[0] # removed dependency on toolz -DJC
getattr(el, name)._v_pos = pos
d.update(el._v_colobjects)
return d
In [7]:
# The "easy" dtypes are the "not datetime" dtypes
easy_dtype = []
convert_dict = dict(convert_dtype)
for name, dtype in initial_dtype:
if name in convert_dict:
easy_dtype.append( (name, convert_dict[name]) )
elif name in passthrough_strings:
easy_dtype.append( (name, dtype) )
# PyTables will not accept np.datetime64; we hack around that below, but we keep it here so it
# works with the blaze-derived function above.
# We also shift Time to the end (though I'd rather maintain the original column order), as it's
# more efficient for Dav given the technical debt he's already built up.
pytables_dtype = easy_dtype + [('Time', 'datetime64[ms]')]
pytables_desc = dtype_to_pytables(np.dtype(pytables_dtype))
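As a sanity check (a sketch), we can print the generated PyTables column descriptors to confirm that Time became a Time64Col and the byte fields became StringCols:

for name in sorted(pytables_desc):
    print(name, pytables_desc[name])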
In [8]:
class TAQ2HDF5:
def __init__(self, taq_fname):
self.taq_fname = taq_fname
def convert_taq(self):
# The below doesn't work for pandas (and neither does `unzip` from the command line). Probably want to use
# something like `7z x -so my_file.zip 2> /dev/null` if we use pandas.
with ZipFile(self.taq_fname) as zfile:
for inside_f in zfile.filelist:
# The original filename is available as inside_f.filename
with zfile.open(inside_f.filename) as infile:
first = infile.readline()
# You need to use bytes to split bytes
dateish, numlines = first.split(b":")
numlines = int(numlines)
# Get dates to combine with times later
# This is a little over-trusting of the spec...
self.month = int(dateish[2:4])
self.day = int(dateish[4:6])
self.year = int(dateish[6:10])
# Should I use a context manager here?
h5_table = self.setup_hdf5(inside_f.filename, numlines)
try:
self.raw_conversion(numlines, infile, h5_table)
finally:
self.finalize_hdf5()
def setup_hdf5(self, h5_fname_root, numlines):
# We're using aggressive compression and checksums, since this will likely stick around
# Stopping one level short of max compression - don't be greedy.
self.h5 = tb.open_file(h5_fname_root + '.h5', title=h5_fname_root, mode='w',
filters=tb.Filters(complevel=8, complib='blosc:lz4hc', fletcher32=True) )
return self.h5.create_table('/', 'daily_quotes', description=pytables_desc, expectedrows=numlines)
def finalize_hdf5(self):
self.h5.close()
def process_chunk(self, all_strings):
# This is unnecessary copying
easy_converted = all_strings.astype(easy_dtype)
# These don't have the decimal point in the TAQ file
for dollar_col in ['Bid_Price', 'Ask_Price']:
easy_converted[dollar_col] /= 10000
# Currently, there doesn't seem to be any utility in converting to numpy.datetime64.
# PyTables wants float64s corresponding to the POSIX standard (seconds relative to 1970-01-01, UTC)
converted_time = [timezone('US/Eastern').localize( datetime( self.year, self.month, self.day,
int(raw[:2]), int(raw[2:4]), int(raw[4:6]),
int(raw[6:9]) * 1000, # msec needs to be microsec
) ).timestamp() # pytz zones must be attached with localize(); tzinfo= in the constructor gives the wrong (LMT) offset
for raw in all_strings['Time'] ]
# More unnecessary copying
records = recfunctions.append_fields(easy_converted, 'Time', converted_time, usemask=False)
return records
# at some point, we might optimize chunksize. For now, assume PyTables is smart
def raw_conversion(self, numlines, infile, out, chunksize=None):
'''Read raw bytes from TAQ, write to HDF5'''
index = 0
if chunksize is None:
chunksize = out.chunkshape[0]
while(True):
raw_bytes = infile.read(BYTES_PER_LINE * chunksize)
## Debugging: break after 10 chunks
index = index + 1
if index == 10:
break
if not raw_bytes:
break
# If we use asarray with this dtype, it crashes Python! (might not be true anymore)
# ndarray gives 'S' arrays instead of chararrays (as recarray does)
all_strings = np.ndarray(chunksize, buffer=raw_bytes, dtype=initial_dtype)
# This approach doesn't work...
# out[chunk_start:chunk_stop, 1:] = all_strings[:,1:-1]
out.append( self.process_chunk(all_strings) )
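The per-row datetime construction in process_chunk is a known hot spot. A vectorized alternative (a sketch, not part of the class above; it assumes every Time field is nine ASCII digits and that no DST transition happens during the trading day) would turn HHMMSSmmm into milliseconds since midnight with array arithmetic:

def vectorized_times(time_bytes, year, month, day):
    '''Sketch: HHMMSSmmm byte strings -> POSIX timestamps (float seconds).'''
    # Field views of a structured array are strided; make a contiguous copy
    # before reinterpreting each record as nine single-byte digits
    digits = np.ascontiguousarray(time_bytes).view('S1').reshape(-1, 9).astype(np.int64)
    ms = ((digits[:, 0] * 10 + digits[:, 1]) * 3600000 +
          (digits[:, 2] * 10 + digits[:, 3]) * 60000 +
          (digits[:, 4] * 10 + digits[:, 5]) * 1000 +
          digits[:, 6] * 100 + digits[:, 7] * 10 + digits[:, 8])
    midnight = timezone('US/Eastern').localize(datetime(year, month, day)).timestamp()
    return midnight + ms / 1000.0

Usage inside process_chunk would look like vectorized_times(all_strings['Time'], self.year, self.month, self.day), which avoids constructing a Python datetime per quote.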
In [10]:
i = 0
with ZipFile(fname) as zfile:
for inside_f in zfile.filelist:
with zfile.open(inside_f.filename) as infile:
first = infile.readline() # skip the header line (date:numlines)
while i < 5:
first = infile.readline()
i = i + 1
dateish = first.split()
print(dateish) # whitespace-split fields of the line
#print(dateish[0]) # the first chunk: time + exchange + symbol root
hour = dateish[0][0:2]
minute = dateish[0][2:4]
second = dateish[0][4:6]
msec = dateish[0][6:9]
print(hour, ":", minute, ":", second, ".", msec)
In [ ]:
class read:
def __init__(self, taq_fname):
self.taq_fname = taq_fname
def convert_taq(self):
# The below doesn't work for pandas (and neither does `unzip` from the command line). Probably want to use
# something like `7z x -so my_file.zip 2> /dev/null` if we use pandas
i = 0
with ZipFile(self.taq_fname) as zfile:
for inside_f in zfile.filelist:
with zfile.open(inside_f.filename) as infile:
first = infile.readline() # header line, formatted as date:numlines
dateish, numlines = first.split(b":")
numlines = int(numlines)
# Get dates to combine with times later
self.month = int(dateish[2:4])
self.day = int(dateish[4:6])
self.year = int(dateish[6:10])
# Peek at one quote line and pull apart its HHMMSSmmm time field
while i < 1:
first = infile.readline()
i = i + 1
fields = first.split()
print(fields)
hour = fields[0][0:2]
minute = fields[0][2:4]
second = fields[0][4:6]
msec = fields[0][6:9]
print(hour, ":", minute, ":", second, ".", msec)
# Should I use a context manager here?
h5_table = self.setup_hdf5(inside_f.filename, numlines)
try:
self.raw_conversion(numlines, infile, h5_table)
finally:
self.finalize_hdf5()
def setup_hdf5(self, h5_fname_root, numlines):
# We're using aggressive compression and checksums, since this will likely stick around
# Stopping one level short of max compression - don't be greedy.
self.h5 = tb.open_file(h5_fname_root + '.h5', title=h5_fname_root, mode='w',
filters=tb.Filters(complevel=8, complib='blosc:lz4hc', fletcher32=True) )
return self.h5.create_table('/', 'daily_quotes', description=pytables_desc, expectedrows=numlines)
def finalize_hdf5(self):
self.h5.close()
def process_chunk(self, all_strings):
# This is unnecessary copying
easy_converted = all_strings.astype(easy_dtype)
# These don't have the decimal point in the TAQ file
for dollar_col in ['Bid_Price', 'Ask_Price']:
easy_converted[dollar_col] /= 10000
# Currently, there doesn't seem to be any utility in converting to numpy.datetime64.
# PyTables wants float64s corresponding to the POSIX standard (seconds relative to 1970-01-01, UTC)
converted_time = [timezone('US/Eastern').localize( datetime( self.year, self.month, self.day,
int(raw[:2]), int(raw[2:4]), int(raw[4:6]),
int(raw[6:9]) * 1000, # msec needs to be microsec
) ).timestamp() # pytz zones must be attached with localize(); tzinfo= in the constructor gives the wrong (LMT) offset
for raw in all_strings['Time'] ]
# More unnecessary copying
records = recfunctions.append_fields(easy_converted, 'Time', converted_time, usemask=False)
return records
# at some point, we might optimize chunksize. For now, assume PyTables is smart
def raw_conversion(self, numlines, infile, out, chunksize=None):
'''Read raw bytes from TAQ, write to HDF5'''
index = 0
if chunksize is None:
chunksize = out.chunkshape[0]
while(True):
raw_bytes = infile.read(BYTES_PER_LINE * chunksize)
if not raw_bytes:
break
# If we use asarray with this dtype, it crashes Python! (might not be true anymore)
# ndarray gives 'S' arrays instead of chararrays (as recarray does)
all_strings = np.ndarray(chunksize, buffer=raw_bytes, dtype=initial_dtype)
# This approach doesn't work...
# out[chunk_start:chunk_stop, 1:] = all_strings[:,1:-1]
out.append( self.process_chunk(all_strings) )
In [10]:
test_run = TAQ2HDF5(fname)
In [11]:
%time test_run.convert_taq()
In [12]:
h5 = tb.open_file('./taqquote20140206.h5')
In [16]:
h5.root.daily_quotes[:]
Out[16]:
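To spot-check the stored Time column (a sketch assuming pandas is available; it isn't imported above), the POSIX seconds can be converted back into Eastern-time stamps:

import pandas as pd
ts = h5.root.daily_quotes.col('Time')
pd.to_datetime(ts, unit='s', utc=True).tz_convert('US/Eastern')[:5]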
In [17]:
h5.close()