In [1]:
import zmq
import json
import requests
import pandas as pd
import numpy as np
from io import StringIO
class UserAgent:
# HTTP - localhost : 8739 はなさく
WEBAPI_ADDR = "http://localhost:8739"
# ZMQ - localhost : 8740 はなしを
ZMQPUB_ADDR = "tcp://localhost:8740"
#
def __init__( self, topic ):
self.context = zmq.Context()
self.topic = topic
#
def subscribe_zmq(self):
self.publisher = self.context.socket( zmq.SUB )
self.publisher.setsockopt( zmq.SUBSCRIBE, self.topic )
self.publisher.connect( self.ZMQPUB_ADDR )
#
@classmethod
def get_server_health(cls):
r = requests.get( cls.WEBAPI_ADDR + "/health" )
return r.json()
#
@classmethod
def get_server_version(cls):
r = requests.get( cls.WEBAPI_ADDR + "/version" )
return r.json()
#
@classmethod
def get_portfolio(cls):
r = requests.get( cls.WEBAPI_ADDR + "/portfolio" )
df = pd.read_json( r.content, orient='records' )
return df.set_index('code')
#
@classmethod
def get_history_csv( cls, code ):
r = requests.get( f'{cls.WEBAPI_ADDR}/stocks/history/csv/?code={code}' )
return r.content.decode( 'utf-8' )
#
@classmethod
def get_history_web( cls, code ):
r = requests.get( f'{cls.WEBAPI_ADDR}/stocks/history/js/?code={code}' )
df = pd.read_json( r.content, orient='records' )
return df.set_index('at')
#
@classmethod
def req_history_zmq( cls, code ):
r = requests.get( f'{cls.WEBAPI_ADDR}/publish?code={code}' )
return r
#
def read_pub_zmq( self ):
vs = []
while True:
[topic, bytestr] = self.publisher.recv_multipart()
contents = bytestr.decode( 'utf-8' )
v = pd.read_json( contents, orient='records' )
if len( v ) > 0:
# 送られてきたリストをvsに追加する
vs.append( v )
else:
# リストの長さが0つまり"[]"が送られてきたら
# 送るべき情報が無いという意味なので、それを確認したら終了する
break;
return topic, pd.concat(vs)
In [2]:
# すべて購読する
topic = b''
useragent = UserAgent( topic )
useragent.subscribe_zmq()
In [3]:
version = useragent.get_server_version()
version
Out[3]:
In [4]:
healthy = useragent.get_server_health()
healthy
Out[4]:
In [5]:
print( f"{healthy['hNumProcessors']} processors" )
In [6]:
allocated_mega_bytes = healthy['hStats']['allocated_bytes'] / 1000 / 1000
print( f"memory {allocated_mega_bytes} megabytes in use" )
In [7]:
useragent.get_portfolio()
Out[7]:
In [8]:
# Yahoo Financeのようなクエリパラメーターでね
code = '8411.T'
In [9]:
csv = useragent.get_history_csv( code )
csv
Out[9]:
改行コードCRLFのCRを消してLFにしてからpandas.read_csvで読む
In [10]:
sio = StringIO( csv.replace('\r', '') )
ohlcvC = pd.read_csv( sio ).loc[:,['at','code','open','high','low','close','volume']].set_index('at')
ohlcvC
Out[10]:
In [11]:
json = useragent.get_history_web( code )
json
Out[11]:
情報の整理
In [12]:
ohlcvW = json.loc[:,['code','open','high','low','close','volume']]
ohlcvW
Out[12]:
リクエストをサーバーに送る
In [13]:
useragent.req_history_zmq( code )
Out[13]:
HTTPで返す情報が無いので204 NoContentが返ってくる。
In [14]:
topic, df = useragent.read_pub_zmq()
ZeroMQのPUB / SUB通信でトピックとpandas DataFrameを得る。
このメソッドは通信終了までブロックする。
In [15]:
topic
Out[15]:
In [16]:
df
Out[16]:
同じく情報の整理
In [17]:
ohlcvZ = df.loc[:,['at','code','open','high','low','close','volume']].set_index('at')
ohlcvZ
Out[17]:
In [18]:
del useragent
In [19]:
run_threads = 10
In [20]:
from concurrent.futures import ThreadPoolExecutor, as_completed
def ccjob(number):
topic = b''
ua = UserAgent( topic )
ua.subscribe_zmq()
topic, df = ua.read_pub_zmq()
df = df.loc[:,['at','code','open','high','low','close','volume']].set_index('at')
return df
ohlcvThreads = []
with ThreadPoolExecutor() as executor:
ohlcvThreads = executor.map( ccjob, range(run_threads) )
ここで停止する(ブロッキング関数をよんでいるから)ので
以下のリンクをクリックして出版依頼APIを呼ぶ
http://localhost:8739/publish/?code=8411.T
このページの内容はありません。(204 NoContent)
In [21]:
import hashlib
対象リスト
In [22]:
df_under_test = [ohlcvC, ohlcvW, ohlcvZ] + list(ohlcvThreads)
DataFrameをmd5チェックサムにする
In [23]:
def checksum( x ):
return hashlib.md5( x.to_msgpack() ).hexdigest()
dut = [checksum(x) for x in df_under_test]
dut
Out[23]:
先頭要素と全要素の一致を確認する
In [24]:
cond = [x == dut[0] for x in dut ]
cond
Out[24]:
すべてTrueであること
In [25]:
c = all(cond)
assert c, "!"
c
Out[25]:
すべて同じリクエストだから同じ情報が得られて当然でした。
In [ ]: