A ZeroMQ to WebSocket gateway, take 2

A while ago I posted a way to build a gateway between ZeroMQ and WebSocket. That mechanism required a custom WebSocket server, which has to be maintained, and I wasn’t really happy with it anyway, so I decided to find another way.

Moreover, WebSocket alone is probably not enough. Since the WebSocket specification has changed a lot, different browsers implement different versions of the draft. WebSocket could even be disabled, God knows why, so we need a fallback mechanism. And then I discovered SocketIO. With SocketIO you can establish a persistent connection between a browser and a server in many different ways: WebSocket, flash socket, long polling, etc. This is great stuff.

If I’m not mistaken, SocketIO was born for NodeJS and was later ported to other programming languages and frameworks. The one I’m particularly interested in is TornadIO, a SocketIO implementation on top of Tornado. In case you have lived in a cave for the last several years ;–) Tornado is the web server that powered FriendFeed, which was acquired by Facebook.

So, we now have a nice persistent connection between the browser (using the SocketIO JavaScript library) and our server (using TornadIO), but how do we add ZeroMQ to the mix? It couldn’t be easier: PyZMQ, the Python binding for ZeroMQ, can integrate its event loop with Tornado’s, so there is really nothing else to be done to integrate them. :–)

Here is an example of a ZeroMQ – WebSocket (SocketIO really) gateway built as I described above. The client will connect to the server (using the web browser) through SocketIO and will just sit there waiting to get data. The server will push anything that arrives through the ZeroMQ socket to all clients connected through SocketIO and the messages will just get printed on the screen. It’s a simple example, but you get the idea ;–)

consumer_tornadio.py:

import os

import zmq
from zmq.eventloop import ioloop, zmqstream
ioloop.install()  # must run before Tornado's IOLoop is first used

import tornado
import tornado.ioloop
import tornado.web
import tornadio
import tornadio.router
import tornadio.server

ROOT = os.path.normpath(os.path.dirname(__file__))


class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("index.html")


class ClientConnection(tornadio.SocketConnection):
    # Shared by all instances: every connected browser client
    clients = set()

    @classmethod
    def dispatch_message(cls, message):
        # Called by the ZMQStream; message is the list of frames received
        for client in cls.clients:
            client.send(message)

    def on_open(self, *args, **kwargs):
        self.clients.add(self)

    def on_message(self, message):
        # Input from the browser is ignored; the gateway is one-way
        pass

    def on_close(self):
        self.clients.remove(self)


WebClientRouter = tornadio.get_router(ClientConnection)

application = tornado.web.Application(
    [(r"/", IndexHandler), WebClientRouter.route()],
    enabled_protocols = ['websocket', 'flashsocket', 'xhr-multipart', 'xhr-polling'],
    flash_policy_port = 843,
    flash_policy_file = os.path.join(ROOT, 'flashpolicy.xml'),
    socket_io_port = 8001
)

if __name__ == '__main__':
    import logging
    logging.getLogger().setLevel(logging.DEBUG)

    # Subscribe to everything the producers publish
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.bind('tcp://127.0.0.1:5000')
    socket.setsockopt(zmq.SUBSCRIBE, '')

    # Let the shared event loop call dispatch_message for each ZeroMQ message
    stream = zmqstream.ZMQStream(socket, tornado.ioloop.IOLoop.instance())
    stream.on_recv(ClientConnection.dispatch_message)

    tornadio.server.SocketServer(application)

flashpolicy.xml:

<?xml version="1.0"?>
<!DOCTYPE cross-domain-policy SYSTEM "/xml/dtds/cross-domain-policy.dtd">
<cross-domain-policy>
    <allow-access-from domain="*" to-ports="*" />
</cross-domain-policy>

index.html:

<!DOCTYPE html>
<html>
<head>
    <script src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js"></script>
    <script src="http://cdn.socket.io/stable/socket.io.js"></script>
    <script>WEB_SOCKET_SWF_LOCATION = 'http://cdn.socket.io/stable/WebSocketMain.swf';</script>
    <script>
        window.onload = function() {
            var s = new io.Socket(window.location.hostname, {port: 8001, rememberTransport: false});
            s.connect();
            s.addEvent('connect', function() {
                $("#chat").append("<div>Connected!</div>");
            });
            s.addEvent('message', function(data) {
                $("#chat").append("<div>" + data + "</div>");
            });
        };
    </script>
</head>
<body>
    <h3>0MQ - WebSocket test</h3>
    <div id="chat" style="width: 60em; height: 20em; overflow:auto; border: 1px solid black">
    </div>
</body>
</html>

producer.py:

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.setsockopt(zmq.LINGER, 0)  # discard unsent messages on close
socket.connect('tcp://127.0.0.1:5000')

while True:
    msg = raw_input('> ')
    if msg == 'quit':
        break
    else:
        socket.send(msg)

socket.close()
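
The fan-out done by ClientConnection.dispatch_message can be tried without a browser, TornadIO or ZeroMQ. What follows is only a minimal sketch of that pattern in Python 3: FakeClient and Broadcaster are hypothetical names that stand in for the real connection class, and the frames are passed as a list because ZMQStream.on_recv hands its callback a list of message parts.

```python
class FakeClient:
    """Hypothetical stand-in for a SocketIO connection; records what it is sent."""

    def __init__(self):
        self.received = []

    def send(self, message):
        self.received.append(message)


class Broadcaster:
    """Class-level registry of clients, mirroring ClientConnection.clients."""

    clients = set()

    @classmethod
    def register(cls, client):
        cls.clients.add(client)

    @classmethod
    def unregister(cls, client):
        # discard() does not raise if the client was never registered
        cls.clients.discard(client)

    @classmethod
    def dispatch_message(cls, frames):
        # frames is a list of bytes objects, one per message part
        for frame in frames:
            text = frame.decode("utf-8", "replace")
            # Iterate over a copy so a client may unregister during dispatch
            for client in list(cls.clients):
                client.send(text)


a, b = FakeClient(), FakeClient()
Broadcaster.register(a)
Broadcaster.register(b)
Broadcaster.dispatch_message([b"hello"])

Broadcaster.unregister(b)
Broadcaster.dispatch_message([b"world"])
```

After the second dispatch only the client that is still registered has seen both messages, which is the behaviour the gateway relies on when browsers come and go.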

:wq

Implementing a ZeroMQ – WebSocket gateway

In the last post we saw a simple PubSub application using ZeroMQ. Today we are going to extend that and publish all the data we get from the ZeroMQ publishers to web clients by using WebSocket.

The first problem I ran into was choosing a WebSocket server to suit my needs. I had a look at Eventlet, Gevent, txWebsocket and some others, but since what I wanted was quite simple I decided to write my own, in order to avoid such dependencies. Bridging both worlds (ZeroMQ and WebSocket) seemed like the big problem, so I thought the best way to solve it would be to put all sockets in a polling loop running on its own thread and avoid possible multithreading issues. This works thanks to the ZeroMQ Poller, which is able to poll ZeroMQ sockets and anything that defines a fileno() method. Awesome stuff.

Since the single-threaded polling experiment worked, I decided to turn it into a small standalone project (without ZeroMQ, using select.poll instead) for possible future use. You can find it here.
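
To make the polling idea concrete, here is a minimal Python 3 sketch of a single loop that watches both a plain file descriptor and an object exposing fileno(). It uses select.poll from the standard library (Unix only) rather than the ZeroMQ Poller, and Endpoint and poll_once are hypothetical names made up for this illustration; a socketpair plays the part of a connected client and a pipe is used as the stop signal.

```python
import os
import select
import socket


class Endpoint:
    """Hypothetical wrapper; poll() only needs it to expose fileno()."""

    def __init__(self, sock, name):
        self._sock = sock
        self.name = name

    def fileno(self):
        return self._sock.fileno()

    def recv(self, bufsize):
        return self._sock.recv(bufsize)

    def close(self):
        self._sock.close()


def poll_once(poller, fd_map, stop_fd, timeout_ms=1000):
    """Run one poll pass and return (messages, stop_requested)."""
    messages = []
    stop_requested = False
    for fd, flag in poller.poll(timeout_ms):
        if fd == stop_fd:
            os.read(stop_fd, 16)  # drain the stop token
            stop_requested = True
        elif flag & select.POLLIN:
            endpoint = fd_map[fd]  # poll() reports numbers, so map them back
            messages.append((endpoint.name, endpoint.recv(1024)))
    return messages, stop_requested


# Two connected sockets stand in for a remote peer and our side of the link
peer, local = socket.socketpair()
endpoint = Endpoint(local, "ws-client")
stop_r, stop_w = os.pipe()

poller = select.poll()
poller.register(endpoint, select.POLLIN)  # object with a fileno() method
poller.register(stop_r, select.POLLIN)    # plain file descriptor
fd_map = {endpoint.fileno(): endpoint}

peer.sendall(b"ping")
first = poll_once(poller, fd_map, stop_r)

os.write(stop_w, b"stop")
second = poll_once(poller, fd_map, stop_r)

endpoint.close()
peer.close()
os.close(stop_r)
os.close(stop_w)
```

Because poll() hands back file descriptor numbers rather than objects, a dictionary from descriptor to object is needed to find the right handler again.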

Back to our gateway, let’s go and check the code:

producer:

# producer
import zmq

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.setsockopt(zmq.LINGER, 0)  # discard unsent messages on close
socket.connect('epgm://239.192.1.1:5000')

while True:
    msg = raw_input('> ')
    if msg == 'quit':
        break
    else:
        socket.send(msg)

socket.close()

As you may have guessed, it’s the same simple multicast producer we saw in the previous post.

client web page:

<html>
<head>
    <title>ZWS Example</title>
    <script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js" ></script>
    <script language='javascript'>
        $(document).ready(function() {
            var ws = new WebSocket("ws://localhost:9999/test");
            ws.onmessage = function(evt) {
                $('#output').append(evt.data+'<br />');
            }
            ws.onopen = function(evt) {
                $('#conn_status').html('<b>Connected</b>');
            }
            ws.onerror = function(evt) {
                $('#conn_status').html('<b>Error</b>');
            }
            ws.onclose = function(evt) {
                $('#conn_status').html('<b>Closed</b>');
            }
        });
    </script>
</head>
<body>
    <h1>ZMQ - WebSocket Example</h1>
    <div id="conn_status">Not Connected</div>
    <div id="output"></div>
</body>
</html>

consumer and WebSocket server:

# coding=utf8
# Copyright (C) 2011 Saúl Ibarra Corretgé <saghul@gmail.com>
#
import hashlib
import os
import re
import socket
import struct
import Queue
import zmq
from collections import namedtuple
from threading import Event, Thread
READ_ONLY = zmq.POLLIN | zmq.POLLERR
READ_WRITE = READ_ONLY | zmq.POLLOUT


class WSServerSocket(object):
    def __init__(self, ip, port):
        self._ip = ip
        self._port = port
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._socket.setblocking(0)

    def bind(self):
        self._socket.bind((self._ip, self._port))
        self._socket.listen(5)

    def accept(self):
        return self._socket.accept()

    def fileno(self):
        return self._socket.fileno()

    def close(self):
        self._socket.close()


class WSClientSocket(object):
    _digit_re = re.compile(r'[^0-9]')
    _spaces_re = re.compile(r'\s')
    _req_line_re = re.compile('^GET (?P<handler>.*) .*\\r\\n')
    _handshake = (
        "HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
        "Upgrade: WebSocket\r\n"
        "Connection: Upgrade\r\n"
        "WebSocket-Origin: %(origin)s\r\n"
        "WebSocket-Location: ws://%(address)s:%(port)s%(handler)s\r\n"
        "Sec-Websocket-Origin: %(origin)s\r\n"
        "Sec-Websocket-Location: ws://%(address)s:%(port)s%(handler)s\r\n"
        "\r\n"
    )

    def __init__(self, sock, handler):
        self._socket = sock
        self._socket.setblocking(0)
        self.handler = handler
        self.handshaken = False
        self.write_queue = Queue.Queue()
        self.headers = ''
        self.data = ''

    def send(self, data):
        self._socket.send(data)

    def recv(self, bufsize):
        return self._socket.recv(bufsize)

    def fileno(self):
        return self._socket.fileno()

    def close(self):
        self._socket.close()

    def _queue_send(self, data):
        try:
            self.write_queue.put_nowait(data)
        except Queue.Full:
            pass

    def queue_send(self, data):
        # Frame the payload: 0x00, data, 0xff
        self._queue_send('\x00%s\xff' % data)

    def data_received(self, data):
        if not self.handshaken:
            if data.startswith('GET'):
                match = self._req_line_re.match(data)
                if not (match and match.groupdict()['handler'] == self.handler):
                    self.close()
                    return
            self.headers += data
            if self.headers.find('\r\n\r\n') != -1:
                parts = self.headers.split('\r\n\r\n', 1)
                self.headers = parts[0]
                if self.do_handshake(self.headers, parts[1]):
                    self.handshaken = True
        else:
            self.data += data
            msgs = self.data.split('\xff')
            self.data = msgs.pop()  # keep the incomplete tail for later
            for msg in msgs:
                # startswith() also copes with empty chunks
                if msg.startswith('\x00'):
                    self.message_received(msg[1:])

    def do_handshake(self, header, key=None):
        part_1 = part_2 = origin = host = None
        for line in header.split('\r\n')[1:]:
            name, value = line.split(': ', 1)
            if name.lower() == "sec-websocket-key1":
                key_number_1 = int(self._digit_re.sub('', value))
                spaces_1 = len(self._spaces_re.findall(value))
                if spaces_1 == 0:
                    return False
                if key_number_1 % spaces_1 != 0:
                    return False
                part_1 = key_number_1 // spaces_1
            elif name.lower() == "sec-websocket-key2":
                key_number_2 = int(self._digit_re.sub('', value))
                spaces_2 = len(self._spaces_re.findall(value))
                if spaces_2 == 0:
                    return False
                if key_number_2 % spaces_2 != 0:
                    return False
                part_2 = key_number_2 // spaces_2
            elif name.lower() == "host":
                # The Host header may or may not carry a port
                host = value.split(':', 1)[0]
            elif name.lower() == "origin":
                origin = value
        server_ip, server_port = self._socket.getsockname()
        handshake = self._handshake % {
            'origin': origin,
            'address': host,
            'port': server_port,
            'handler': self.handler
        }
        if part_1 and part_2:
            # Challenge-response: MD5 over both key numbers plus the request body
            challenge = struct.pack('!I', part_1) + struct.pack('!I', part_2) + key
            handshake += hashlib.md5(challenge).digest()
        self._queue_send(handshake)  # Note the _ !
        return True

    def message_received(self, data):
        pass


class ZWSGateway(Thread):
    def __init__(self, config):
        self.config = config
        # setup zeromq socket
        self._poller = zmq.Poller()
        self._context = zmq.Context()
        self._socket = self._context.socket(zmq.SUB)
        self._socket.setsockopt(zmq.SUBSCRIBE, '')  # subscribe to anything
        self._poller.register(self._socket, READ_ONLY)
        # setup websocket server
        self._wsserver = WSServerSocket(str(config.websocket_address), config.websocket_port)
        self._poller.register(self._wsserver, READ_ONLY)
        # mapping: fileno -> socket
        self.fd_map = { self._socket: self._socket,  # zmq sockets don't have fileno()
                        self._wsserver.fileno(): self._wsserver }
        # create a new pipe used for thread termination
        self._pipe = os.pipe()
        self._poller.register(self._pipe[0], zmq.POLLIN)
        self._stop_event = Event()
        Thread.__init__(self)
        self.daemon = True

    def start(self):
        if self.config.use_multicast:
            self._socket.connect('epgm://%s:%d' % (self.config.listen_address, self.config.listen_port))
        else:
            self._socket.bind('tcp://%s:%d' % (self.config.listen_address, self.config.listen_port))
        self._wsserver.bind()
        Thread.start(self)

    def stop(self):
        self._stop_event.set()
        os.write(self._pipe[1], 'stop')  # wake up the poller
        Thread.join(self)
        self._poller.unregister(self._pipe[0])
        os.close(self._pipe[0])
        os.close(self._pipe[1])
        self._poller.unregister(self._socket)
        self._socket.close()
        self._poller.unregister(self._wsserver)
        self._wsserver.close()

    def run(self):
        while not self._stop_event.is_set():
            events = self._poller.poll()
            for fd, flag in events:
                if fd == self._pipe[0]:
                    # Stop requested
                    for wsclient in (wsclient for wsclient in self.fd_map.values()
                                     if wsclient not in (self._socket, self._wsserver)):
                        self._poller.unregister(wsclient)
                        wsclient.close()
                    self.fd_map = {}
                    break
                socket = self.fd_map[fd]
                if flag & zmq.POLLIN:
                    # Ready to read
                    if socket is self._socket:
                        data = self._socket.recv()
                        self.websocket_broadcast(data)
                    elif socket is self._wsserver:
                        sock, addr = socket.accept()
                        wsclient = WSClientSocket(sock, self.config.websocket_handler)
                        self.fd_map[wsclient.fileno()] = wsclient
                        self._poller.register(wsclient, READ_WRITE)
                    else:
                        # Data coming from a WebSocket
                        data = socket.recv(1024)
                        if data:
                            socket.data_received(data)
                        else:
                            # Empty read: the client went away
                            self.fd_map.pop(fd)
                            self._poller.unregister(socket)
                            socket.close()
                elif flag & zmq.POLLOUT:
                    # Ready to send
                    if isinstance(socket, WSClientSocket):
                        try:
                            data = socket.write_queue.get_nowait()
                        except Queue.Empty:
                            pass
                        else:
                            socket.send(data)
                elif flag & zmq.POLLERR:
                    # Error
                    self.fd_map.pop(fd)
                    self._poller.unregister(socket)
                    socket.close()

    def websocket_broadcast(self, msg):
        # Queue the message on every WebSocket client, skipping our own sockets
        for wsclient in self.fd_map.values():
            if wsclient not in (self._socket, self._wsserver):
                wsclient.queue_send(msg)


Config = namedtuple('Config', ['listen_address', 'listen_port', 'use_multicast', 'websocket_address', 'websocket_port', 'websocket_handler'])

if __name__ == '__main__':
    config = Config('239.192.1.1', 5000, True, '', 9999, '/test')
    server = ZWSGateway(config)
    server.start()
    while True:
        try:
            c = raw_input('> ')
            if c == 'quit':
                break
        except KeyboardInterrupt:
            break
    server.stop()

I had never used poll or the ZeroMQ Poller before, but they were both pretty easy to get started with. In a nutshell, the server takes every piece of data it gets from the ZeroMQ socket and broadcasts it to all the connected web clients through web sockets. The gateway is one-way only: input coming from the web sockets is not sent anywhere. I’ll leave that as an experiment for the reader. :–)
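
As a starting point for that experiment, the framing the gateway uses on the wire (a 0x00 byte, the payload, then a 0xff byte) can be handled on its own. The following is a minimal Python 3 sketch, not the code from the listing: frame and FrameParser are hypothetical names, and the parser keeps any incomplete tail until more bytes arrive, much like data_received does. Splitting on 0xff is safe for UTF-8 text because that byte never occurs in valid UTF-8.

```python
def frame(payload):
    """Wrap a text payload in the 0x00 ... 0xff framing used by the gateway."""
    return b"\x00" + payload.encode("utf-8") + b"\xff"


class FrameParser:
    """Accumulates bytes and returns the complete text frames seen so far."""

    def __init__(self):
        self._buffer = b""

    def feed(self, data):
        self._buffer += data
        # Everything after the last 0xff is an incomplete frame; keep it
        *complete, self._buffer = self._buffer.split(b"\xff")
        messages = []
        for chunk in complete:
            # A valid frame starts with 0x00; anything else is skipped
            if chunk.startswith(b"\x00"):
                messages.append(chunk[1:].decode("utf-8", "replace"))
        return messages


parser = FrameParser()

# One full frame followed by the beginning of a second one
wire = frame("hello") + frame("world")[:3]
first = parser.feed(wire)

# The rest of the second frame arrives in a later read
second = parser.feed(frame("world")[3:])
```

Each string returned by feed is what a message_received override would get, and from there it could be published on a ZeroMQ socket to make the gateway two-way.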

To test it just run the server and serve the page. You can use this to serve the page.

:wq