Serving a WSGI app, WebSockets and static files with Twisted

Long time no post! Let’s solve that now, shall we?

A few days ago I started playing a bit with Flask, since I’m considering it as the framework to build some API server. I have no web development experience, and Flask looks like a great project so I went with that.

I started with a tiny little hello world, and then I wanted to add some websockets and some CSS. Oh the trouble. When I started looking for how to combine a Flask app with WebSockets I found references to gevent-socketio for the most part, but I somewhat wanted to use Twisted this time, so I kept looking. Soon enough I found AutoBahn, a great WebSocket implementation for Twisted, which can be combined with a WSGI app, brilliant! After seeing how AutoBahn manages to add the websocket route to the WSGI app, adding support for static files was kind of trivial.

Here is the result of my experiments, a really simple web app which consists of a Flask WSGI app, a WebSocket server and some static files, all served by the same process running Twisted. You may not want to do this in a production environment, but hey, I’m just playing here 🙂

import os
from autobahn.resource import WebSocketResource, WSGIRootResource
from autobahn.websocket import WebSocketServerFactory, WebSocketServerProtocol
from flask import Flask, render_template
from twisted.application import internet, service
from twisted.internet import reactor
from twisted.python.threadpool import ThreadPool
from twisted.web.server import Site
from twisted.web.static import File
from twisted.web.wsgi import WSGIResource
import settings
class EchoServerProtocol(WebSocketServerProtocol):
    """WebSocket protocol that sends every received message straight back."""

    def onMessage(self, msg, binary):
        # Echo the payload unchanged, forwarding the same binary flag.
        self.sendMessage(msg, binary)
app = Flask('Echo Test')


@app.route('/')
def index():
    """Render index.html, handing the configured port to the template as ws_port."""
    template_context = {'ws_port': settings.PORT}
    return render_template('index.html', **template_context)
# create a Twisted Web resource for our WebSocket server; the factory URL is
# built from the same interface and port the site listens on (see TCPServer below)
ws_factory = WebSocketServerFactory("ws://%s:%d" % (settings.INTERFACE, settings.PORT))
ws_factory.protocol = EchoServerProtocol
ws_resource = WebSocketResource(ws_factory)
# create thread pool used to serve WSGI requests
thread_pool = ThreadPool(maxthreads=settings.THREAD_POOL_SIZE)
thread_pool.start()
# the pool was started by hand, so register its stop() to run before reactor shutdown
reactor.addSystemEventTrigger('before', 'shutdown', thread_pool.stop)
# create a Twisted Web WSGI resource for our Flask server
wsgi_resource = WSGIResource(reactor, thread_pool, app)
# create resource for static assets, located in templates/assets next to this file
static_resource = File(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates', 'assets'))
# create a root resource serving everything via WSGI/Flask, but
# the path "/assets" served by our File stuff and
# the path "/ws" served by our WebSocket stuff
root_resource = WSGIRootResource(wsgi_resource, {'assets': static_resource, 'ws': ws_resource})
# create a Twisted Web Site
site = Site(root_resource)
# setup an application for serving the site; "application" is the name
# twistd looks for when the file is loaded with -y
web_service = internet.TCPServer(settings.PORT, site, interface=settings.INTERFACE)
application = service.Application("Echo Test")
web_service.setServiceParent(application)
view raw app.py hosted with ❤ by GitHub
<html>
<head>
<link rel="stylesheet" type="text/css" href="/assets/style.css">
<script type="text/javascript">
var sock = null;
var ellog = null;
// On page load: locate the log element and open the WebSocket connection.
window.onload = function() {
    ellog = document.getElementById('log');
    var wsuri = "ws://" + window.location.hostname + ":{{ ws_port }}/ws";

    // No WebSocket support: report it, redirect, and leave sock as null.
    if (!("WebSocket" in window)) {
        log("Browser does not support WebSocket!");
        window.location = "http://autobahn.ws/unsupportedbrowser";
        return;
    }

    sock = new WebSocket(wsuri);

    sock.onopen = function() {
        log("Connected to " + wsuri);
    };

    // Forget the socket once it closes so send() reports "Not connected."
    sock.onclose = function(e) {
        log("Connection closed (wasClean = " + e.wasClean + ", code = " + e.code + ", reason = '" + e.reason + "')");
        sock = null;
    };

    sock.onmessage = function(e) {
        log("Got echo: " + e.data);
    };
};
// Send the current content of the "message" input over the socket, if one is open.
function send() {
    var text = document.getElementById('message').value;
    if (!sock) {
        log("Not connected.");
        return;
    }
    sock.send(text);
    log("Sent: " + text);
}
// Append one line to the log element and keep it scrolled to the bottom.
function log(m) {
    // Append a text node instead of using `innerHTML +=`: messages contain
    // user-typed and server-echoed data, which must not be parsed as HTML,
    // and the existing log content no longer gets re-parsed on every call.
    ellog.appendChild(document.createTextNode(m + '\n'));
    ellog.scrollTop = ellog.scrollHeight;
}
</script>
</head>
<body>
<h1>Echo Test</h1>
<noscript>You must enable JavaScript</noscript>
<form>
<p>Message: <input id="message" type="text" size="50" maxlength="50" value="Hello, world!"></p>
</form>
<button onclick='send();'>Send Message</button>
<pre id="log" style="height: 20em; overflow-y: scroll;"></pre>
</body>
</html>
view raw index.html hosted with ❤ by GitHub
# Make sure this file is valid Python code
# (app.py loads it with a plain "import settings")

# TCP port the site listens on; also passed to index.html as ws_port
PORT = 8081
# Interface the TCP server is bound to
INTERFACE = "0.0.0.0"
# Maximum number of threads in the pool that serves WSGI requests
THREAD_POOL_SIZE = 10
view raw settings.py hosted with ❤ by GitHub
/* Page-wide defaults: sans-serif font stack on a light steel blue background. */
body {
font-family: Helvetica,Arial,sans-serif;
background-color:#b0c4de;
}
view raw style.css hosted with ❤ by GitHub

Since Gist does not currently allow folders, make sure you keep this layout after downloading the files:

├── app.py
├── settings.py
└── templates
    ├── assets
    │   └── style.css
    └── index.html

We’ll use the twistd command line tool to launch our application, since it can take care of logging, running as a daemon, etc. To run it in the foreground:

twistd -n -l - -y app.py

This will launch the application in non-daemon mode and log to standard output.

Hope this helps someone, all feedback is more than welcome 🙂

:wq

Evergreen: cooperative multitasking and i/o for Python

I’ve been working on-and-off on this project for almost a year during my free time, and after meditating about it I thought: “fuck it, ship it”. Allow me to introduce Evergreen: cooperative multitasking and i/o for Python.

“So, another framework?” I hear you say. Yes, it’s another async framework. But it’s my async framework. I’ve used a number of frameworks for developing servers in Python such as Twisted, Tornado, Eventlet, Gevent and lately Tulip and all of them have great and not so great things, so I decided to blend the ideas I gathered from all of them, add some opinionated decisions, some Stackless flavour and Evergreen was the result.

standards

Evergreen is a framework which allows developers to write synchronous-looking code which is executed asynchronously in a cooperative manner. Evergreen presents an API which looks like the one you would use to write concurrent programs using threads or futures from the Python standard library. The facilities provided by Evergreen are, however, cooperative; that is, while a task is busy waiting for some i/o, other tasks will have their chance to run.

“Show me the code!” I hear you say. Sure, it’s up here on GitHub, released under the MIT license. Since the usual example is a web crawler, here you have one.

Did I mention it supports Python 2 and 3?

“Is it production ready?” I hear you say. It’s still at a very early stage, but I believe the foundation is solid. However, the APIs provided by Evergreen may change a bit until I feel comfortable with them. All feedback is welcome, so if you give it a try do let me know!

I’d like to thank all authors of similar libraries for releasing their work as Open Source which I could look into and learn from.

I hope Evergreen can help you solve some problems and you enjoy using it as much as I do developing it.

:wq

 

Integrating Twisted and Tornado with pyuv

With yesterday’s pyuv release the door was open for integrating pyuv with other event loops or applications. Thanks to the Poll handle we can now create a regular socket in Python and put it in pyuv’s event loop, so we can also use pyuv to replace other event loops ;–)

I created a couple of projects for toying around with this feature. The first project implements a Tornado IOLoop which runs on top of pyuv, and the second one implements a Twisted reactor on top of pyuv.

They are not feature complete yet, but basics are working and I’ll be adding more features as time allows.

Go check them out on GitHub!

:wq

TunnelIt, a reverse SSH forwarder

Hi! Today I want to show you a small project I’ve been working on during some free time. I called it TunnelIt and it’s a reverse SSH forwarder.

I borrowed inspiration from the popular Tunnlr service and built it using Twisted. Twisted is a really good tool for this purpose because apart from being a cross-platform asynchronous framework, it implements many well known protocols, such as SSH.

Code and instructions are up on the TunnelIt repository on GitHub, feel free to use it, fork it and send pull requests :–)

Happy tunneling!

:wq

Serializing DB operations with Twisted and SQLObject

Long time no write! A while ago I wrote a post about how to combine Twisted with SQLObject. The method described there, however, doesn’t work particularly well when using SQLite.

Since SQLite databases are a single file (or a memory area) some operations will need to lock when writing. In the other post we used deferToThread, which will run a function on a different thread and return a Deferred which will fire when the operation is finished. The thread on which the operation will be run is taken from the reactor thread pool, so it’s not a single thread. This means that if more than one write operation is executed almost at the same time, the first operation will get the lock, and the others will need to wait. When using SQLObject, the operation will fail if the lock can’t be acquired, so let’s see what we can do about it.

One option would be to use the timeout parameter and set it to some reasonable value, so that the operation would wait that amount of time before giving up on the lock. It’s not 100% guaranteed that it will get it, so this could be complemented with a wait-retry strategy.

Or we can just run all database operations in a single thread. This way only one operation will be executed at a time so we avoid the issue. Also, since we return deferreds nothing will be blocked waiting for results.

In order to do this I chose to use Twisted’s ThreadPool class, with a single thread. The reason for doing it this way is that Twisted provides us with nice functions to run operations on one of these pools and return a deferred.

First we setup our thread pool:

# Dedicated pool with exactly one worker thread: everything queued on it
# runs one operation at a time.
pool = ThreadPool(minthreads=1, maxthreads=1, name='db-ops')
pool.start()
# Stop the worker when the reactor is about to shut down. The 'before' phase
# matches the complete example that follows.
reactor.addSystemEventTrigger('before', 'shutdown', pool.stop)

We will need to stop the thread pool when our application ends, hence the call to addSystemEventTrigger. It will run the specified function when the reactor is about to be stopped.

Then we’ll create a decorator which we’ll use to decorate functions that will run on the thread pool:

def run_in_db_thread(func):
    """Decorator to run DB queries in the dedicated single-thread pool.

    Calling the decorated function does not execute it directly: the call is
    handed to ``deferToThreadPool`` with the module-level ``pool`` and the
    value returned by ``deferToThreadPool`` is returned to the caller.
    """
    # Local import keeps this snippet self-contained.
    from functools import wraps

    # Preserve the wrapped function's name and docstring on the wrapper.
    @wraps(func)
    def wrapper(*args, **kw):
        return deferToThreadPool(reactor, pool, func, *args, **kw)
    return wrapper

Since the thread pool only has a single thread we are effectively serializing all database operations, if they are called using our decorator. Let’s see a complete example:

# coding=utf8
# Copyright (C) 2011 Saúl Ibarra Corretgé <saghul@gmail.com>
#
__all__ = ['Database', 'DatabaseError']
from threading import Thread
from sqlobject import connectionForURI, sqlhub, SQLObject, StringCol
from twisted.internet import reactor
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool
# Dedicated pool with exactly one worker thread, used for all DB operations
pool = ThreadPool(minthreads=1, maxthreads=1, name='db-ops')
pool.start()
# The pool was started by hand, so register its stop() to run before reactor shutdown
reactor.addSystemEventTrigger('before', 'shutdown', pool.stop)
def run_in_db_thread(func):
    """Decorator to run DB queries in the dedicated single-thread pool.

    Calling the decorated function does not execute it directly: the call is
    handed to ``deferToThreadPool`` with the module-level ``pool`` and the
    value returned by ``deferToThreadPool`` is returned to the caller.
    """
    # Local import so the listing's import block does not need to change.
    from functools import wraps

    # Preserve the wrapped function's name and docstring on the wrapper.
    @wraps(func)
    def wrapper(*args, **kw):
        return deferToThreadPool(reactor, pool, func, *args, **kw)
    return wrapper
class Users(SQLObject):
    """SQLObject model for a user record made of three string columns."""

    nickname = StringCol()
    full_name = StringCol()
    email = StringCol()
class DatabaseError(Exception):
    """Error raised by Database.get_user_data when no matching user is found."""
class Database(object):
    """Facade whose DB operations all run through ``run_in_db_thread``.

    ``initialize``, ``create_user`` and ``get_user_data`` are decorated, so
    callers receive whatever the decorator returns rather than the direct
    result of the method body.
    """

    def __init__(self, dburi=None):
        """Store the connection URI and schedule ``initialize``.

        :param dburi: SQLObject connection URI; falls back to
            ``'sqlite:/:memory:'`` when omitted or empty.
        """
        self._uri = dburi or 'sqlite:/:memory:'
        # Define the attribute up front: initialize() only sets it once the
        # worker thread has run, so reading it before then used to raise
        # AttributeError.
        self.connected = False
        self.initialize()

    def _create_table(self, klass):
        """Create the table for ``klass`` unless it already exists."""
        if klass.tableExists():
            return
        print('Table %s does not exists. Creating it now.' % klass.sqlmeta.table)
        # Turn connection debugging on only while the table is being created,
        # and always restore the previous setting.
        saved = klass._connection.debug
        try:
            klass._connection.debug = True
            klass.createTable()
        finally:
            klass._connection.debug = saved

    @run_in_db_thread
    def initialize(self):
        """Connect to the database and create any missing tables.

        Sets ``self.connected`` according to whether the connection succeeded;
        on failure the error is printed and no table is touched.
        """
        try:
            conn = connectionForURI(self._uri)
            sqlhub.processConnection = conn
        except Exception as e:
            print('Error connection with the DB: %s' % e)
            self.connected = False
            return
        self.connected = True
        for klass in [Users]:  # We'd initialize all SQLObjects here
            self._create_table(klass)

    @run_in_db_thread
    def create_user(self, nickname, fullname, email):
        """Create a ``Users`` row from the given values and return it."""
        return Users(nickname=nickname, full_name=fullname, email=email)

    @run_in_db_thread
    def get_user_data(self, nickname):
        """Return the first ``Users`` row with the given nickname.

        :raises DatabaseError: if no row matches.
        """
        try:
            user = Users.selectBy(nickname=nickname)[0]
        except IndexError:
            raise DatabaseError("User %s doesn't exist" % nickname)
        else:
            return user
def main():
    """Create four users, then look up 'saghul' and print the record or the error."""
    def show_user(user):
        print('User info:')
        print('\tNickname: %s' % user.nickname)
        print('\tFull name: %s' % user.full_name)
        print('\tEmail address: %s' % user.email)

    def show_error(error):
        print('Got error! %s' % error.getErrorMessage())

    db = Database()
    # (nickname, full name) pairs; all of them share the same e-mail address.
    new_users = (
        ('saghul', 'saghul'),
        ('saghul2', 'saghul2'),
        ('saghu3', 'saghul3'),
        ('saghul4', 'saghul4'),
    )
    for nickname, fullname in new_users:
        db.create_user(nickname, fullname, 'saghul@gmail.com')
    lookup = db.get_user_data('saghul')
    lookup.addCallbacks(show_user, show_error)
def run_reactor():
    """Run the Twisted reactor with installSignalHandlers disabled."""
    reactor.run(installSignalHandlers=False)
def reactor_stop():
    """Sleep for three seconds, then schedule reactor.stop via callFromThread."""
    import time
    time.sleep(3)
    reactor.callFromThread(reactor.stop)
if __name__ == '__main__':
    # Run the reactor in a separate thread so main() can be called from this one
    Thread(target=run_reactor).start()
    main()
    # Sleeps for a few seconds, then asks the reactor to stop
    reactor_stop()

Hope you find it useful!

:wq

Checking Google Voice SIP service availability

During the past weeks Google has been turning on and off the inbound SIP service in Google Voice. Suddenly you woke up and saw someone on Twitter claiming that it worked, and hours later it just didn’t work anymore. Then I thought about making a way to ping Google Voice servers and getting this information in a nice way.

When inbound SIP support is enabled Google Voice servers (which run OpenSIPS, by the way) respond to OPTIONS requests, so my idea was very simple: send a SIP OPTIONS message to GV servers and print the result somewhere. This somewhere had to be the web. But I didn’t want to host a website, so I thought “hey, maybe it’s time to try that Google App Engine!”.

Applications run in a sandboxed environment on GAE, so you can’t have extension modules or even access the socket module. But you can make outgoing HTTP requests. With these limitations in mind this is what I came up with:

GAE has cron job support, that is, it can do a GET request at regular intervals. So why not leverage this and make a GET request every 5 minutes to some web service which tells us the GV SIP service status and store it? This way we don’t need to connect anywhere every time someone wants to see this data, because it is kept up to date by the cron job.

This required a component which must be able to handle HTTP requests and generate SIP OPTIONS requests. I didn’t have this, but since I work every day with the python-sipsimple library and I know some Twisted it took me very little time to build one. I called it SIPwPing and it’s available on my GitHub.

Some of you may think this is completely over-engineered. And you are right. But I wanted to play with GAE and this was a very good excuse to do so. 🙂

Application is now live running at http://gvoice-sip-status.appspot.com and the source code will be available on my GitHub account later today.

PS: The application is using the free plan and if any quota is exceeded at some point I’ll not bother much, since I already had the fun making it work 🙂

:wq

Gae_gv_error
Gae_gv_unknown
Gae_gv_ok

Using SQLObject with Twisted

Over the past week I’ve been working on a small personal project with Twisted. I need to access a database to store and retrieve data, so I started with the obvious, using APIs provided by Twisted.

Twisted provides a database API called adbapi. The API is pretty straightforward, and the operations I wanted to perform were not rocket science anyway, so it served the purpose, but I wasn’t 100% satisfied.

I was using the runQuery function, mainly, and putting there a regular SQL statement. I didn’t like that. Then I remembered SQLObject.

SQLObject is an ORM, providing an object oriented API to several databases (I was aiming at MySQL and SQLite). This is what I was looking for. But there is a problem: accessing a database is a blocking operation.

Twisted uses the reactor pattern, thus you can’t run a blocking operation in the event loop’s thread without affecting the whole program. Database access libraries tend to be blocking, so Twisted runs database operations in another thread and then gets results in a callback in the main thread. This creates the illusion of non-blocking database access.

In order to integrate SQLObject nicely with Twisted this is exactly what we want to do. We’ll defer all database operations to another thread and we’ll get results (or failures) in callback functions. The key function here is deferToThread which will run the specified function in the reactor thread pool and return a deferred. In order to make our life easier we’ll use a decorator which will run the decorated function in the reactor’s thread pool and return a deferred:

def defer_to_thread(func):
    """Decorator to run DB queries in Twisted's thread pool.

    Calling the decorated function does not execute it directly: the call is
    handed to ``deferToThread`` and the value returned by ``deferToThread``
    is returned to the caller.
    """
    # Local import keeps this snippet self-contained.
    from functools import wraps

    # Preserve the wrapped function's name and docstring on the wrapper.
    @wraps(func)
    def wrapper(*args, **kw):
        return deferToThread(func, *args, **kw)
    return wrapper
view raw database.py hosted with ❤ by GitHub

Now let’s see a simple (yet full) example of how all this works:

# coding=utf8
# Copyright (C) 2011 Saúl Ibarra Corretgé <saghul@gmail.com>
#
__all__ = ['Database', 'DatabaseError']
from sqlobject import connectionForURI, sqlhub, SQLObject, StringCol
from twisted.internet import reactor
from twisted.internet.threads import deferToThread
def defer_to_thread(func):
    """Decorator to run DB queries in Twisted's thread pool.

    Calling the decorated function does not execute it directly: the call is
    handed to ``deferToThread`` and the value returned by ``deferToThread``
    is returned to the caller.
    """
    # Local import so the listing's import block does not need to change.
    from functools import wraps

    # Preserve the wrapped function's name and docstring on the wrapper.
    @wraps(func)
    def wrapper(*args, **kw):
        return deferToThread(func, *args, **kw)
    return wrapper
class Users(SQLObject):
    """SQLObject model for a user record made of three string columns."""

    nickname = StringCol()
    full_name = StringCol()
    email = StringCol()
class DatabaseError(Exception):
    """Error raised by Database.get_user_data when no matching user is found."""
class Database(object):
    """Small facade over SQLObject that connects on construction.

    ``connected`` tells whether a connection was established; ``initialize``
    creates missing tables and ``get_user_data`` (wrapped by
    ``defer_to_thread``) looks a user up.
    """

    def __init__(self, dburi):
        """Connect to the database identified by ``dburi``.

        :param dburi: SQLObject connection URI. URIs containing ``:memory:``
            are rejected with a printed message; ``None`` is treated as
            "no database".
        """
        # Only a successful connection below flips this to True. The original
        # reported connected=True when the URI had been rejected, which let
        # initialize() touch tables without any connection.
        self.connected = False
        if dburi is not None and ':memory:' in dburi:
            print('SQLite in-memory DB is not supported')
            dburi = None
        self._uri = dburi
        if self._uri is None:
            return
        try:
            conn = connectionForURI(self._uri)
            sqlhub.processConnection = conn
        except Exception as e:
            print('Error connection with the DB: %s' % e)
        else:
            self.connected = True

    def _create_table(self, klass):
        """Create the table for ``klass`` unless it already exists."""
        if klass.tableExists():
            return
        print('Table %s does not exists. Creating it now.' % klass.sqlmeta.table)
        # Turn connection debugging on only while the table is being created,
        # and always restore the previous setting.
        saved = klass._connection.debug
        try:
            klass._connection.debug = True
            klass.createTable()
        finally:
            klass._connection.debug = saved

    def initialize(self):
        """Create any missing tables; does nothing when not connected."""
        if self.connected:
            for klass in [Users]:  # We'd initialize all SQLObjects here
                self._create_table(klass)

    @defer_to_thread
    def get_user_data(self, nickname):
        """Return the first ``Users`` row with the given nickname.

        :raises DatabaseError: if no row matches.
        """
        try:
            user = Users.selectBy(nickname=nickname)[0]
        except IndexError:
            raise DatabaseError("User %s doesn't exist" % nickname)
        else:
            return user
def main():
    """Look up the user 'saghul' and print either the record or the error."""
    def show_user(user):
        print('User info:')
        print('\tNickname: %s' % user.nickname)
        print('\tFull name: %s' % user.full_name)
        print('\tEmail address: %s' % user.email)

    def show_error(error):
        print('Got error! %s' % error.getErrorMessage())

    db = Database('sqlite:///tmp/test.sqlite')
    db.initialize()
    lookup = db.get_user_data('saghul')
    # The errback is added after the callback, exactly as in two separate calls.
    lookup.addCallback(show_user).addErrback(show_error)
if __name__ == '__main__':
    # Schedule main() with a zero delay, then start the reactor
    reactor.callLater(0, main)
    reactor.run()

As you can see the results (or errors) are retrieved in the got_result/got_error callback functions asynchronously, and as the operation was executed in a different thread this didn’t affect the main event loop.

:wq