Introduction to pyzmq

0MQ:

  • Is a library which enables communication and concurrent synchronization between processes.
  • Wraps TCP sockets providing an enhanced communication interface to exchange messages.
  • Can connect N-to-N components, with patterns like fan-out, pub-sub, or others.
  • Lets components communicate over different transports, such as in-process, inter-process, TCP or multicast.
  • Provides an asynchronous I/O model.
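
As a first taste of these points, here is a minimal sketch, assuming pyzmq is installed, of a request-reply pair talking over the in-process transport inside a single script. The endpoint name inproc://demo and the variable names are arbitrary choices for illustration. Switching to another transport should only require changing the endpoint string.

```python
import zmq

context = zmq.Context()

# The endpoint string selects the transport; 'inproc://demo' is an arbitrary name.
server = context.socket(zmq.REP)
server.bind('inproc://demo')

client = context.socket(zmq.REQ)
client.connect('inproc://demo')

client.send(b'ping')      # queued and delivered in the background
request = server.recv()   # the server picks up the request
server.send(b'pong')
reply = client.recv()     # the client picks up the answer

print(request, reply)

client.close()
server.close()
context.term()
```

Both sockets live in the same context here because the in-process transport only connects sockets that share one.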

Covering every possible use case and the underlying technologies would take a long time, so these slides walk through the development of a pair of simple examples.

Echo example

The goal is to create two programs:

  • one will send a string message to the other one,
  • which will, simply, send it back.

Echo server


In [1]:
%%writefile echo.py
#!/usr/bin/env python
import zmq
from zmq.eventloop import ioloop, zmqstream

def echo(stream, message):
    stream.send_multipart(message)

io_loop = ioloop.IOLoop()
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
stream = zmqstream.ZMQStream(socket, io_loop=io_loop)
stream.on_recv_stream(echo)
socket.bind('tcp://0.0.0.0:11235')
io_loop.start()


Writing echo.py

Echo client


In [2]:
%%writefile client.py
#!/usr/bin/env python
import zmq

context = zmq.Context()

s = context.socket(zmq.DEALER)
s.connect('tcp://127.0.0.1:11235')
for i in range(10):
    # pyzmq sends and receives bytes, so encode on the way out and decode on the way in.
    s.send(b'Hello world')
    print(s.recv().decode())


Writing client.py

Usage

In a terminal, run python echo.py, and then:


In [3]:
%%bash
python client.py


Hello world
Hello world
Hello world
Hello world
Hello world
Hello world
Hello world
Hello world
Hello world
Hello world

Fibonacci example

In a very similar way, the client now generates a list of random integers within a certain range and sends each number to the server. The server reads the integer and checks whether it belongs to the Fibonacci sequence.

Fibo server


In [4]:
%%writefile fibo.py
#!/usr/bin/env python
import zmq
from zmq.eventloop import ioloop, zmqstream

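def fibonacci(n):
    # Yield the Fibonacci numbers that are not greater than n.
    a, b = 0, 1
    while a <= n:
        yield a
        a, b = b, a + b

def fibo(stream, message):
    # message is [client identity, payload]; the identity routes the reply back.
    n = int(message[1])
    reply = [message[0], str(n in fibonacci(n)).encode()]
    stream.send_multipart(reply)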

io_loop = ioloop.IOLoop()
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
stream = zmqstream.ZMQStream(socket, io_loop=io_loop)
stream.on_recv_stream(fibo)
socket.bind('tcp://0.0.0.0:11235')
io_loop.start()


Writing fibo.py
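
The handler above reads message[0] and message[1] because a ROUTER socket prepends a frame identifying the sender to every message it receives, and uses that same frame to route the reply. The following is a minimal sketch of that framing, assuming pyzmq is installed and using the in-process transport so that it runs in a single script. The endpoint name inproc://frames is an arbitrary choice for illustration.

```python
import zmq

context = zmq.Context()

router = context.socket(zmq.ROUTER)
router.bind('inproc://frames')

dealer = context.socket(zmq.DEALER)
dealer.connect('inproc://frames')

# The DEALER sends a single frame...
dealer.send(b'34')

# ...but the ROUTER receives two: an opaque identity frame plus the payload.
frames = router.recv_multipart()
identity, payload = frames

# Sending the identity back as the first frame routes the reply to that peer.
router.send_multipart([identity, b'True'])
reply = dealer.recv()

print(len(frames), payload, reply)

dealer.close()
router.close()
context.term()
```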

Nacci client


In [5]:
%%writefile nacci.py
#!/usr/bin/env python
import zmq
from random import randint

context = zmq.Context()

s = context.socket(zmq.DEALER)
s.connect('tcp://127.0.0.1:11235')
for i in range(15):
    n = randint(0,200)
    # pyzmq sends and receives bytes, so encode the number and decode the reply.
    s.send(str(n).encode())
    print(str(n) + ' ' + s.recv().decode())


Writing nacci.py

Usage

In a terminal, run python fibo.py, and then:


In [6]:
%%bash
python nacci.py


39 False
182 False
34 True
152 False
148 False
12 False
53 False
154 False
119 False
198 False
29 False
10 False
85 False
99 False
103 False

Exercise

In this session, using what we reviewed in those two examples, let's create three different kinds of nodes:

  • The Broker is the only single-instance node in the exercise. It holds a simple list of the IP addresses and ports where the other nodes are listening. All other processes in the system know the address of the Broker in advance. It accepts connections at port 9999 on the facilitator's computer and understands just two commands:

    • REGISTER [<ip>:<port>] : On receiving this command, the Broker adds the IP-port pair to its internal list and replies OK. If the pair is already in the list, it replies HOP without adding it.
    • LIST : On receiving this command, the Broker replies with the list as a string, separating each <ip>:<port> with a single space.
  • The Hider is the program with the most instances running at the same time. When launched, it connects to the Broker, registers its own IP and port, and disconnects. It then randomly chooses the name of a city from a list and accepts connections on that IP and port. It accepts one single command, which carries the name of a city. If that name is the one it chose, it replies that the guess was correct, chooses another random city name, and accepts connections again.
  • The Seeker is a program that connects to the Broker, asks for the list of Hiders, and then connects to each Hider in turn, proposing a possible city. It keeps a counter of how many Hiders it has found and prints to stdout every time it tries to guess the city of a Hider.

When running the processes, it is advisable to run a good number of Hiders, say 10, and fewer Seekers, say 2 or 3. The protocol for talking to the Broker is fixed, because the Broker is already provided; the rest of the protocols should be defined by the teams.


In [7]:
%%writefile broker.py
#!/usr/bin/env python
import argparse
import zmq
from zmq.eventloop import ioloop, zmqstream

clients = set()

def action_register(message):
    address = message.split()[1].strip()
    if address in clients:
        return 'HOP'
    else:
        clients.add(address)
        return 'OK'

def action_list(message):
    return ' '.join(clients)

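def handle(stream, message):
    addr, text = message
    text = text.decode()  # frames arrive as bytes
    print('BROKER: ' + text)
    try:
        action = text.split()[0].lower()
        reply = globals()['action_' + action](text)
    except (KeyError, IndexError):
        # Unknown command, empty message, or REGISTER without an address.
        print('BROKER: Unknown action', text)
        reply = 'ERROR'
    stream.send_multipart((addr, reply.encode()))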

io_loop = ioloop.IOLoop()
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
stream = zmqstream.ZMQStream(socket, io_loop=io_loop)
stream.on_recv_stream(handle)

parser = argparse.ArgumentParser()
parser.add_argument('-b', '--bind-address', default='tcp://0.0.0.0:5555')

if __name__ == '__main__':
    args = parser.parse_args()
    socket.bind(args.bind_address)
    io_loop.start()


Writing broker.py
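
Both the Hider and the Seeker need to talk to this Broker, so a small shared helper may be a convenient starting point. The following is only a sketch, assuming pyzmq is installed: the names ask_broker and parse_hiders, the timeout value and the example addresses are illustrative assumptions, not part of the exercise definition.

```python
import zmq

def ask_broker(command, endpoint='tcp://127.0.0.1:5555', timeout_ms=2000):
    """Send one command to the Broker; return its reply as text, or None on timeout."""
    context = zmq.Context.instance()
    socket = context.socket(zmq.DEALER)
    socket.setsockopt(zmq.LINGER, 0)  # do not hang on close if the Broker is down
    socket.connect(endpoint)
    try:
        socket.send(command.encode())
        # Wait for a reply, but give up after timeout_ms milliseconds.
        if socket.poll(timeout_ms, zmq.POLLIN):
            return socket.recv().decode()
        return None
    finally:
        socket.close()

def parse_hiders(reply):
    """Turn a LIST reply such as '10.0.0.5:5556 10.0.0.7:5557' into (ip, port) tuples."""
    hiders = []
    for item in reply.split():
        ip, port = item.rsplit(':', 1)
        hiders.append((ip, int(port)))
    return hiders
```

With the Broker running, a Hider could call ask_broker('REGISTER 192.168.1.20:5556') and expect OK or HOP, while a Seeker could call parse_hiders(ask_broker('LIST')). The address in the REGISTER call is a made-up example. The timeout is a design choice so that a missing Broker shows up as None instead of a process blocked forever.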

In [8]:
%%writefile cities.txt
Barcelona
Berlin
Madrid
New York
Londres
Igualada


Writing cities.txt

In [9]:
%%writefile hider.py
#!/usr/bin/env python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('-p', '--port', default='5556')
parser.add_argument('-b', '--broker', default='tcp://127.0.0.1:5555')
args = parser.parse_args()


Writing hider.py
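
The skeleton above leaves the game logic to the teams. As one possible starting point, the following socket-free sketch covers only the part of the Hider that picks a city and checks a guess. The names load_cities and HiddenCity are hypothetical, and the exact-match comparison is an assumption; teams may prefer different rules.

```python
import random

def load_cities(path='cities.txt'):
    """Read one city per line, skipping blank lines."""
    with open(path) as handle:
        return [line.strip() for line in handle if line.strip()]

class HiddenCity:
    """Holds the currently hidden city and re-draws it after a correct guess."""

    def __init__(self, cities):
        self.cities = list(cities)
        self.current = random.choice(self.cities)

    def guess(self, name):
        # Assumption: a guess is correct only on an exact match, ignoring
        # surrounding whitespace.
        if name.strip() != self.current:
            return False
        # Correct guess: hide in a new random city (it may repeat the old one).
        self.current = random.choice(self.cities)
        return True
```

A Hider could build the game with HiddenCity(load_cities()) and call guess() with the text received from a Seeker. Keeping this logic apart from the sockets makes it easy to test without running any other node.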

In [10]:
%%writefile seeker.py
#!/usr/bin/env python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('-b', '--broker', default='tcp://127.0.0.1:5555')
args = parser.parse_args()


Writing seeker.py

In [ ]: