Using SQLObject with Twisted

Over the past week I’ve been working on a small personal project with Twisted. I need to access a database to store and retrieve data, so I started with the obvious, using APIs provided by Twisted.

Twisted provides a database API called adbapi. The API is pretty straightforward, and the operations I wanted to perform were not rocket science anyway, so it served the purpose, but I wans’t 100% satisfied.

I was using the runQuery function, mainly, and putting there a regular SQL statement. I didn’t like that. Then I remembered SQLObject.

SQLObject is a ORM, providing an object oriented API to several databases (I was aiming at MySQL and SQLite). This is what I was looking for. But there is a problem: accessing a database is a blocking operation.

Twisted uses the reactor pattern thus you can’t run a blocking operation in the event loop’s thread without affecting the whole program. Database accessing libraries tend to be blocking, so Twisted runs database operations in another thread and then gets results in a callback in the main thread. This makes the illusion of non-blocking database access.

In order to integrate SQLObject nicely with Twisted this is exactly what we want to do. We’ll defer all database operations to another thread and we’ll get results (or failures) in callback functions. The key function here is deferToThread which will run the specified function in the reactor thread pool and return a deferred. In order to make our life easier we’ll use a decorator which will run the decorated function in the reactor’s thread pool and return a deferred:

def defer_to_thread(func):
"""Decorator to run DB queries in Twisted's thread pool"""
def wrapper(*args, **kw):
return deferToThread(func, *args, **kw)
return wrapper
view raw hosted with ❤ by GitHub

Now lets see a simple (yet full) example of how all this works:

# coding=utf8
# Copyright (C) 2011 Saúl Ibarra Corretgé <>
__all__ = ['Database', 'DatabaseError']
from sqlobject import connectionForURI, sqlhub, SQLObject, StringCol
from twisted.internet import reactor
from twisted.internet.threads import deferToThread
def defer_to_thread(func):
"""Decorator to run DB queries in Twisted's thread pool"""
def wrapper(*args, **kw):
return deferToThread(func, *args, **kw)
return wrapper
class Users(SQLObject):
nickname = StringCol()
full_name = StringCol()
email = StringCol()
class DatabaseError(Exception): pass
class Database(object):
def __init__(self, dburi):
if ':memory:' in dburi:
print 'SQLite in-memory DB is not supported'
dburi = None
self._uri = dburi
if self._uri is not None:
conn = connectionForURI(self._uri)
sqlhub.processConnection = conn
except Exception, e:
print 'Error connection with the DB: %s' % e
self.connected = False
self.connected = True
self.connected = True
def _create_table(self, klass):
if klass.tableExists():
print 'Table %s does not exists. Creating it now.' % klass.sqlmeta.table
saved = klass._connection.debug
klass._connection.debug = True
klass._connection.debug = saved
def initialize(self):
if self.connected:
for klass in [Users]: # We'd initialize all SQLObjects here
def get_user_data(self, nickname):
user = Users.selectBy(nickname=nickname)[0]
except IndexError:
raise DatabaseError("User %s doesn't exist" % nickname)
return user
def main():
def got_result(user):
print 'User info:'
print '\tNickname: %s' % user.nickname
print '\tFull name: %s' % user.full_name
print '\tEmail address: %s' %
def got_error(error):
print 'Got error! %s' % error.getErrorMessage()
db = Database('sqlite:///tmp/test.sqlite')
d = db.get_user_data('saghul')
if __name__ == '__main__':
reactor.callLater(0, main)

As you can see the results (or errors) are retrieved in the got_result/got_error callback functions asynchronously, and as the operation was executed in a different thread this didn’t affect the main event loop.


Leave a Reply