Implementing a ZeroMQ – WebSocket gateway

In the last post we saw a simple PubSub application using ZeroMQ. Today we are going to extend that and publish all the data we get from the ZeroMQ publishers to web clients by using WebSocket.

First problem I ran into was choosing a WebSocket server to suit my needs. I had a look at EventletGeventtxWebsocket and some others, but since what I wanted was quite simple I decided to go and write my own, in order to avoid such dependencies. Communicating both worlds (ZeroMQ and WebSocket) seemed like the big problem, so I thought the best way to solve it would be to put all sockets in a polling loop running on its own thread and avoid possible multithreading issues. This works thanks to the ZeroMQ Poller, which is able to poll ZeroMQ sockets and anything which defines a fileno() method. Awesome stuff.

Since the single threaded polling experiment worked I decided to make a small standalone project (without ZeroMQ, using select.poll instead) with it for possible future stuff. You can find it here.

Back to our gateway, lets go and check the code:

producer:

# producer
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.setsockopt(zmq.LINGER, 0) # discard unsent messages on close
socket.connect('epgm://239.192.1.1:5000')
while True:
msg = raw_input('> ')
if msg == 'quit':
break
else:
socket.send(msg)
socket.close()

As you may have guessed, its the same simple multicast producer we saw on the previous post.

client web page:

<html>
<head>
<title>ZWS Example</title>
<script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js" ></script>
<script language='javascript'>
$(document).ready(function() {
var ws = new WebSocket("ws://localhost:9999/test");
ws.onmessage = function(evt) {
$('#output').append(evt.data+'<br />');
}
ws.onopen = function(evt) {
$('#conn_status').html('<b>Connected</b>');
}
ws.onerror = function(evt) {
$('#conn_status').html('<b>Error</b>');
}
ws.onclose = function(evt) {
$('#conn_status').html('<b>Closed</b>');
}
});
</script>
</head>
<body>
<h1>ZMQ - WebSocket Example</h1>
<div id="conn_status">Not Connected</div>
<div id="output"></div>
</body>
</html>
view raw index.html hosted with ❤ by GitHub

consumer and WebSocket server:

# coding=utf8
# Copyright (C) 2011 Saúl Ibarra Corretgé <saghul@gmail.com>
#
import hashlib
import os
import re
import socket
import struct
import Queue
import zmq
from collections import namedtuple
from threading import Event, Thread
READ_ONLY = zmq.POLLIN | zmq.POLLERR
READ_WRITE = READ_ONLY | zmq.POLLOUT
class WSServerSocket(object):
def __init__(self, ip, port):
self._ip = ip
self._port = port
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._socket.setblocking(0)
def bind(self):
self._socket.bind((self._ip, self._port))
self._socket.listen(5)
def accept(self):
return self._socket.accept()
def fileno(self):
return self._socket.fileno()
def close(self):
self._socket.close()
class WSClientSocket(object):
_digit_re = re.compile(r'[^0-9]')
_spaces_re = re.compile(r'\s')
_req_line_re = re.compile('^GET (?P<handler>.*) .*\\r\\n')
_handshake = (
"HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
"Upgrade: WebSocket\r\n"
"Connection: Upgrade\r\n"
"WebSocket-Origin: %(origin)s\r\n"
"WebSocket-Location: ws://%(address)s:%(port)s%(handler)s\r\n"
"Sec-Websocket-Origin: %(origin)s\r\n"
"Sec-Websocket-Location: ws://%(address)s:%(port)s%(handler)s\r\n"
"\r\n"
)
def __init__(self, sock, handler):
self._socket = sock
self._socket.setblocking(0)
self.handler = handler
self.handshaken = False
self.write_queue = Queue.Queue()
self.headers = ''
self.data = ''
def send(self, data):
self._socket.send(data)
def recv(self, bufsize):
return self._socket.recv(bufsize)
def fileno(self):
return self._socket.fileno()
def close(self):
self._socket.close()
def _queue_send(self, data):
try:
self.write_queue.put_nowait(data)
except Queue.Full:
pass
def queue_send(self, data):
self._queue_send('\x00%s\xff' % data)
def data_received(self, data):
if not self.handshaken:
if data.startswith('GET'):
match = self._req_line_re.match(data)
if not (match and match.groupdict()['handler'] == self.handler):
self.close()
return
self.headers += data
if self.headers.find('\r\n\r\n') != -1:
parts = self.headers.split('\r\n\r\n', 1)
self.headers = parts[0]
if self.do_handshake(self.headers, parts[1]):
self.handshaken = True
else:
self.data += data
msgs = self.data.split('\xff')
self.data = msgs.pop()
for msg in msgs:
if msg[0] == '\x00':
self.message_received(msg[1:])
def do_handshake(self, header, key=None):
part_1 = part_2 = origin = None
for line in header.split('\r\n')[1:]:
name, value = line.split(': ', 1)
if name.lower() == "sec-websocket-key1":
key_number_1 = int(self._digit_re.sub('', value))
spaces_1 = len(self._spaces_re.findall(value))
if spaces_1 == 0:
return False
if key_number_1 % spaces_1 != 0:
return False
part_1 = key_number_1 / spaces_1
elif name.lower() == "sec-websocket-key2":
key_number_2 = int(self._digit_re.sub('', value))
spaces_2 = len(self._spaces_re.findall(value))
if spaces_2 == 0:
return False
if key_number_2 % spaces_2 != 0:
return False
part_2 = key_number_2 / spaces_2
elif name.lower() == "host":
host, _ = value.split(':', 1)
elif name.lower() == "origin":
origin = value
server_ip, server_port = self._socket.getsockname()
if part_1 and part_2:
challenge = struct.pack('!I', part_1) + struct.pack('!I', part_2) + key
response = hashlib.md5(challenge).digest()
handshake = self._handshake % {
'origin': origin,
'address': host,
'port': server_port,
'handler': self.handler
}
handshake += response
else:
# Not using challenge-response
handshake = self._handshake % {
'origin': origin,
'address': host,
'port': server_port,
'handler': self.handler
}
self._queue_send(handshake) # Note the _ !
return True
def message_received(self, data):
pass
class ZWSGateway(Thread):
def __init__(self, config):
self.config = config
# setup zeromq socket
self._poller = zmq.Poller()
self._context = zmq.Context()
self._socket = self._context.socket(zmq.SUB)
self._socket.setsockopt(zmq.SUBSCRIBE, '') # subscribe to anything
self._poller.register(self._socket, READ_ONLY)
# setup websocket server
self._wsserver = WSServerSocket(str(config.websocket_address), config.websocket_port)
self._poller.register(self._wsserver, READ_ONLY)
# mapping: fileno -> socket
self.fd_map = { self._socket: self._socket, # zmq sockets don't have fileno()
self._wsserver.fileno(): self._wsserver }
# create a new pipe used for thread termination
self._pipe = os.pipe()
self._poller.register(self._pipe[0], zmq.POLLIN)
self._stop_event = Event()
Thread.__init__(self)
self.daemon = True
def start(self):
if self.config.use_multicast:
self._socket.connect('epgm://%s:%d' % (self.config.listen_address, self.config.listen_port))
else:
self._socket.bind('tcp://%s:%d' % (self.config.listen_address, self.config.listen_port))
self._wsserver.bind()
Thread.start(self)
def stop(self):
self._stop_event.set()
os.write(self._pipe[1], 'stop')
Thread.join(self)
self._poller.unregister(self._pipe[0])
os.close(self._pipe[0])
os.close(self._pipe[1])
self._poller.unregister(self._socket)
self._socket.close()
self._poller.unregister(self._wsserver)
self._wsserver.close()
def run(self):
while not self._stop_event.is_set():
events = self._poller.poll()
for fd, flag in events:
if fd == self._pipe[0]:
# Stop requested
for wsclient in (wsclient for wsclient in self.fd_map.values() if wsclient not in (self._socket, self._wsserver)):
self._poller.unregister(wsclient)
wsclient.close()
self.fd_map = {}
break
socket = self.fd_map[fd]
if flag & zmq.POLLIN:
# Ready to read
if socket is self._socket:
data = self._socket.recv()
self.websocket_broadcast(data)
elif socket is self._wsserver:
sock, addr = socket.accept()
wsclient = WSClientSocket(sock, self.config.websocket_handler)
self.fd_map[wsclient.fileno()] = wsclient
self._poller.register(wsclient, READ_WRITE)
else:
# Data coming from a WebSocket
data = socket.recv(1024)
if data:
socket.data_received(data)
else:
self.fd_map.pop(fd)
self._poller.unregister(socket)
socket.close()
elif flag & zmq.POLLOUT:
# Ready to send
if isinstance(socket, WSClientSocket):
try:
data = socket.write_queue.get_nowait()
except Queue.Empty:
pass
else:
socket.send(data)
elif flag & zmq.POLLERR:
# Error
self.fd_map.pop(fd)
self._poller.unregister(socket)
socket.close()
def websocket_broadcast(self, msg):
[wsclient.queue_send(msg) for wsclient in (wsclient for wsclient in self.fd_map.values() if wsclient not in (self._socket, self._wsserver))]
Config = namedtuple('Config', ['listen_address', 'listen_port', 'use_multicast', 'websocket_address', 'websocket_port', 'websocket_handler'])
if __name__ == '__main__':
config = Config('239.192.1.1', 5000, True, '', 9999, '/test')
server = ZWSGateway(config)
server.start()
while True:
try:
c = raw_input('> ')
if c == 'quit':
break
except KeyboardInterrupt:
break
server.stop()

I had never used poll nor ZeroMQ Poller before, but they were both pretty easy to get started with. In a nutshell, the server will take every piece of data it gets from the ZeroMQ socket and broadcast it to all the connected web clients through web sockets. The gateway is one way only, the input coming from the web sockets is not sent anywhere, I’ll leave that as an experiment for the reader. :–)

To test it just run the server and serve the page. You can use this to serve the page.

:wq

2 thoughts on “Implementing a ZeroMQ – WebSocket gateway

  1. Sounds interesting, but I’d take another approach: make a simple Python daemon that connects to the manager interface (so there would be only 1 connection there) and build a gateway using ZeroMQ. I really like the concept of using JSON for that.

Leave a Reply