Serializing DB operations with Twisted and SQLObject

Long time no write! A while ago I wrote a post about how to combine Twisted with SQLObject. The method described there, however, doesn’t work particularly well with SQLite.

Since an SQLite database is a single file (or a memory area), writes need to lock it. In the other post we used deferToThread, which runs a function on a different thread and returns a Deferred that fires when the operation has finished. That thread is taken from the reactor’s thread pool, so operations don’t necessarily all run on the same thread. This means that if more than one write operation is executed at almost the same time, the first one gets the lock and the others have to wait. When using SQLObject, an operation fails if the lock can’t be acquired, so let’s see what we can do about it.
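To see the contention concretely, here is a small sketch that uses Python’s standard sqlite3 module instead of SQLObject (an assumption, so it runs without extra dependencies). Two connections to the same file stand in for two pool threads that each have their own connection. The first one takes the write lock with `BEGIN IMMEDIATE` and holds it. The second one is told not to wait (`timeout=0`), so its insert fails straight away.

```python
import os
import sqlite3
import tempfile


def demo_lock_contention():
    """Return the error a second writer gets while the first holds the lock."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.sqlite")
        # isolation_level=None: autocommit mode, we issue BEGIN/COMMIT ourselves
        first = sqlite3.connect(path, isolation_level=None)
        second = sqlite3.connect(path, isolation_level=None, timeout=0)
        try:
            first.execute("CREATE TABLE users (nickname TEXT)")
            # Take the write lock and keep holding it by not committing yet
            first.execute("BEGIN IMMEDIATE")
            first.execute("INSERT INTO users VALUES ('saghul')")
            try:
                # timeout=0: give up at once instead of waiting for the lock
                second.execute("INSERT INTO users VALUES ('saghul2')")
            except sqlite3.OperationalError as exc:
                return str(exc)
            return None
        finally:
            if first.in_transaction:
                first.execute("COMMIT")
            first.close()
            second.close()


print(demo_lock_contention())
```

With sqlite3’s default `timeout` of 5 seconds, the second writer would wait for the lock before failing instead of failing immediately.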

One option would be to use the timeout parameter and set it to a reasonable value, so that the operation waits that long for the lock before giving up. Since it’s not guaranteed to get the lock even then, this could be complemented with a wait-and-retry strategy.
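Here is a minimal sketch of timeout plus wait-and-retry. Again, the standard sqlite3 module stands in for SQLObject. `sqlite3.connect` takes the timeout (in seconds) directly. `run_with_retry` is a hypothetical helper, and its retry count and delay are arbitrary choices.

```python
import sqlite3
import time


def connect(path, timeout=5.0):
    # timeout: seconds SQLite waits for a lock before raising
    # sqlite3.OperationalError("database is locked")
    return sqlite3.connect(path, timeout=timeout)


def run_with_retry(operation, retries=3, delay=0.1):
    """Call operation(), retrying when SQLite reports the database as locked."""
    for attempt in range(retries + 1):
        try:
            return operation()
        except sqlite3.OperationalError as exc:
            # Retry only lock errors, and only while attempts remain
            if "locked" not in str(exc) or attempt == retries:
                raise
            time.sleep(delay)


conn = connect(":memory:")
conn.execute("CREATE TABLE users (nickname TEXT)")
run_with_retry(lambda: conn.execute("INSERT INTO users VALUES ('saghul')"))
```

Even with retries, a busy enough database can exhaust every attempt. That is why the single-thread approach avoids the problem more cleanly.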

Or we can just run all database operations in a single thread. That way only one operation executes at a time, so the lock is never contended. And since we still return Deferreds, nothing blocks waiting for results.
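The idea itself does not depend on Twisted. The following is a rough standard-library sketch; the `SingleWriter` class is hypothetical. A dedicated thread owns the only sqlite3 connection and executes queued jobs one by one, so two writes can never contend for the lock. Unlike the Twisted version, `call` here blocks the caller until the job has run instead of returning a Deferred.

```python
import queue
import sqlite3
import threading


class SingleWriter(object):
    """Run every DB operation on one dedicated thread (hypothetical helper)."""

    def __init__(self, path):
        self._jobs = queue.Queue()
        self._thread = threading.Thread(target=self._run, args=(path,), daemon=True)
        self._thread.start()

    def _run(self, path):
        # The connection is created and used only on this thread
        conn = sqlite3.connect(path)
        while True:
            job = self._jobs.get()
            if job is None:  # sentinel from stop()
                break
            func, args, done, box = job
            try:
                box["result"] = func(conn, *args)
            except Exception as exc:
                box["error"] = exc
            done.set()
        conn.close()

    def call(self, func, *args):
        """Queue func(conn, *args) and wait until the DB thread has run it."""
        done, box = threading.Event(), {}
        self._jobs.put((func, args, done, box))
        done.wait()
        if "error" in box:
            raise box["error"]
        return box.get("result")

    def stop(self):
        self._jobs.put(None)
        self._thread.join()


def create_table(conn):
    conn.execute("CREATE TABLE users (nickname TEXT)")


def insert(conn, nickname):
    conn.execute("INSERT INTO users VALUES (?)", (nickname,))
    conn.commit()


def count(conn):
    return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]


writer = SingleWriter(":memory:")
writer.call(create_table)
# Ten threads write "at the same time"; the queue serializes them
threads = [threading.Thread(target=writer.call, args=(insert, "user%d" % i))
           for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
total = writer.call(count)
writer.stop()
```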

To do this I chose Twisted’s ThreadPool class with a single thread, because Twisted provides nice functions to run operations on such a pool and get a Deferred back.

First we set up our thread pool:

from twisted.internet import reactor
from twisted.python.threadpool import ThreadPool

pool = ThreadPool(minthreads=1, maxthreads=1, name='db-ops')
pool.start()
reactor.addSystemEventTrigger('before', 'shutdown', pool.stop)

We will need to stop the thread pool when our application ends, hence the call to addSystemEventTrigger. It will run the specified function when the reactor is about to be stopped.

Then we’ll create a decorator for the functions that should run on the thread pool:

from twisted.internet import reactor
from twisted.internet.threads import deferToThreadPool

def run_in_db_thread(func):
    """Decorator to run DB queries in our single-threaded pool"""
    def wrapper(*args, **kw):
        return deferToThreadPool(reactor, pool, func, *args, **kw)
    return wrapper

Since the thread pool has only a single thread, we are effectively serializing all database operations, as long as they are called through our decorator. Let’s see a complete example:

# coding=utf8
# Copyright (C) 2011 Saúl Ibarra Corretgé <saghul@gmail.com>
#

__all__ = ['Database', 'DatabaseError']

from sqlobject import connectionForURI, sqlhub, SQLObject, StringCol
from twisted.internet import reactor
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool

# A single thread: DB operations run there one at a time, in call order
pool = ThreadPool(minthreads=1, maxthreads=1, name='db-ops')
pool.start()
reactor.addSystemEventTrigger('before', 'shutdown', pool.stop)


def run_in_db_thread(func):
    """Decorator to run DB queries in our single-threaded pool"""
    def wrapper(*args, **kw):
        return deferToThreadPool(reactor, pool, func, *args, **kw)
    return wrapper


class Users(SQLObject):
    nickname = StringCol()
    full_name = StringCol()
    email = StringCol()


class DatabaseError(Exception): pass


class Database(object):
    def __init__(self, dburi=None):
        self._uri = dburi or 'sqlite:/:memory:'
        self.initialize()

    def _create_table(self, klass):
        if klass.tableExists():
            return
        else:
            print('Table %s does not exist. Creating it now.' % klass.sqlmeta.table)
            saved = klass._connection.debug
            try:
                klass._connection.debug = True
                klass.createTable()
            finally:
                klass._connection.debug = saved

    @run_in_db_thread
    def initialize(self):
        try:
            conn = connectionForURI(self._uri)
            sqlhub.processConnection = conn
        except Exception as e:
            print('Error connecting to the DB: %s' % e)
            self.connected = False
            return
        else:
            self.connected = True
            for klass in [Users]:  # We'd initialize all SQLObjects here
                self._create_table(klass)

    @run_in_db_thread
    def create_user(self, nickname, fullname, email):
        return Users(nickname=nickname, full_name=fullname, email=email)

    @run_in_db_thread
    def get_user_data(self, nickname):
        try:
            user = Users.selectBy(nickname=nickname)[0]
        except IndexError:
            raise DatabaseError("User %s doesn't exist" % nickname)
        else:
            return user


def main():
    def got_result(user):
        print('User info:')
        print('\tNickname: %s' % user.nickname)
        print('\tFull name: %s' % user.full_name)
        print('\tEmail address: %s' % user.email)

    def got_error(error):
        print('Got error! %s' % error.getErrorMessage())

    db = Database()
    db.create_user('saghul', 'saghul', 'saghul@gmail.com')
    db.create_user('saghul2', 'saghul2', 'saghul@gmail.com')
    db.create_user('saghul3', 'saghul3', 'saghul@gmail.com')
    db.create_user('saghul4', 'saghul4', 'saghul@gmail.com')
    d = db.get_user_data('saghul')
    d.addCallbacks(got_result, got_error)
    # DB operations run in order, so once this lookup is done we can stop
    d.addBoth(lambda _: reactor.stop())


if __name__ == '__main__':
    # Run main() from the reactor thread: most Twisted APIs are not thread-safe
    reactor.callWhenRunning(main)
    reactor.run()
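For comparison, a similar decorator can be sketched with the standard library’s `concurrent.futures`. Here a `ThreadPoolExecutor` with `max_workers=1` plays the role of the one-thread `ThreadPool`, and a `Future` stands in for the Deferred. The names `_db_executor` and `which_thread` are illustrative only.

```python
import functools
import threading
from concurrent.futures import ThreadPoolExecutor

# One worker thread: submitted calls run one at a time, in submission order
_db_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="db-ops")


def run_in_db_thread(func):
    """Run func on the single DB thread and return a Future for its result."""
    @functools.wraps(func)
    def wrapper(*args, **kw):
        return _db_executor.submit(func, *args, **kw)
    return wrapper


@run_in_db_thread
def which_thread(n):
    return n, threading.current_thread().name


futures = [which_thread(i) for i in range(5)]
results = [f.result() for f in futures]
# Equivalent of pool.stop at shutdown: wait for queued work, then end the thread
_db_executor.shutdown(wait=True)
```

Every call reports the same worker thread name, which is the serialization the post relies on.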

Hope you find it useful!

:wq

Using SQLObject with Twisted

Over the past week I’ve been working on a small personal project with Twisted. I need to access a database to store and retrieve data, so I started with the obvious choice: the APIs Twisted itself provides.

Twisted provides a database API called adbapi. The API is pretty straightforward, and the operations I wanted to perform were not rocket science anyway, so it served the purpose, but I wasn’t 100% satisfied.

I was mainly using the runQuery function and passing it plain SQL statements. I didn’t like that. Then I remembered SQLObject.

SQLObject is an ORM that provides an object-oriented API to several databases (I was aiming at MySQL and SQLite). This is what I was looking for. But there is a problem: accessing a database is a blocking operation.

Twisted uses the reactor pattern, so you can’t run a blocking operation in the event loop’s thread without affecting the whole program. Database access libraries tend to be blocking, so Twisted runs database operations in another thread and then delivers the results to a callback in the main thread. This creates the illusion of non-blocking database access.

To integrate SQLObject nicely with Twisted, this is exactly what we want to do: defer all database operations to another thread and get the results (or failures) in callback functions. The key function here is deferToThread, which runs the specified function in the reactor’s thread pool and returns a Deferred. To make our life easier we’ll use a decorator that runs the decorated function in the reactor’s thread pool and returns a Deferred:

from twisted.internet.threads import deferToThread

def defer_to_thread(func):
    """Decorator to run DB queries in Twisted's thread pool"""
    def wrapper(*args, **kw):
        return deferToThread(func, *args, **kw)
    return wrapper
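Modern Python’s asyncio, which postdates this post, offers a similar pattern. `asyncio.to_thread` (Python 3.9+) runs a blocking function in a thread pool and hands the event loop an awaitable, much like deferToThread returns a Deferred. In the sketch below, `slow_query` is a hypothetical stand-in for a blocking database call.

```python
import asyncio
import functools
import threading
import time


def defer_to_thread(func):
    """asyncio counterpart of the decorator: calling it returns an awaitable."""
    @functools.wraps(func)
    def wrapper(*args, **kw):
        return asyncio.to_thread(func, *args, **kw)
    return wrapper


@defer_to_thread
def slow_query(nickname):
    time.sleep(0.05)  # stands in for a blocking database call
    if nickname != "saghul":
        raise LookupError("User %s doesn't exist" % nickname)
    return nickname, threading.current_thread().name


async def main():
    loop_thread = threading.current_thread().name
    nickname, worker_thread = await slow_query("saghul")
    try:
        await slow_query("nobody")
        error = None
    except LookupError as exc:
        # The exception surfaces at the await, where an errback would fire
        error = str(exc)
    return loop_thread, worker_thread, nickname, error


loop_thread, worker_thread, nickname, error = asyncio.run(main())
print(nickname, error)
```

Like deferToThread, `asyncio.to_thread` uses a pool that can have several threads, so the SQLite locking issue described in the post above applies to it as well.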

Now let’s see a simple (yet complete) example of how all this works:

# coding=utf8
# Copyright (C) 2011 Saúl Ibarra Corretgé <saghul@gmail.com>
#

__all__ = ['Database', 'DatabaseError']

from sqlobject import connectionForURI, sqlhub, SQLObject, StringCol
from twisted.internet import reactor
from twisted.internet.threads import deferToThread


def defer_to_thread(func):
    """Decorator to run DB queries in Twisted's thread pool"""
    def wrapper(*args, **kw):
        return deferToThread(func, *args, **kw)
    return wrapper


class Users(SQLObject):
    nickname = StringCol()
    full_name = StringCol()
    email = StringCol()


class DatabaseError(Exception): pass


class Database(object):
    def __init__(self, dburi):
        if ':memory:' in dburi:
            print('SQLite in-memory DB is not supported')
            dburi = None
        self._uri = dburi
        if self._uri is not None:
            try:
                conn = connectionForURI(self._uri)
                sqlhub.processConnection = conn
            except Exception as e:
                print('Error connecting to the DB: %s' % e)
                self.connected = False
            else:
                self.connected = True
        else:
            # No usable URI, so there is no connection to work with
            self.connected = False

    def _create_table(self, klass):
        if klass.tableExists():
            return
        else:
            print('Table %s does not exist. Creating it now.' % klass.sqlmeta.table)
            saved = klass._connection.debug
            try:
                klass._connection.debug = True
                klass.createTable()
            finally:
                klass._connection.debug = saved

    def initialize(self):
        if self.connected:
            for klass in [Users]:  # We'd initialize all SQLObjects here
                self._create_table(klass)

    @defer_to_thread
    def get_user_data(self, nickname):
        try:
            user = Users.selectBy(nickname=nickname)[0]
        except IndexError:
            raise DatabaseError("User %s doesn't exist" % nickname)
        else:
            return user


def main():
    def got_result(user):
        print('User info:')
        print('\tNickname: %s' % user.nickname)
        print('\tFull name: %s' % user.full_name)
        print('\tEmail address: %s' % user.email)

    def got_error(error):
        print('Got error! %s' % error.getErrorMessage())

    db = Database('sqlite:///tmp/test.sqlite')
    db.initialize()
    d = db.get_user_data('saghul')
    d.addCallback(got_result)
    d.addErrback(got_error)


if __name__ == '__main__':
    reactor.callLater(0, main)
    reactor.run()

As you can see, the results (or errors) are delivered asynchronously to the got_result/got_error callback functions, and since the operation ran in a different thread, it didn’t block the main event loop.
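For comparison, the same result/error split can be sketched with `concurrent.futures`. Here `add_callbacks` is a hypothetical helper that mimics Deferred.addCallbacks, and the `USERS` dict is made-up data. There is one real difference to keep in mind. `Future.add_done_callback` runs the callback in whichever thread completes the future, or immediately if the future is already done. Twisted, by contrast, delivers results in the reactor thread.

```python
from concurrent.futures import ThreadPoolExecutor


def add_callbacks(future, on_result, on_error):
    """Send a Future's outcome to on_result or on_error (hypothetical helper)."""
    def done(f):
        exc = f.exception()
        if exc is None:
            on_result(f.result())
        else:
            on_error(exc)
    future.add_done_callback(done)


USERS = {"saghul": "saghul@gmail.com"}  # illustrative data only


def get_user_email(nickname):
    try:
        return USERS[nickname]
    except KeyError:
        raise LookupError("User %s doesn't exist" % nickname)


events = []
with ThreadPoolExecutor(max_workers=1) as executor:
    for nickname in ("saghul", "nobody"):
        add_callbacks(executor.submit(get_user_email, nickname),
                      lambda email: events.append(("result", email)),
                      lambda exc: events.append(("error", str(exc))))
# Leaving the with-block waits for both jobs, and their callbacks, to finish
print(events)
```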

:wq