0MQ:
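Covering every use case and underlying technology of 0MQ would take too long, so this slide walks through the development of a pair of simple examples.
The goal is to create two programs: an echo server (echo.py) that sends back whatever it receives, and a client (client.py) that sends it messages and prints the replies.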
In [1]:
%%writefile echo.py
#!/usr/bin/env python
import zmq
from zmq.eventloop import ioloop, zmqstream


def echo(stream, message):
    # message is a list of frames: [client identity, payload]
    stream.send_multipart(message)


io_loop = ioloop.IOLoop()
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
stream = zmqstream.ZMQStream(socket, io_loop=io_loop)
stream.on_recv_stream(echo)
socket.bind('tcp://0.0.0.0:11235')
io_loop.start()
In [2]:
%%writefile client.py
#!/usr/bin/env python
import zmq

context = zmq.Context()
s = context.socket(zmq.DEALER)
s.connect('tcp://127.0.0.1:11235')

for i in range(10):
    # pyzmq sends bytes, so use a bytes literal (required on Python 3)
    s.send(b'Hello world')
    print(s.recv())
In a terminal, run python echo.py, and then:
In [3]:
%%bash
python client.py
In a very similar way, the client now generates a list of integers within a certain range and sends each number to the server. The server reads the integer and checks whether it belongs to the Fibonacci sequence.
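The server side of this exchange can be tried without any sockets. The sketch below assumes, as in the echo example, that a ROUTER socket hands the handler two frames: the identity of the sending client and the payload. The function name build_reply and the identity b'client-1' are hypothetical and used only for illustration.

```python
def fibonacci(limit):
    """Yield the Fibonacci numbers that are less than or equal to limit."""
    a, b = 0, 1
    while a <= limit:
        yield a
        a, b = b, a + b


def build_reply(frames):
    """Turn the frames received by the ROUTER into the frames to send back."""
    identity, payload = frames          # [client identity, number as bytes]
    n = int(payload)                    # int() accepts ASCII digits in bytes
    answer = n in fibonacci(n)          # True if n is a Fibonacci number
    # The identity frame goes first so the ROUTER knows where to route the reply
    return [identity, str(answer).encode('ascii')]


print(build_reply([b'client-1', b'13']))   # [b'client-1', b'True']
print(build_reply([b'client-1', b'4']))    # [b'client-1', b'False']
```

Keeping the check in a plain function like this makes it easy to verify the logic before wiring it to a stream callback.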
In [4]:
%%writefile fibo.py
#!/usr/bin/env python
import zmq
from zmq.eventloop import ioloop, zmqstream


def fibonacci(n):
    # Yield the Fibonacci numbers that are less than or equal to n
    a, b = 0, 1
    while n >= a:
        yield a
        a, b = b, a + b


def fibo(stream, message):
    # message is a list of frames: [client identity, payload]
    n = int(message[1])
    # Frames must be bytes, so encode the 'True'/'False' answer
    reply = [message[0], str(n in fibonacci(n)).encode()]
    stream.send_multipart(reply)


io_loop = ioloop.IOLoop()
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
stream = zmqstream.ZMQStream(socket, io_loop=io_loop)
stream.on_recv_stream(fibo)
socket.bind('tcp://0.0.0.0:11235')
io_loop.start()
In [5]:
%%writefile nacci.py
#!/usr/bin/env python
import zmq
from random import randint

context = zmq.Context()
s = context.socket(zmq.DEALER)
s.connect('tcp://127.0.0.1:11235')

for i in range(15):
    n = randint(0, 200)
    # Send the number as bytes and decode the server's reply
    s.send(str(n).encode())
    print(str(n) + ' ' + s.recv().decode())
In a terminal, run python fibo.py, and then:
In [6]:
%%bash
python nacci.py
In this session, using what was reviewed in those two examples, let's create three different kinds of nodes: a Broker, Hiders and Seekers.
The Broker is the only single-instance node of the exercise. It holds a simple list of the IP addresses and ports where the other nodes are listening, and every other process in the system knows the Broker's address in advance. In the session it accepts connections on port 9999 on the facilitator's computer (the broker.py script defaults to port 5555 unless another address is passed with -b). It accepts just two commands:
REGISTER [<ip>:<port>]: When receiving this command, the Broker adds the IP-port pair to its internal list and replies OK. If the pair is already in the list, it replies HOP without adding it.
LIST: When receiving this command, the Broker replies with the list as a single string, with the <ip>:<port> entries separated by single spaces.

When running the processes, it is advisable to run a good number of Hiders, say 10, and fewer Seekers, say 2 or 3. The protocol for talking to the Broker is fixed, because the Broker is provided already written; the rest of the protocols should be defined by the teams.
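On the client side, the two Broker commands come down to building one string and parsing another. The following is a minimal sketch under two assumptions: that the square brackets in the command description are notation rather than literal characters, and that every entry in the LIST reply has the form <ip>:<port>. The helper names register_command and parse_list_reply are hypothetical.

```python
def register_command(ip, port):
    """Build the text of a REGISTER command for the given IP and port."""
    return 'REGISTER {}:{}'.format(ip, port)


def parse_list_reply(reply):
    """Split the Broker's LIST reply into (ip, port) tuples."""
    nodes = []
    for entry in reply.split():              # entries are space separated
        ip, port = entry.rsplit(':', 1)      # split on the last colon only
        nodes.append((ip, int(port)))
    return nodes


print(register_command('10.0.0.5', 5556))
# REGISTER 10.0.0.5:5556
print(parse_list_reply('10.0.0.5:5556 10.0.0.6:5557'))
# [('10.0.0.5', 5556), ('10.0.0.6', 5557)]
```

An empty reply, which is what LIST returns before anyone has registered, parses to an empty list.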
In [7]:
%%writefile broker.py
#!/usr/bin/env python
import argparse
import zmq
from zmq.eventloop import ioloop, zmqstream

# Registered '<ip>:<port>' addresses
clients = set()


def action_register(message):
    address = message.split()[1].strip()
    if address in clients:
        return 'HOP'
    else:
        clients.add(address)
        return 'OK'


def action_list(message):
    return ' '.join(clients)


def handle(stream, message):
    # message is a list of frames: [client identity, command text]
    addr, text = message
    # Frames arrive as bytes; decode before treating them as text
    text = text.decode()
    print('BROKER: ' + text)
    action = text.split()[0].lower()
    try:
        # Dispatch to the action_<name> function defined above
        reply = globals()['action_' + action](text)
    except KeyError:
        print('BROKER: Unknown action', action)
        reply = 'ERROR'
    stream.send_multipart((addr, reply.encode()))


io_loop = ioloop.IOLoop()
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
stream = zmqstream.ZMQStream(socket, io_loop=io_loop)
stream.on_recv_stream(handle)

parser = argparse.ArgumentParser()
parser.add_argument('-b', '--bind-address', default='tcp://0.0.0.0:5555')

if __name__ == '__main__':
    args = parser.parse_args()
    socket.bind(args.bind_address)
    io_loop.start()
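To check that the Broker is answering, a node can send it a command over a DEALER socket, in the same way client.py talks to echo.py. This is only a sketch: the helper names ask_broker and main are hypothetical, the address 127.0.0.1:5556 is a placeholder, and the default Broker address is assumed to match the default of broker.py. The zmq import sits inside main so that ask_broker can be exercised with any object that offers send and recv.

```python
def ask_broker(socket, command):
    """Send one text command to the Broker and return its reply as text."""
    socket.send(command.encode('utf-8'))     # sockets carry bytes, not str
    return socket.recv().decode('utf-8')     # blocks until the Broker replies


def main(broker='tcp://127.0.0.1:5555'):
    """Register a placeholder address with the Broker and print its list."""
    import zmq  # imported here so ask_broker can be tried without pyzmq

    context = zmq.Context()
    socket = context.socket(zmq.DEALER)
    socket.connect(broker)
    print(ask_broker(socket, 'REGISTER 127.0.0.1:5556'))
    print(ask_broker(socket, 'LIST'))
    socket.close()
    context.term()
```

With broker.py running in another terminal, calling main() should print the Broker's reply to each command. Note that recv blocks indefinitely if no Broker is listening at the given address.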
In [8]:
%%writefile cities.txt
Barcelona
Berlin
Madrid
New York
Londres
Igualada
In [9]:
%%writefile hider.py
#!/usr/bin/env python
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--port', default='5556')
parser.add_argument('-b', '--broker', default='tcp://127.0.0.1:5555')
args = parser.parse_args()
In [10]:
%%writefile seeker.py
#!/usr/bin/env python
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-b', '--broker', default='tcp://127.0.0.1:5555')