Sending Messages


In [1]:
import mflow
import json
# Create the sending side of the stream: bind (mflow.BIND) on TCP port 9999
# in PUSH mode. NOTE(review): '*' presumably means "all local interfaces",
# as in ZeroMQ endpoint syntax - confirm against the mflow documentation.
stream = mflow.connect('tcp://*:9999', conn_type=mflow.BIND, mode=mflow.PUSH)

In [14]:
# Send one message made of two parts; each part is a JSON object serialized
# to UTF-8 bytes.
stream.send(json.dumps({"htype": "mytype"}).encode('utf-8'), send_more=True)  # Main header; send_more=True because a further part follows
stream.send(json.dumps({"YOU": "suck"}).encode('utf-8'))  # Second part, sent without send_more (not the main header)

In [6]:
# Tear down the sending stream created in the first cell.
stream.disconnect()

Merging of Two Streams

Note: merging does not synchronize the streams.


In [1]:
import mflow

In [2]:
# Open the two source streams that will be merged below. Only the address is
# given, so conn_type and mode are left at mflow.connect's defaults.
stream_one = mflow.connect('tcp://sf-lc:7777')
stream_two = mflow.connect('tcp://sf-lc:7779')

In [3]:
def dump(receiver):
    """Print every part of a message except the first, and return the first.

    The first part is read with ``receiver.next()`` and kept aside. Then, for
    as long as ``receiver.has_more()`` is true, each following part is read
    with ``receiver.next()`` and printed.

    Parameters
    ----------
    receiver
        Object exposing ``next()`` and ``has_more()``.

    Returns
    -------
    The value returned by the first ``receiver.next()`` call.
    """
    first_part = receiver.next()
    while True:
        if not receiver.has_more():
            break
        print(receiver.next())
    return first_part

In [4]:
import mflow.tools
# Wrap both source streams in one Merge object; messages are then read
# through this single object via stream.receive(...).
stream = mflow.tools.Merge(stream_one, stream_two)

In [11]:
# Receive one message from the merged stream, using dump() as the handler
# (dump prints the trailing parts and returns the first one).
# NOTE(review): block=False is a non-blocking receive; if no message is
# available, m may not be a message object and m.data would fail - confirm
# what Merge.receive returns in that case.
m = stream.receive(handler=dump, block=False)
print(m.data)


0
b'{"channels":[{"encoding":"little","name":"SIMI-FAKEDATA:TEST_0","shape":[1],"type":"float64"},{"encoding":"little","name":"SIMI-FAKEDATA:TEST_1","shape":[1],"type":"float64"},{"encoding":"little","name":"SIMI-FAKEDATA:TEST_2","shape":[1],"type":"float64"},{"encoding":"little","name":"SIMI-FAKEDATA:TEST_3","shape":[1],"type":"float64"},{"encoding":"little","name":"SIMI-FAKEDATA:TEST_4","shape":[1],"type":"float64"},{"encoding":"little","name":"SIMI-FAKEDATA:TEST_5","shape":[1],"type":"float64"},{"encoding":"little","name":"SIMI-FAKEDATA:TEST_6","shape":[1],"type":"float64"},{"encoding":"little","name":"SIMI-FAKEDATA:TEST_7","shape":[1],"type":"float64"},{"encoding":"little","name":"SIMI-FAKEDATA:TEST_8","shape":[1],"type":"float64"},{"encoding":"little","name":"SIMI-FAKEDATA:TEST_9","shape":[1],"type":"float64"}],"htype":"bsr_d-1.1"}\n'
b'\x00\x00\x00\x00\xc0?\xd8@'
b'\xf5\xf9\xbbW\x00\x00\x00\x00\xa7\xe7U\x16\x00\x00\x00\x00'
b'\x00\x00\x00\x00\xc0?\xd8@'
b'\xf5\xf9\xbbW\x00\x00\x00\x00T\xebU\x16\x00\x00\x00\x00'
b'\x00\x00\x00\x00\xc0?\xd8@'
b'\xf5\xf9\xbbW\x00\x00\x00\x00\x00\xeeU\x16\x00\x00\x00\x00'
b'\x00\x00\x00\x00\xc0?\xd8@'
b'\xf5\xf9\xbbW\x00\x00\x00\x00\x88\xf0U\x16\x00\x00\x00\x00'
b'\x00\x00\x00\x00\xc0?\xd8@'
b'\xf5\xf9\xbbW\x00\x00\x00\x00\xa6\xf2U\x16\x00\x00\x00\x00'
b'\x00\x00\x00\x00\xc0?\xd8@'
b'\xf5\xf9\xbbW\x00\x00\x00\x00\x11\xf5U\x16\x00\x00\x00\x00'
b'\x00\x00\x00\x00\xc0?\xd8@'
b'\xf5\xf9\xbbW\x00\x00\x00\x00b\xf7U\x16\x00\x00\x00\x00'
b'\x00\x00\x00\x00\xc0?\xd8@'
b'\xf5\xf9\xbbW\x00\x00\x00\x00\xd9\xf9U\x16\x00\x00\x00\x00'
b'\x00\x00\x00\x00\xc0?\xd8@'
b'\xf5\xf9\xbbW\x00\x00\x00\x00\x16\xfcU\x16\x00\x00\x00\x00'
b'\x00\x00\x00\x00\xc0?\xd8@'
b'\xf5\xf9\xbbW\x00\x00\x00\x00\xcf\xfeU\x16\x00\x00\x00\x00'
b'{"global_timestamp":{"ns":840785013,"sec":1471937013},"hash":"9d5bc4b55b79c49d66b3147b3ca7cf30","htype":"bsr_m-1.1","pulse_id":248305}\n'