Serving a WSGI app, WebSockets and static files with Twisted

Long time no post! Lets solve that now shall we?

A few days ago I started playing a bit with Flask, since I’m considering it as the framework to build some API server. I have no web development experience, and Flask looks like a great project so I went with that.

I started with a tiny little hello world, and then I wanted to add some websockets and some CSS. Oh the trouble. When I started looking for how to combine a Flask app with WebSockets I found references to gevent-socketio for the most part, but I somewhat wanted to use Twisted this time, so I kept looking. Soon enough I found AutoBahn, a great WebSocket implementation for Twisted, which can be combined with a WSGI app, brilliant! After seeing how AutoBahn manages to add the websocket route to the WSGI app, adding support for static files was kind of trivial.

Here is the result of my experiments, a really simple web app which consists of a Flask WSGI app, a WebSocket server and some static files, all served by the same process running Twisted. You may not want to do this in a production environment, but hey, I’m just playing here 🙂

import os
from autobahn.resource import WebSocketResource, WSGIRootResource
from autobahn.websocket import WebSocketServerFactory, WebSocketServerProtocol
from flask import Flask, render_template
from twisted.application import internet, service
from twisted.internet import reactor
from twisted.python.threadpool import ThreadPool
from twisted.web.server import Site
from twisted.web.static import File
from twisted.web.wsgi import WSGIResource
import settings
class EchoServerProtocol(WebSocketServerProtocol):
def onMessage(self, msg, binary):
self.sendMessage(msg, binary)
app = Flask('Echo Test')
def index():
return render_template('index.html', ws_port=settings.PORT)
# create a Twisted Web resource for our WebSocket server
ws_factory = WebSocketServerFactory("ws://%s:%d" % (settings.INTERFACE, settings.PORT))
ws_factory.protocol = EchoServerProtocol
ws_resource = WebSocketResource(ws_factory)
# create thread pool used to serve WSGI requests
thread_pool = ThreadPool(maxthreads=settings.THREAD_POOL_SIZE)
reactor.addSystemEventTrigger('before', 'shutdown', thread_pool.stop)
# create a Twisted Web WSGI resource for our Flask server
wsgi_resource = WSGIResource(reactor, thread_pool, app)
# create resource for static assets
static_resource = File(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates', 'assets'))
# create a root resource serving everything via WSGI/Flask, but
# the path "/assets" served by our File stuff and
# the path "/ws" served by our WebSocket stuff
root_resource = WSGIRootResource(wsgi_resource, {'assets': static_resource, 'ws': ws_resource})
# create a Twisted Web Site
site = Site(root_resource)
# setup an application for serving the site
web_service = internet.TCPServer(settings.PORT, site, interface=settings.INTERFACE)
application = service.Application("Echo Test")
<link rel="stylesheet" type="text/css" href="/assets/style.css">
<script type="text/javascript">
var sock = null;
var ellog = null;
window.onload = function() {
ellog = document.getElementById('log');
var wsuri = "ws://" + window.location.hostname + ":{{ ws_port }}/ws";
if ("WebSocket" in window) {
sock = new WebSocket(wsuri);
} else {
log("Browser does not support WebSocket!");
window.location = "";
if (sock) {
sock.onopen = function() {
log("Connected to " + wsuri);
sock.onclose = function(e) {
log("Connection closed (wasClean = " + e.wasClean + ", code = " + e.code + ", reason = '" + e.reason + "')");
sock = null;
sock.onmessage = function(e) {
log("Got echo: " +;
function send() {
var msg = document.getElementById('message').value;
if (sock) {
log("Sent: " + msg);
} else {
log("Not connected.");
function log(m) {
ellog.innerHTML += m + '\n';
ellog.scrollTop = ellog.scrollHeight;
<h1>Echo Test</h1>
<noscript>You must enable JavaScript</noscript>
<p>Message: <input id="message" type="text" size="50" maxlength="50" value="Hello, world!"></p>
<button onclick='send();'>Send Message</button>
<pre id="log" style="height: 20em; overflow-y: scroll;"></pre>
# Make sure this file is valid Python code
PORT = 8081
body {
font-family: Helvetica,Arial,sans-serif;
Since Gist does not currently allow folders, make sure you keep this layout after downloading the files:

└── templates
    ├── assets
    │   └── style.css
    └── index.html

We’ll use the twistd command line tool to launch out application, since it can take care of logging, running as a daemon, etc. To run it in the foreground:

twistd -n -l - -y

This will launch the application in non-daemon mode and log to standard output.

Hope this helps someone, all feedback is more than welcome 🙂


Evergreen 0.0.4 released!

It’s been a while since I haven’t posted around here! I made a few evergreen releases which are probably worth mentioning. They are pretty minor, no big changes have happened. The module which got most of the work is the io module, which I expect to improve more, as well as add cooperative UDP, TLS and file I/O support.

In addition, I created a couple of packages extending evergren’s functionality:

If you are using evergeen, let me know! Hopefully I can continue to make it better bit by bit.



Evergreen: cooperative multitasking and i/o for Python

I’ve been working on-and-off on this project for almost a year during my free time, and after meditating about it I thought: “fuck it, ship it”. Allow me to introduce Evergreen: cooperative multitasking and i/o for Python.

“So, another framework?” I hear you say. Yes, it’s another async framework. But it’s my async framework. I’ve used a number of frameworks for developing servers in Python such as Twisted, Tornado, Eventlet, Gevent and lately Tulip and all of them have great and not so great things, so I decided to blend the ideas I gathered from all of them, add some opinionated decisions, some Stackless flavour and Evergreen was the result.


Evergreeen is a framework which allows developers to write synchronous looking code which is executed asynchronously in a cooperative manner. Evergreen presents an API which looks like the one you would use to write concurrent programs using threads or futures from the Python standard library. The facilities provided by Evergreen are however cooperative, that is, while a task is busy waiting for some i/o other tasks will have their chance to run.

“Show me the code!” I hear you say. Sure, it’s up here on GitHub, released under the MIT license. Since the usual example is a web crawler, here you have one.

Did I mention it supports Python 2 and 3?

“Is it production ready?” I hear you say. It’s still on a very early stage, but I believe the foundation is solid. However, the APIs provided by Evergreen may change a bit until I feel confortable with them. All feedback is welcome, so if you give it a try do let me know!

I’d like to thank all authors of similar libraries for releasing their work as Open Source which I could look into and learn from.

I hope Evergreen can help you solve some problems and you enjoy using it as much as I do developing it.



pyuv 0.10.0 relased!

Today I’m happy to announce that pyuv 0.10.0 has been released! Following libuv’s versioning, this is a stable release, that is, no API changes will occur during the 0.10.x branch cycle.

It has been a while since the last stable release, there have been many changes, even though not all of them are directly visible in the public API. Here is a short list of the most relevant changes for version 0.10.0 since the 0.8 series:

  • Added a true signal watcher
  • Added ability to handle uncaught exceptions (Loop.excepthook)
  • Added and methods
  • Added support for compilation with Visual Studio in Windows
  • Added thread module with several thread synchronization primitives
  • Added mode parameter to (default, once or nowait)
  • Added fileno and get_timeout methods to Loop
  • Added ability to cancel threadpool, getaddrinfo and fs requests
  • Added ability to stop the event loop (Loop.stop)
  • Moved getaddrinfo to util module
  • Removed builtin c-ares resolver
  • Removed get/set process title functions
  • Fixed numerous refcounting issues
  • Multiple fixes for Windows
  • Multiple memory related internal optimizations

There are many more changes, all listed in the changelog file.

I’m glad to say that pyuv is now in better shape than it ever has been. Not only because I have learned many things along the way, but also because I got really good pull requests and help which enhanced pyuv in many different ways. I’m not a Windows guy and got invaluable help from people which helped make pyuv work properly on Windows. Releases 0.9.5-6 contain more commits from others than from myself, and I love that!

Last, I’d like to thank the libuv core team, more specifically Ben and Bert. They do a great job both coding libuv itself and helping others get involved in the project. This is one of the projects I’m really happy I contribute to. Oh, I also scored 4th in the libuv contributions (in lines of code) for Node 0.10.0!

You can get the source code at the usual place, and check the updated documentation here.

Rose: a PEP-3156 compatible event loop based on pyuv

For those who don’t know, PEP 3156 is a proposal for asynchronous I/O in Python, starting with Python 3.3. Until now each framework (Twisted, Tornado, …) has defined it’s own interface for defining protocols and transports. This makes very difficult if not impossible to reuse a protocol implementation across frameworks. PEP 3156 tries to fix that, among other things.

The reference implementation is called Tulip and can be found here. It’s a fast moving target, but it already contains working event loops for Windows and Unix systems. It uses pollers available in the select module for the Unix side, and a C module wrapping Windows IOCP functionality for Windows.

I was really excited to see this come through, so I started playing with it by implementing a pyuv based event loop. I called that it rose. It was a lot easier to implement than expected and it currently passes the entire test suite 🙂

Code can be found on GitHub.

Here is a quick example, the usual echo server, using rose and tulip:

import signal
from rose import EventLoopPolicy
from tulip import events, protocols
class EchoProtocol(protocols.Protocol):
def connection_made(self, transport):
# TODO: Transport should probably expose getsockname/getpeername
print("Client connected: {}".format(transport._sock.getpeername()))
self.transport = transport
def data_received(self, data):
def eof_received(self):
def connection_lost(self, exc):
print("Client closed connection")
reactor = events.get_event_loop()
f = reactor.start_serving(EchoProtocol, '', 1234)
server_socket = reactor.run_until_complete(f)
print("Serving on {}".format(server_socket.getsockname()))
reactor.add_signal_handler(signal.SIGINT, reactor.stop)
Come and join the discussion in the python-ideas mailing list!


How do event loops work in Python?

I had the pleasure to give a presentation at the first ever Python Devroom at FOSDEM. I talked about how event loops work internally and how pyuv can help by abstracting a lot of the problems with a pretty simple to use API. I also introduced rose, a pyuv based PEP-3156 event loop implementation, but I’ll write a followup post on that 🙂

Thanks a lot to everyone who attended the talk, and for those who couldn’t here are the slides!


TLS connections with pyuv and pyOpenSSL

Those of you who have been following the pyuv and/or libuv libraries may have run into this at some point: “how do I use TLS with this”? pyuv provides something similar to a socket with a completion style interface, but it only does TCP. There is also the Poll handle, which can be used to use a regular Python socket with pyuv.

Of course, this second approach is the quickest/easiest in order to get TLS working, because the Python sockets already have TLS support thanks to the ssl module. I wanted to experiment with adding some sort of TLS handle, in the same fashion as the TCP handle, that is, not with regular Python sockets.

There are 2 main libraries providing TLS support (in general): OpenSSL and GnuTLS. What I basically wanted to do was encrypt/decrypt the data in memory and read/write it to a pyuv TCP handle. OpenSSL has this functionality through the BIO API, but I didn’t see anything similar in GnuTLS at a first glance so I went with OpenSSL.

I created a quick TLS handle with the ideas expressed above, it can be found in this gist.

It contains the TLS handle, example echo server and client and some sample certificates. Here is the client implementation sample, for the rest check the full gist.

from __future__ import print_function
import OpenSSL
import pyuv
import uvtls
import signal
import sys
def shutdown_cb(handle, error):
def read_cb(handle, data, error):
if error is not None:
print("Read error: {}".format(pyuv.errno.strerror(error)))
print("Received data: {}".format(data))
if data.strip() == b'exit':
def connect_cb(handle, error):
if error is not None:
print("Connection error: {}".format(pyuv.errno.strerror(error)))
print("Connected to {}".format(tls_h.getpeername()))
def signal_cb(handle, signum):
loop = pyuv.Loop.default_loop()
ca = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, open('ca.pem', 'r').read())
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, open('client.cert', 'r').read())
key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, open('client.pkey', 'r').read())
tls_h = uvtls.TLS(loop, cert=cert, key=key, ca_list=[ca])
tls_h.connect((sys.argv[1], int(sys.argv[2])), connect_cb)
signal_h = pyuv.Signal(loop)
signal_h.start(signal_cb, signal.SIGINT)
It’s pretty basic, but I hope it serves as a starting point for using pyuv with TLS. I plan to analyze the performance compared to regular Python sockets in another blog post.


greenlet local storage on greenlet 0.4.0

Greenlet 0.4.0 brought an interesting new feature: an instance dictionary on each greenlet object, which makes it a lot simpler to implement greenlet local storage. Here is how greenlet local storage is currently implemented in Eventlet and in Gevent.

As it can be seen, the implementation is not particularly straightforward, mainly due to the fact that the actual information needs to be stored in a separate entity and mapped to each greenlet.

Thanks to the instance dictionary added in 0.4.0, we can use some attribute in it to keep the locally stored objects. The plan is to use a dictionary called __local_dict__ and store the greenlet local attributes there. Here is how it looks like:

from greenlet import getcurrent
__all__ = ['local']
def _get_local_dict():
current = getcurrent()
s = '_%s__local_dict__' % current.__class__.__name__
if not hasattr(current, s):
setattr(current, s, {})
return getattr(current, s)
class local(object):
def __getattribute__(self, attr):
local_dict = _get_local_dict()
return local_dict[attr]
except KeyError:
raise AttributeError("'local' object has no attribute '%s'" % attr)
def __setattr__(self, attr, value):
local_dict = _get_local_dict()
local_dict[attr] = value
def __delattr__(self, attr):
local_dict = _get_local_dict()
del local_dict[attr]
except KeyError:
raise AttributeError(attr)
Hope it’s of use.


Using functools.partial instead of saving arguments

I’m a big fan of functools.partial myself. It allows you to take a function, preset some of its arguments and return another function which you can call with more arguments. Not sure if currying is the right term here, but I’ve heard people refer to functools.partial like that.

Today while browsing the source code for the futures module I came across this class:

class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
result = self.fn(*self.args, **self.kwargs)
except BaseException as e:
view raw hosted with ❤ by GitHub

The moment I saw that func, args and kwargs were saved as attributes in the instance I thought: “why not use partial and save a single attribute?”. Then I thought that maybe performance had something to do here, so I wrote a dead simple stupid test to check it out:

from functools import partial
class WorkItem(object):
def __init__(self, func, *args, **kwargs):
self.func = func
self.args = args
self.kwargs = kwargs
def run(self):
return self.func(*self.args, **self.wkargs)
except Exception:
class WorkItemPartial(object):
def __init__(self, func, *args, **kwargs):
self.func = partial(func, *args, **kwargs)
def run(self):
return self.func()
except Exception:
def spam(*args, **kw):
def do_test(klass):
item = klass(spam, 'foo', 1, 'bar', bar='baz')
view raw hosted with ❤ by GitHub

Here are the results using CPython 2.7.3:

In [3]: %timeit testpartial.do_test(testpartial.WorkItem)
100000 loops, best of 3: 5.26 us per loop

In [4]: %timeit testpartial.do_test(testpartial.WorkItemPartial)
100000 loops, best of 3: 2.65 us per loop

We are talking microseconds here, but still the version with partial is almost twice as fast.

Now, lets see how PyPy performs:

In [9]: %timeit -n 10000 testpartial.do_test(testpartial.WorkItem)
10000 loops, best of 3: 154 ns per loop
Compiler time: 554050.78 s

In [10]: %timeit -n 10000 testpartial.do_test(testpartial.WorkItemPartial)
10000 loops, best of 3: 2.49 us per loop

Fun, the version without partial goes into the nanoseconds! And the one with partial doesn’t improve much with regards to CPython. Interesting.

So what’s the take here? Well, whenever I see fit I use partial, code looks nicer and it’s apparently faster, so why not? 🙂


Fast(er) locks in Python?

While searching for some information on Python locks I recently ran across this great post by David Beazley. In it, he explains how the syncronization primitives in the Python standard library are implemented. Basically, Lock is implemented as a binary semaphore in C, and the rest are implemented in pure Python. Even if the post is from 2009 this is still the case. UPDATE: As Antoine Pitrou points out in the comments, starting with CPython 3.2, RLock is now implemented in C.

This got me thinking. As you know I’ve created pyuv, a Python wrapper for libuv, and libuv includes cross-platform implementations for mutexes, semaphores, conditions rwlocks and barriers, which I never bothered to add to pyuv. I just didn’t add them because I thought they didn’t add any value, but after reading David’s article I decided to do a quick test: implement wrappers for a mutex and a condition variable and use them in a Queue implementation in order to see if there was any difference in performance. Not that I ever ran into performance issues related to that, but I was curious anyway 🙂

Someone may think “oh, but given that Python has the GIL, how does using multiple threads and speeding up the locks matter?”. The GIL is released whenever a IO operation is performed, so if your Python application is multithreaded and it mainly deals with IO-bound tasks, they GIL is not that relevant. If your application is CPU bound, however, better have a look at the multiprocessing module.

So, lets get into the code! I implemented Barrier, Condition, Mutex, RWLock and Semaphore in this pyuv branch, which directly wrap their libuv counterpats. Then I copied the Queue implementation from the standard library and used the freshly wrapped synchronization primitives:

"""A multi-producer, multi-consumer queue."""
from collections import deque
from time import time as _time
from pyuv import thread as _thread
__all__ = ['Empty', 'Full', 'Queue']
class Empty(Exception):
"Exception raised by Queue.get(block=0)/get_nowait()."
class Full(Exception):
"Exception raised by Queue.put(block=0)/put_nowait()."
class Queue:
"""Create a queue object with a given maximum size.
If maxsize is <= 0, the queue size is infinite.
def __init__(self, maxsize=0):
self.maxsize = maxsize
# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
self.mutex = _thread.Mutex()
# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = _thread.Condition()
# Notify not_full whenever an item is removed from the queue;
# a thread waiting to put is notified then.
self.not_full = _thread.Condition()
# Notify all_tasks_done whenever the number of unfinished tasks
# drops to zero; thread waiting to join() is notified to resume
self.all_tasks_done = _thread.Condition()
self.unfinished_tasks = 0
def task_done(self):
"""Indicate that a formerly enqueued task is complete.
Used by Queue consumer threads. For each get() used to fetch a task,
a subsequent call to task_done() tells the queue that the processing
on the task is complete.
If a join() is currently blocking, it will resume when all items
have been processed (meaning that a task_done() call was received
for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items
placed in the queue.
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.unfinished_tasks = unfinished
def join(self):
"""Blocks until all items in the Queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the
queue. The count goes down whenever a consumer thread calls task_done()
to indicate the item was retrieved and all work on it is complete.
When the count of unfinished tasks drops to zero, join() unblocks.
while self.unfinished_tasks:
def qsize(self):
"""Return the approximate size of the queue (not reliable!)."""
n = self._qsize()
return n
def empty(self):
"""Return True if the queue is empty, False otherwise (not reliable!)."""
n = not self._qsize()
return n
def full(self):
"""Return True if the queue is full, False otherwise (not reliable!)."""
n = 0 < self.maxsize == self._qsize()
return n
def put(self, item, block=True, timeout=None):
"""Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
if self.maxsize > 0:
if not block:
if self._qsize() == self.maxsize:
raise Full
elif timeout is None:
while self._qsize() == self.maxsize:
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
endtime = _time() + timeout
while self._qsize() == self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
self.not_full.timedwait(self.mutex, remaining)
self.unfinished_tasks += 1
def put_nowait(self, item):
"""Put an item into the queue without blocking.
Only enqueue the item if a free slot is immediately available.
Otherwise raise the Full exception.
return self.put(item, False)
def get(self, block=True, timeout=None):
"""Remove and return an item from the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.timedwait(self.mutex, remaining)
item = self._get()
return item
def get_nowait(self):
"""Remove and return an item from the queue without blocking.
Only get an item if one is immediately available. Otherwise
raise the Empty exception.
return self.get(False)
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held
# Initialize the queue representation
def _init(self, maxsize):
self.queue = deque()
def _qsize(self, len=len):
return len(self.queue)
# Put a new item in the queue
def _put(self, item):
# Get an item from the queue
def _get(self):
return self.queue.popleft()
view raw hosted with ❤ by GitHub

For the testing part, I used the timeit function from IPython with 5 runs. Not sure if it’s the best way, but results suggest it is a good way 🙂 Here is the test script:

from threading import Thread
from time import sleep
n_items = 1000
queue = None
def put_stuff():
for x in xrange(n_items):
def run_test(q, n):
global queue
queue = q
threads = [Thread(target=put_stuff) for x in xrange(n)]
[t.start() for t in threads]
count = n_items*n
while count > 0:
item = queue.get()
count -= 1
view raw hosted with ❤ by GitHub

Here are the results:

The tests were run with 2, 4 and 100 threads, and since I was testing performance, I added PyPy to the mix. Now, as you can see in the results, the custom Queue is about 33% faster than the one in the standard library, so I was pretty happy about that. Until I tested PyPy. It just beats the shit out of both, which is awesome 🙂

The performance increase on CPython is nice, there is a downside, however: libuv treats errors in this primitives a bit “abruptly”, it calls abort(). This means that if you use the locks incorrectly your program will core dump. I personally like it, because it helps you find and fix the problem right away, but not everyone may like it.