Implementing a PubSub based application with Python and ZeroMQ

A while back I came across this awesome post by Nicholas Piël about ZeroMQ and I added test ZeroMQ to my never ending ToCheck list. Today I found the excuse to check it out and here is what I did with it.

I wanted to implement a producer-consumer application to aggregate data contained in an arbitrary number of files in an arbitrary number of nodes. ZeroMQ seemed like a good tool for this, so lets see a helloworld example: producer:

consumer:

As you can see the code is very straightforward. We create a socket, declare it as a publisher (PUB) or subscriber (SUB) accordingly and subscribe to the topics which we care about (in the case of the subscriber).

Looks like we are writing or reading to/from a standard socket, but this socket is on steroids. You can write to it even if the other end is not connected, all the queuing logic has been taken care for you, it accepts multiple connections, … and many many other things.

UPDATE: After re-reading the docs it seems that ZeroMQ sockets are not really thread-safe unless you pass them using a “full memory barrier”. I guess the fact that Python has the GIL makes my example not crash :–)

ZeroMQ is also thread-safe, and this makes our code a lot simpler than it would if ZeroMQ weren’t thread safe. Lets see an example with 10 threads simultaneously sending messages through a ZeroMQ socket without any locking mechanism.

The approach we took is very flexible in the sense that any number of producers may connect to our consumer, but they need to know where (in terms of IP and port) our consumer is or will be. Could we change that somehow?

Yes we can. We can use multicast for this. We can make all producers join a multicast group and send their messages there. Then the consumer would also join the same group and get them. Of course this only makes sense in a controlled environment where data is not critical or no non-authorized party is able to join the multicast group.

The funny thing is that we only need to change one line of code in the producer and another one in the consumer in order to make them work in multicast mode.

producer:

consumer:

I’ve started a small project I’ll be working on in my free time, hope to post something about it soon :–)

:wq

  • CFK

    I’m using PyZMQ 2.2.0, which may have changed the addressing slightly, but I found that the address ‘epgm://239.192.1.1:5000’ wouldn’t work for me. Instead, I had to use a form similar to ‘epgm://192.168.1.10;239.192.1.1:5000’. The first address is the unicast address associated with the interface I want to communicate over. This is specified in http://api.zeromq.org/2-2:zmq-pgm#toc3 and is shown in the example at http://api.zeromq.org/2-2:zmq-pgm#toc7 . Other than that, thanks for posting the code, I was really scratching my head over how to do this right!