Serializing DB operations with Twisted and SQLObject

Long time no write! A while ago I wrote a post about how to combine Twisted with SQLObject. The method describe there, however, doesn’t work particularly well when using SQLite.

Since SQLite databases are a single file (or a memory area) some operations will need to lock when writing. In the other post we used deferToThread, which will run a function on a different thread and return a Deferred which will fire when the operation is finished. The thread on which the operation will be run is taken from the reactor thread pool, so it’s not a single thread. This means that if more than one write operation is executed almost at the same time, the first operation will get the lock, and the others will need to wait. When using SQLObject, the operation will fail if the lock can’t be acquired, so let’s see what we can do about it.

One option would be to use the timeout parameter and set it to some reasonable value, so that the operation would wait that amount of time before giving up on the lock. It’s not 100% guaranteed that it will get it, so this could be complemented with a wait-retry strategy.

Or we can just run all database operations in a single thread. This way only one operation will be executed at a time so we avoid the issue. Also, since we return deferreds nothing will be blocked waiting for results.

In order to do this I chose to use Twisted’s ThreadPoolclass, with a single thread. The reason for doing it this way is that Twisted provides us with nice functions to run operations on one of these pools and return a deferred.

First we setup our thread pool:

pool = ThreadPool(minthreads=1, maxthreads=1, name='db-ops')
pool.start()
reactor.addSystemEventTrigger('after', 'shutdown', pool.stop)

We will need to stop the thread pool when our application ends, hence the call to addSystemEventTrigger. It will run the specified function when the reactor is about to be stopped.

The we’ll create a decorator which we’ll use to decorate functions that will run on the thread pool:

def run_in_db_thread(func):
"""Decorator to run DB queries in Twisted's thread pool"""
def wrapper(*args, **kw):
return deferToThreadPool(reactor, pool, func, *args, **kw)
return wrapper

Since the thread pool only has a single thread we are effectively serializing all database operations, if they are called using our decorator. Lets see a complete example:

# coding=utf8
# Copyright (C) 2011 Saúl Ibarra Corretgé <saghul@gmail.com>
#
__all__ = ['Database', 'DatabaseError']
from threading import Thread
from sqlobject import connectionForURI, sqlhub, SQLObject, StringCol
from twisted.internet import reactor
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool
pool = ThreadPool(minthreads=1, maxthreads=1, name='db-ops')
pool.start()
reactor.addSystemEventTrigger('before', 'shutdown', pool.stop)
def run_in_db_thread(func):
"""Decorator to run DB queries in Twisted's thread pool"""
def wrapper(*args, **kw):
return deferToThreadPool(reactor, pool, func, *args, **kw)
return wrapper
class Users(SQLObject):
nickname = StringCol()
full_name = StringCol()
email = StringCol()
class DatabaseError(Exception): pass
class Database(object):
def __init__(self, dburi=None):
self._uri = dburi or 'sqlite:/:memory:'
self.initialize()
def _create_table(self, klass):
if klass.tableExists():
return
else:
print 'Table %s does not exists. Creating it now.' % klass.sqlmeta.table
saved = klass._connection.debug
try:
klass._connection.debug = True
klass.createTable()
finally:
klass._connection.debug = saved
@run_in_db_thread
def initialize(self):
try:
conn = connectionForURI(self._uri)
sqlhub.processConnection = conn
except Exception, e:
print 'Error connection with the DB: %s' % e
self.connected = False
return
else:
self.connected = True
for klass in [Users]: # We'd initialize all SQLObjects here
self._create_table(klass)
@run_in_db_thread
def create_user(self, nickname, fullname, email):
return Users(nickname=nickname, full_name=fullname, email=email)
@run_in_db_thread
def get_user_data(self, nickname):
try:
user = Users.selectBy(nickname=nickname)[0]
except IndexError:
raise DatabaseError("User %s doesn't exist" % nickname)
else:
return user
def main():
def got_result(user):
print 'User info:'
print '\tNickname: %s' % user.nickname
print '\tFull name: %s' % user.full_name
print '\tEmail address: %s' % user.email
def got_error(error):
print 'Got error! %s' % error.getErrorMessage()
db = Database()
db.create_user('saghul', 'saghul', 'saghul@gmail.com')
db.create_user('saghul2', 'saghul2', 'saghul@gmail.com')
db.create_user('saghu3', 'saghul3', 'saghul@gmail.com')
db.create_user('saghul4', 'saghul4', 'saghul@gmail.com')
d = db.get_user_data('saghul')
d.addCallbacks(got_result, got_error)
def run_reactor():
reactor.run(installSignalHandlers=False)
def reactor_stop():
from time import sleep
sleep(3)
reactor.callFromThread(reactor.stop)
if __name__ == '__main__':
Thread(target=run_reactor).start()
main()
reactor_stop()

Hope you find it useful!

:wq

2 thoughts on “Serializing DB operations with Twisted and SQLObject

  1. Awesome!Thanks for all the great examples for how to set up a threadpool. My backend for Gridspy is going to use this approach to serialise our database connections. I love the idea for using a decorator for this (was using lots of deferToThread calls before).Also, thanks for pointing out addSystemEventTrigger – hadn’t seen that before.Overall, a great tutorial. Thanks :)-Tom

Leave a Reply