Implementing a ZeroMQ – WebSocket gateway

In the last post we saw a simple PubSub application using ZeroMQ. Today we are going to extend that and publish all the data we get from the ZeroMQ publishers to web clients by using WebSocket.

The first problem I ran into was choosing a WebSocket server to suit my needs. I had a look at Eventlet, Gevent, txWebsocket and some others, but since what I wanted was quite simple I decided to write my own, in order to avoid such dependencies. Connecting both worlds (ZeroMQ and WebSocket) seemed like the big problem, so I thought the best way to solve it would be to put all sockets in a polling loop running on its own thread, avoiding possible multithreading issues. This works thanks to the ZeroMQ Poller, which is able to poll ZeroMQ sockets as well as anything that defines a fileno() method. Awesome stuff.

Since the single-threaded polling experiment worked, I decided to turn it into a small standalone project (without ZeroMQ, using select.poll instead) for possible future use. You can find it here.

Back to our gateway, let's check the code:

producer:

# producer
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.setsockopt(zmq.LINGER, 0)  # discard unsent messages on close
socket.connect('epgm://239.192.1.1:5000')
try:
    # Publish every line typed by the user until 'quit', Ctrl-D or Ctrl-C.
    while True:
        try:
            msg = raw_input('> ')
        except EOFError:
            break  # closed stdin / Ctrl-D behaves like 'quit'
        if msg == 'quit':
            break
        socket.send(msg)
except KeyboardInterrupt:
    pass  # Ctrl-C: fall through to the cleanup below
finally:
    # Release the socket and the context however the loop ended.
    socket.close()
    context.term()

As you may have guessed, it's the same simple multicast producer we saw in the previous post.

client web page:

<html>
<head>
<title>ZWS Example</title>
<script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js"></script>
<script type="text/javascript">
// Connect to the ZeroMQ - WebSocket gateway and print every message it pushes.
$(document).ready(function() {
    if (!window.WebSocket) {
        $('#conn_status').html('<b>WebSocket not supported</b>');
        return;
    }
    var ws = new WebSocket("ws://localhost:9999/test");
    ws.onmessage = function(evt) {
        // Insert the payload as text, never as HTML: anything published on
        // the ZeroMQ bus ends up here, and appending it raw would let any
        // publisher inject markup or script into this page.
        $('#output').append($('<span/>').text(evt.data)).append('<br />');
    };
    ws.onopen = function(evt) {
        $('#conn_status').html('<b>Connected</b>');
    };
    ws.onerror = function(evt) {
        $('#conn_status').html('<b>Error</b>');
    };
    ws.onclose = function(evt) {
        $('#conn_status').html('<b>Closed</b>');
    };
});
</script>
</head>
<body>
<h1>ZMQ - WebSocket Example</h1>
<div id="conn_status">Not Connected</div>
<div id="output"></div>
</body>
</html>
view raw index.html hosted with ❤ by GitHub

consumer and WebSocket server:

# coding=utf8
# Copyright (C) 2011 Saúl Ibarra Corretgé <saghul@gmail.com>
#
import hashlib
import os
import re
import socket
import struct
import Queue
import zmq
from collections import namedtuple
from threading import Event, Thread
# Poll masks used by the gateway: every socket is watched for input and
# errors; READ_WRITE additionally asks to be woken when it is writable.
READ_ONLY = zmq.POLLERR | zmq.POLLIN
READ_WRITE = zmq.POLLOUT | READ_ONLY
class WSServerSocket(object):
    """Non-blocking TCP listening socket that WebSocket clients connect to.

    It exposes ``fileno()`` so it can be registered with a ``zmq.Poller``
    next to the ZeroMQ sockets.
    """

    def __init__(self, ip, port):
        self._ip, self._port = ip, port
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Let the port be rebound right away when the gateway is restarted.
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.setblocking(False)
        self._socket = listener

    def bind(self):
        """Bind to the configured address and listen with a backlog of 5."""
        address = (self._ip, self._port)
        self._socket.bind(address)
        self._socket.listen(5)

    def accept(self):
        """Return ``(connection, address)`` for a pending client.

        The socket is non-blocking, so this raises ``socket.error`` when no
        connection is waiting.
        """
        return self._socket.accept()

    def fileno(self):
        """Descriptor of the listening socket, used by the poller."""
        return self._socket.fileno()

    def close(self):
        """Close the listening socket."""
        self._socket.close()
class WSClientSocket(object):
    """Server side of one WebSocket connection (hixie-75/76 style protocol).

    Incoming bytes are fed in through ``data_received``. Until the handshake
    completes they are treated as the HTTP upgrade request. Afterwards they
    are split into text frames (0x00 <payload> 0xFF), and each payload is
    passed to ``message_received``. Outgoing data is not written here: it is
    put on ``write_queue`` for the owner's poll loop to drain via ``send``.
    """

    _digit_re = re.compile(r'[^0-9]')
    _spaces_re = re.compile(r'\s')
    _req_line_re = re.compile('^GET (?P<handler>.*) .*\\r\\n')
    _handshake = (
        "HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
        "Upgrade: WebSocket\r\n"
        "Connection: Upgrade\r\n"
        "WebSocket-Origin: %(origin)s\r\n"
        "WebSocket-Location: ws://%(address)s:%(port)s%(handler)s\r\n"
        "Sec-Websocket-Origin: %(origin)s\r\n"
        "Sec-Websocket-Location: ws://%(address)s:%(port)s%(handler)s\r\n"
        "\r\n"
    )

    def __init__(self, sock, handler):
        self._socket = sock
        self._socket.setblocking(0)
        self.handler = handler  # request path this endpoint accepts
        self.handshaken = False
        self.write_queue = Queue.Queue()
        self.headers = ''  # raw request headers accumulated so far
        self.data = ''  # unterminated frame bytes carried between reads

    def send(self, data):
        """Write *data* to the socket (the number of bytes sent is not returned)."""
        self._socket.send(data)

    def recv(self, bufsize):
        return self._socket.recv(bufsize)

    def fileno(self):
        return self._socket.fileno()

    def close(self):
        self._socket.close()

    def _shutdown(self):
        """Shut the connection down without closing the descriptor.

        The owner still has this object registered with its poller, and a
        closed descriptor would make ``fileno()`` unusable there. After a
        shutdown the peer receives FIN, and (on Linux at least) the next
        ``recv`` returns '', so the owner's EOF path can unregister and close.
        """
        try:
            self._socket.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass  # peer already gone; the owner will see EOF/error anyway

    def _queue_send(self, data):
        try:
            self.write_queue.put_nowait(data)
        except Queue.Full:
            pass

    def queue_send(self, data):
        """Queue *data* wrapped as a text frame."""
        self._queue_send('\x00%s\xff' % data)

    def data_received(self, data):
        """Consume bytes read from the peer (handshake first, then frames)."""
        if self.handshaken:
            self.data += data
            frames = self.data.split('\xff')
            # The last piece has no terminator yet (or is '') - keep it.
            self.data = frames.pop()
            for frame in frames:
                # startswith() instead of frame[0]: consecutive 0xff bytes
                # yield empty pieces, which are skipped.
                if frame.startswith('\x00'):
                    self.message_received(frame[1:])
            return
        if data.startswith('GET'):
            match = self._req_line_re.match(data)
            if not (match and match.group('handler') == self.handler):
                self._shutdown()
                return
        self.headers += data
        head, sep, body = self.headers.partition('\r\n\r\n')
        if not sep:
            return  # header block not complete yet
        self.headers = head
        if self.do_handshake(head, body):
            self.handshaken = True
        else:
            # Invalid challenge keys: no response can be produced, so drop
            # the connection instead of leaving the client hanging.
            self._shutdown()

    def _key_part(self, value):
        """Return key-number / spaces for a Sec-WebSocket-Key1/2 value, or None if invalid."""
        digits = self._digit_re.sub('', value)
        spaces = len(self._spaces_re.findall(value))
        if not digits or spaces == 0:
            return None
        key_number = int(digits)
        if key_number % spaces != 0:
            return None
        part = key_number // spaces
        if part > 0xFFFFFFFF:
            return None  # must fit the 32-bit big-endian field packed below
        return part

    def do_handshake(self, header, key=None):
        """Queue the handshake response for the request headers *header*.

        *key* is whatever followed the blank line of the request; when both
        Sec-WebSocket-Key1/2 headers are present it completes the MD5
        challenge. Returns False if a key header is invalid, True otherwise.
        """
        part_1 = part_2 = origin = host = None
        for line in header.split('\r\n')[1:]:
            name, sep, value = line.partition(': ')
            if not sep:
                continue  # not a "Name: value" line
            name = name.lower()
            if name == "sec-websocket-key1":
                part_1 = self._key_part(value)
                if part_1 is None:
                    return False
            elif name == "sec-websocket-key2":
                part_2 = self._key_part(value)
                if part_2 is None:
                    return False
            elif name == "host":
                host = value.split(':', 1)[0]  # the port may be omitted
            elif name == "origin":
                origin = value
        server_ip, server_port = self._socket.getsockname()
        if host is None:
            host = server_ip  # no Host header: fall back to our own address
        handshake = self._handshake % {
            'origin': origin,
            'address': host,
            'port': server_port,
            'handler': self.handler,
        }
        # "is not None": a key part of 0 is valid and still needs a response.
        if part_1 is not None and part_2 is not None:
            challenge = struct.pack('!II', part_1, part_2) + (key or '')
            handshake += hashlib.md5(challenge).digest()
        self._queue_send(handshake)  # raw bytes: the handshake is not a frame
        return True

    def message_received(self, data):
        """Hook called with each complete text payload; ignores it by default."""
        pass
class ZWSGateway(Thread):
    """Forward everything received on a ZeroMQ SUB socket to WebSocket clients.

    All I/O happens on this thread inside ``run()``. A single ``zmq.Poller``
    multiplexes the SUB socket, the listening WebSocket socket, every
    accepted client, and the read end of a self-pipe that ``stop()`` writes
    to in order to wake ``poll()``.

    ``config`` is expected to provide ``listen_address``, ``listen_port``,
    ``use_multicast``, ``websocket_address``, ``websocket_port`` and
    ``websocket_handler``.
    """

    def __init__(self, config):
        self.config = config
        # setup zeromq socket
        self._poller = zmq.Poller()
        self._context = zmq.Context()
        self._socket = self._context.socket(zmq.SUB)
        self._socket.setsockopt(zmq.SUBSCRIBE, '')  # subscribe to anything
        self._poller.register(self._socket, READ_ONLY)
        # setup websocket server
        self._wsserver = WSServerSocket(str(config.websocket_address), config.websocket_port)
        self._poller.register(self._wsserver, READ_ONLY)
        # mapping: fileno -> socket. The poller reports zmq sockets by object
        # (they don't have fileno()), so the SUB socket is keyed by itself.
        self.fd_map = {self._socket: self._socket,
                       self._wsserver.fileno(): self._wsserver}
        # self-pipe: writing to it wakes poll() so stop() can end the loop
        self._pipe = os.pipe()
        self._poller.register(self._pipe[0], zmq.POLLIN)
        self._stop_event = Event()
        Thread.__init__(self)
        self.daemon = True

    def start(self):
        """Connect (multicast) or bind (TCP) the SUB socket, listen, start the thread."""
        if self.config.use_multicast:
            self._socket.connect('epgm://%s:%d' % (self.config.listen_address, self.config.listen_port))
        else:
            self._socket.bind('tcp://%s:%d' % (self.config.listen_address, self.config.listen_port))
        self._wsserver.bind()
        Thread.start(self)

    def stop(self):
        """Wake the poll loop, wait for the thread to finish and release everything.

        Must be called from another thread, since it joins this one.
        """
        self._stop_event.set()
        os.write(self._pipe[1], 'stop')
        Thread.join(self)
        self._poller.unregister(self._pipe[0])
        os.close(self._pipe[0])
        os.close(self._pipe[1])
        self._poller.unregister(self._socket)
        self._socket.close()
        self._poller.unregister(self._wsserver)
        self._wsserver.close()
        # every socket of the context is closed now, so this does not block
        self._context.term()

    def run(self):
        """Poll loop: broadcast ZeroMQ data, accept clients, service them."""
        while not self._stop_event.is_set():
            for fd, flag in self._poller.poll():
                if fd == self._pipe[0]:
                    # Stop requested: close every client and leave the loop.
                    for wsclient in self._clients():
                        self._poller.unregister(wsclient)
                        wsclient.close()
                    self.fd_map = {}
                    break
                sock = self.fd_map.get(fd)
                if sock is None:
                    continue  # dropped earlier in this poll round
                if sock is self._socket:
                    # Errors on the SUB socket are ignored here; it is
                    # closed by stop() and must stay registered until then.
                    if flag & zmq.POLLIN:
                        self.websocket_broadcast(self._socket.recv())
                elif sock is self._wsserver:
                    if flag & zmq.POLLIN:
                        self._accept_client()
                else:
                    self._service_client(fd, sock, flag)
            self._update_interest()

    def _clients(self):
        """Return the WebSocket client wrappers currently tracked in fd_map."""
        return [s for s in self.fd_map.values()
                if s is not self._socket and s is not self._wsserver]

    def _accept_client(self):
        """Accept one pending connection and start polling it for input."""
        try:
            conn, _addr = self._wsserver.accept()
        except socket.error:
            return  # non-blocking listener: the connection may be gone already
        wsclient = WSClientSocket(conn, self.config.websocket_handler)
        self.fd_map[wsclient.fileno()] = wsclient
        self._poller.register(wsclient, READ_ONLY)

    def _service_client(self, fd, wsclient, flag):
        """Handle one poll event for a WebSocket client."""
        if flag & zmq.POLLIN:
            # Data coming from a WebSocket
            try:
                data = wsclient.recv(1024)
            except socket.error:
                data = ''  # e.g. connection reset: same as an orderly close
            if data:
                wsclient.data_received(data)
            else:
                self._drop_client(fd, wsclient)
        elif flag & zmq.POLLOUT:
            # Ready to send: one queued message per writable event.
            try:
                data = wsclient.write_queue.get_nowait()
            except Queue.Empty:
                return
            # NOTE(review): WSClientSocket.send discards the byte count, so a
            # partial write of a large message cannot be detected here.
            try:
                wsclient.send(data)
            except socket.error:
                self._drop_client(fd, wsclient)
        elif flag & zmq.POLLERR:
            self._drop_client(fd, wsclient)

    def _drop_client(self, fd, wsclient):
        """Forget, unregister and close a client."""
        self.fd_map.pop(fd, None)
        self._poller.unregister(wsclient)
        wsclient.close()

    def _update_interest(self):
        """Ask for POLLOUT only on clients that have something queued.

        An idle socket is almost always writable, so keeping POLLOUT
        registered would make poll() return immediately forever (a busy
        loop). Registering an already registered socket updates its flags.
        """
        for wsclient in self._clients():
            if wsclient.write_queue.empty():
                self._poller.register(wsclient, READ_ONLY)
            else:
                self._poller.register(wsclient, READ_WRITE)

    def websocket_broadcast(self, msg):
        """Queue *msg* as a text frame for every client that has handshaken.

        Clients still in the handshake phase are skipped, because a frame
        queued before the handshake response would be written ahead of it.
        The frames are flushed once run() refreshes the poll interest; this
        method is called from run() in this module, and a call from another
        thread would only be flushed after poll() next wakes up.
        """
        for wsclient in self._clients():
            if wsclient.handshaken:
                wsclient.queue_send(msg)
# Gateway settings: where the SUB socket listens (multicast group or TCP
# address) and where/under which path the WebSocket server accepts clients.
Config = namedtuple('Config', ['listen_address', 'listen_port', 'use_multicast',
                               'websocket_address', 'websocket_port', 'websocket_handler'])

if __name__ == '__main__':
    config = Config('239.192.1.1', 5000, True, '', 9999, '/test')
    server = ZWSGateway(config)
    server.start()
    try:
        while True:
            try:
                command = raw_input('> ')
            except EOFError:
                break  # Ctrl-D / closed stdin behaves like 'quit'
            if command == 'quit':
                break
    except KeyboardInterrupt:
        pass  # Ctrl-C also quits
    finally:
        # Always stop the gateway thread and release its sockets.
        server.stop()

I had never used poll or the ZeroMQ Poller before, but both were pretty easy to get started with. In a nutshell, the server takes every piece of data it gets from the ZeroMQ socket and broadcasts it to all the connected web clients through WebSockets. The gateway is one-way only: input coming from the WebSockets is not sent anywhere. I’ll leave that as an exercise for the reader. :-)

To test it, just run the server and serve the page. You can use this to serve the page.

:wq

Implementing a PubSub based application with Python and ZeroMQ

A while back I came across this awesome post by Nicholas Piël about ZeroMQ, and I added “test ZeroMQ” to my never-ending to-check list. Today I found an excuse to check it out, and here is what I did with it.

I wanted to implement a producer-consumer application to aggregate data contained in an arbitrary number of files on an arbitrary number of nodes. ZeroMQ seemed like a good tool for this, so let's see a hello world example. Producer:

# producer
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.setsockopt(zmq.LINGER, 0)  # discard unsent messages on close
socket.connect('tcp://127.0.0.1:5000')
try:
    # Publish every line typed by the user until 'quit', Ctrl-D or Ctrl-C.
    while True:
        try:
            msg = raw_input('> ')
        except EOFError:
            break  # closed stdin / Ctrl-D behaves like 'quit'
        if msg == 'quit':
            break
        socket.send(msg)
except KeyboardInterrupt:
    pass  # Ctrl-C: fall through to the cleanup below
finally:
    # Release the socket and the context however the loop ended.
    socket.close()
    context.term()
view raw producer.py hosted with ❤ by GitHub

consumer:

# consumer
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.bind('tcp://127.0.0.1:5000')
# ZMQ_SUBSCRIBE filters by message prefix: only messages starting with one
# of these strings are delivered.
socket.setsockopt(zmq.SUBSCRIBE, 'test')
socket.setsockopt(zmq.SUBSCRIBE, 'topic_1')
try:
    while True:
        data = socket.recv()
        print(data)
except KeyboardInterrupt:
    pass  # Ctrl-C is the only way out of this loop
finally:
    # Release the socket and the context on exit.
    socket.close()
    context.term()
view raw consumer.py hosted with ❤ by GitHub

As you can see the code is very straightforward. We create a socket, declare it as a publisher (PUB) or subscriber (SUB) accordingly and subscribe to the topics which we care about (in the case of the subscriber).

It looks like we are writing to or reading from a standard socket, but this socket is on steroids. You can write to it even if the other end is not connected, all the queuing logic is taken care of for you, it accepts multiple connections, … and many, many other things.

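UPDATE: After re-reading the docs, it seems that ZeroMQ sockets are not really thread-safe unless you pass them between threads using a “full memory barrier”. I guess the fact that Python has the GIL is what keeps my example from crashing :-) The safe approach is to give each thread its own socket, created from a shared context (contexts are thread-safe).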

I originally wrote here that ZeroMQ is thread-safe, which would make our code a lot simpler than it would otherwise be; as the update above explains, that is not true for sockets. Let's see an example with 10 threads simultaneously sending messages through ZeroMQ without any locking mechanism.

# producer
import os
import random
import threading
import time
import zmq
context = zmq.Context()
stop_event = threading.Event()


def run():
    """Publish a greeting every 1-5 seconds until stop_event is set."""
    # ZeroMQ sockets are not thread-safe but contexts are, so each thread
    # opens its own PUB socket on the shared context instead of sharing one.
    sock = context.socket(zmq.PUB)
    sock.setsockopt(zmq.LINGER, 0)  # discard unsent messages on close
    sock.connect('tcp://127.0.0.1:5000')
    try:
        while not stop_event.is_set():
            msg = 'test Hello from process %s and thread %s' % (os.getpid(), threading.current_thread())
            sock.send(msg)
            # wait() instead of sleep(): returns at once when 'quit' is typed
            stop_event.wait(random.randint(1, 5))
    finally:
        sock.close()


threads = []
for _ in range(10):
    t = threading.Thread(target=run)
    t.start()
    threads.append(t)
try:
    while True:
        try:
            msg = raw_input('> ')
        except EOFError:
            break  # closed stdin / Ctrl-D behaves like 'quit'
        if msg == 'quit':
            break
except KeyboardInterrupt:
    pass  # Ctrl-C also quits
finally:
    # Stop the workers, wait for them to close their sockets, then the context.
    stop_event.set()
    for t in threads:
        t.join()
    context.term()

The approach we took is very flexible in the sense that any number of producers may connect to our consumer, but they need to know where (in terms of IP and port) our consumer is or will be. Could we change that somehow?

Yes we can. We can use multicast for this. We can make all producers join a multicast group and send their messages there. The consumer would then also join the same group and receive them. Of course, this only makes sense in a controlled environment where the data is not critical or where no unauthorized party is able to join the multicast group.

The funny thing is that we only need to change one line of code in the producer and another one in the consumer in order to make them work in multicast mode.

producer:

# producer
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.setsockopt(zmq.LINGER, 0)  # discard unsent messages on close
socket.connect('epgm://239.192.1.1:5000')
try:
    # Publish every line typed by the user until 'quit', Ctrl-D or Ctrl-C.
    while True:
        try:
            msg = raw_input('> ')
        except EOFError:
            break  # closed stdin / Ctrl-D behaves like 'quit'
        if msg == 'quit':
            break
        socket.send(msg)
except KeyboardInterrupt:
    pass  # Ctrl-C: fall through to the cleanup below
finally:
    # Release the socket and the context however the loop ended.
    socket.close()
    context.term()

consumer:

# consumer
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('epgm://239.192.1.1:5000')
# ZMQ_SUBSCRIBE filters by message prefix: only messages starting with one
# of these strings are delivered.
socket.setsockopt(zmq.SUBSCRIBE, 'test')
socket.setsockopt(zmq.SUBSCRIBE, 'topic_1')
try:
    while True:
        data = socket.recv()
        print(data)
except KeyboardInterrupt:
    pass  # Ctrl-C is the only way out of this loop
finally:
    # Release the socket and the context on exit.
    socket.close()
    context.term()

I’ve started a small project I’ll be working on in my free time; I hope to post something about it soon :-)

:wq