# In [1]:
"""
The connector for ZeroMQ
This connector is the core message queue for processes running on the same machine
__author__ = "Alex Xiao <http://www.alexxiao.me/>"
__date__ = "2017-04-22"
__version__ = "0.2"
Version:
0.1 : implemented ZeroMQ publish/subscribe & server/client mode
0.2 : added device for publish/subscribe mode
"""
import zmq
import sys
import time
class device():
    '''
    The device which exchanges messages between pub/sub.

    A SUB socket is bound on ``port_pub`` (publishers connect there) and a
    PUB socket is bound on ``port_sub`` (subscribers connect there); every
    message received on the first is forwarded to the second.
    '''
    def __init__(self,port_pub=12116,port_sub=12117,debug=False):
        """
        Store the configuration; nothing is bound until start() is called.
        Parameters:
            port_pub: TCP port publishers connect to
            port_sub: TCP port subscribers connect to
            debug: when true, print progress messages while starting
        """
        self.port_pub=port_pub
        self.port_sub=port_sub
        # Kept for interface compatibility; start() holds its sockets locally.
        self.socket=None
        self.context=None
        self.debug=debug

    def start(self):
        """
        Bind both sockets and run the forwarder; blocks until it is stopped.

        Any error raised while binding or forwarding propagates to the
        caller after the sockets and the context have been released.
        """
        context = zmq.Context()
        frontend = None
        backend = None
        try:
            if self.debug: print('Got context')
            # Socket facing publishers: subscribe to everything they send.
            frontend = context.socket(zmq.SUB)
            addr="tcp://*:"+str(self.port_pub)
            if self.debug: print('Tring to bind PUB at '+addr)
            frontend.bind(addr)
            frontend.setsockopt(zmq.SUBSCRIBE, b"")
            if self.debug: print('Waiting for PUB at '+addr)
            # Socket facing subscribers.
            backend = context.socket(zmq.PUB)
            # Compute the subscriber-side address before logging it, so the
            # debug output reports the port that is actually being bound.
            addr="tcp://*:"+str(self.port_sub)
            if self.debug: print('Tring to bind SUB at '+addr)
            backend.bind(addr)
            if self.debug: print('Waiting for SUB at '+addr)
            # NOTE(review): pyzmq documents zmq.device as deprecated in
            # favour of zmq.proxy; kept here to preserve existing behaviour.
            zmq.device(zmq.FORWARDER, frontend, backend)
        finally:
            # Release whatever was created, even if a bind failed or the
            # forwarder was interrupted; linger=0 keeps term() from waiting
            # on undelivered messages.
            for sock in (backend, frontend):
                if sock is not None:
                    sock.close(linger=0)
            context.term()
#device(debug=True).start()
class publisher():
    """
    Publishes topic-prefixed string messages on a ZeroMQ PUB socket.
    """
    def __init__(self,port=12116,mode='device',debug=False):
        """
        The publisher, this will not auto connect, please run connect after
        Parameters:
            port: the port publisher/device is on
            mode: default to 'device' which uses an intermediary device
                  otherwise bind to port
            debug: when true, print every message before it is published
        """
        self.port=port
        self.mode=mode
        self.socket=None
        self.context=None
        self.debug=debug

    def connect(self):
        """
        Create the PUB socket and connect to the device, or bind the port
        directly when mode is anything other than 'device'.
        """
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.PUB)
        if self.mode=='device':
            self.socket.connect("tcp://localhost:%d" % self.port)
        else:
            # A bind endpoint must name an interface (numeric address, '*'
            # or interface name); the host name "localhost" is rejected by
            # ZeroMQ, so bind the loopback address explicitly.
            self.socket.bind("tcp://127.0.0.1:%d" % self.port)

    def disconnect(self):
        """
        Close the socket; does nothing if connect() was never called.
        Only the socket is closed, the context is not explicitly terminated.
        """
        if self.socket is not None:
            self.socket.close()

    def pub(self,topic,msg):
        """
        Send msg under topic as a single string "<topic> <msg>".
        Parameters:
            topic: topic prefix subscribers filter on
            msg: message payload
        """
        if self.debug: print ('DEBUG: Publishing [',topic,'] ', msg)
        self.socket.send_string("%s %s" % (topic, msg))
class subscriber():
    """
    Receives topic-prefixed messages from a ZeroMQ SUB socket.
    """
    def __init__(self,topic,port=12117,debug=False,code='UTF-8'):
        """
        The subscriber, this will not auto connect, please run connect after
        Parameters:
            topic: the topic to subscribe to (used as the subscription filter)
            port: the port publisher is on
            debug: when true, print every received message
            code: text encoding used for the topic filter and for decoding
        """
        self.CODE=code
        self.port=port
        self.topic=topic
        self.socket=None
        self.context=None
        self.DEBUG=debug

    def connect(self):
        """
        Create the SUB socket, connect to localhost and subscribe to topic.
        """
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        self.socket.connect ("tcp://localhost:%d" % self.port)
        topicfilter = self.topic.encode(self.CODE)
        self.socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

    def disconnect(self):
        """
        Close the socket; does nothing if connect() was never called.
        """
        if self.socket is not None:
            self.socket.close()

    def sub(self):
        """
        Block until one message arrives and return its payload.

        The message is decoded with the configured encoding and split at the
        first space into topic and payload. A message containing no space is
        treated as a bare topic and yields an empty payload.
        Returns:
            the text after the first space, or '' when there is none
        """
        string = self.socket.recv().decode(self.CODE)
        if self.DEBUG:
            print ('DEBUG: Received raw message:',string)
        # partition() never fails to unpack, unlike split(' ', 1), which
        # raised ValueError for a topic-only message.
        topic, _, messagedata = string.partition(' ')
        if self.DEBUG:
            print ('DEBUG: Received [',topic,'] ', messagedata)
        return messagedata
class server():
    """
    The reply side of a request/reply pair, built on a ZeroMQ REP socket.
    """
    def __init__(self,port=12116,debug=False):
        """
        The server, this will not auto connect, please run connect after
        Parameters:
            port: the port the server listens on
            debug: when true, print every message received or sent
        """
        self.port=port
        self.socket=None
        self.context=None
        self.debug=debug

    def connect(self):
        """
        Create the REP socket and bind it to the configured port.
        """
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REP)
        # A bind endpoint must name an interface (numeric address, '*' or
        # interface name); the host name "localhost" is rejected by ZeroMQ,
        # so bind the loopback address explicitly.
        self.socket.bind("tcp://127.0.0.1:%d" % self.port)

    def disconnect(self):
        """
        Close the socket; does nothing if connect() was never called.
        """
        if self.socket is not None:
            self.socket.close()

    def receive(self):
        """
        Block until a request arrives and return it as a decoded string.
        """
        msg=self.socket.recv().decode()
        if self.debug: print ('DEBUG: Received Msg: ', msg)
        return msg

    def send(self,msg):
        """
        Send msg (a string) as the reply.
        """
        if self.debug: print ('DEBUG: Sending Msg: ', msg)
        self.socket.send_string(msg)
class client():
    """
    The request side of a request/reply pair, built on a ZeroMQ REQ socket.
    """
    def __init__(self,port,debug=False):
        """
        The client, this will not auto connect, please run connect after
        Parameters:
            port: the port server is running on
            debug: when true, print every message sent or received
        """
        self.port=port
        self.socket=None
        self.context=None
        self.debug=debug

    def connect(self):
        """
        Create the REQ socket and connect to the server on localhost.
        """
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REQ)
        self.socket.connect ("tcp://localhost:%s" % self.port)

    def disconnect(self):
        """
        Close the socket; does nothing if connect() was never called.
        """
        if self.socket is not None:
            self.socket.close()

    def receive(self):
        """
        Block until a reply arrives and return it as a decoded string.
        """
        msg=self.socket.recv().decode()
        if self.debug: print ('DEBUG: Received Msg: ', msg)
        return msg

    def send(self,msg):
        """
        Send msg (a string) as a request.
        """
        if self.debug: print ('DEBUG: Sending Msg: ', msg)
        self.socket.send_string(msg)

    def request(self,msg):
        """
        Send msg and block for the reply.
        Returns:
            the reply as a decoded string
        """
        # Same send-then-receive sequence (and debug output) as calling the
        # two methods by hand.
        self.send(msg)
        return self.receive()
# Run the forwarder only when this file is executed directly (script or
# notebook cell). start() blocks, so running it on import would stop any
# importer from ever using the classes defined above.
if __name__ == "__main__":
    device(debug=True).start()