Serializing DB operations with Twisted and SQLObject

Long time no write! A while ago I wrote a post about how to combine Twisted with SQLObject. The method described there, however, doesn’t work particularly well with SQLite.

Since an SQLite database is a single file (or a memory area), write operations need to lock it. In the other post we used deferToThread, which runs a function on a different thread and returns a Deferred that fires when the operation is finished. The thread that runs the operation is taken from the reactor thread pool, so it’s not always the same thread. This means that if several write operations are executed at almost the same time, the first one will get the lock and the others will need to wait. When using SQLObject, an operation fails if the lock can’t be acquired, so let’s see what we can do about it.

One option would be to use the timeout parameter and set it to some reasonable value, so that the operation would wait that amount of time before giving up on the lock. It’s not 100% guaranteed that it will get it, so this could be complemented with a wait-retry strategy.
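As a rough sketch of that wait-and-retry idea, here is a version that uses Python’s standard sqlite3 module directly rather than SQLObject (an assumption made to keep the example self-contained). The timeout passed to connect is how long SQLite itself waits for the lock; execute_with_retry is a hypothetical helper that retries a few more times if the database is still reported as locked.

```python
import sqlite3
import time


def execute_with_retry(conn, sql, params=(), retries=3, delay=0.1):
    """Run one statement, retrying while SQLite reports the database as locked."""
    for attempt in range(retries + 1):
        try:
            with conn:  # commits on success, rolls back on error
                return conn.execute(sql, params).fetchall()
        except sqlite3.OperationalError as exc:
            # Anything other than a lock error, or the last attempt: give up
            if "locked" not in str(exc) or attempt == retries:
                raise
            time.sleep(delay * (attempt + 1))  # back off a little more each time


# timeout = seconds SQLite itself waits for a lock before raising
conn = sqlite3.connect(":memory:", timeout=5.0)
execute_with_retry(conn, "CREATE TABLE users (nickname TEXT)")
execute_with_retry(conn, "INSERT INTO users VALUES (?)", ("saghul",))
rows = execute_with_retry(conn, "SELECT nickname FROM users")
```

Even with both the timeout and the retries, a busy enough database can still make the last attempt fail, which is why the single-thread approach is attractive.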

Or we can just run all database operations in a single thread. This way only one operation will be executed at a time so we avoid the issue. Also, since we return deferreds nothing will be blocked waiting for results.
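The idea itself is not specific to Twisted. As a minimal illustration using only the standard library (this is not the Twisted code used in the rest of the post, and the names serialized and fake_write are made up for the example), a pool with a single worker thread runs submitted calls strictly one after another while the caller only gets a future back:

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from functools import wraps

# One worker thread: submitted calls run strictly one after another.
db_executor = ThreadPoolExecutor(max_workers=1)


def serialized(func):
    """Run func on the single worker thread and return a Future."""
    @wraps(func)
    def wrapper(*args, **kw):
        return db_executor.submit(func, *args, **kw)
    return wrapper


calls = []  # (argument, thread id) pairs, in execution order


@serialized
def fake_write(n):
    # Stand-in for a database write (hypothetical)
    calls.append((n, threading.get_ident()))
    return n


futures = [fake_write(i) for i in range(5)]
results = [f.result() for f in futures]  # wait for all calls to finish
```

Every call ends up on the same thread and in submission order, so two writes can never compete for the lock.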

In order to do this I chose to use Twisted’s ThreadPool class, with a single thread. The reason for doing it this way is that Twisted provides us with nice functions to run operations on one of these pools and return a Deferred.

First we set up our thread pool:

pool = ThreadPool(minthreads=1, maxthreads=1, name='db-ops')
pool.start()  # queued calls only run once the pool has been started
reactor.addSystemEventTrigger('before', 'shutdown', pool.stop)

We will need to stop the thread pool when our application ends, hence the call to addSystemEventTrigger. It will run the specified function when the reactor is about to be stopped.

Then we’ll create a decorator which we’ll use to decorate functions that will run on the thread pool:

def run_in_db_thread(func):
    """Decorator to run DB queries in Twisted's thread pool"""
    def wrapper(*args, **kw):
        return deferToThreadPool(reactor, pool, func, *args, **kw)
    return wrapper

Since the thread pool only has a single thread, we are effectively serializing all database operations, as long as they are called using our decorator. Let’s see a complete example:

# coding=utf8
# Copyright (C) 2011 Saúl Ibarra Corretgé <>

__all__ = ['Database', 'DatabaseError']

from threading import Thread

from sqlobject import connectionForURI, sqlhub, SQLObject, StringCol
from twisted.internet import reactor
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool

pool = ThreadPool(minthreads=1, maxthreads=1, name='db-ops')
pool.start()  # queued calls only run once the pool has been started
reactor.addSystemEventTrigger('before', 'shutdown', pool.stop)


def run_in_db_thread(func):
    """Decorator to run DB queries in Twisted's thread pool"""
    def wrapper(*args, **kw):
        return deferToThreadPool(reactor, pool, func, *args, **kw)
    return wrapper


class Users(SQLObject):
    nickname = StringCol()
    full_name = StringCol()
    email = StringCol()


class DatabaseError(Exception): pass


class Database(object):
    def __init__(self, dburi=None):
        self._uri = dburi or 'sqlite:/:memory:'

    def _create_table(self, klass):
        # Only called from initialize(), so it already runs in the DB thread
        if not klass.tableExists():
            print('Table %s does not exist. Creating it now.' % klass.sqlmeta.table)
            saved = klass._connection.debug
            try:
                klass._connection.debug = True
                klass.createTable()
            finally:
                klass._connection.debug = saved

    @run_in_db_thread
    def initialize(self):
        try:
            conn = connectionForURI(self._uri)
            sqlhub.processConnection = conn
        except Exception as e:
            print('Error connecting to the DB: %s' % e)
            self.connected = False
            return
        self.connected = True
        for klass in [Users]: # We'd initialize all SQLObjects here
            self._create_table(klass)

    @run_in_db_thread
    def create_user(self, nickname, fullname, email):
        return Users(nickname=nickname, full_name=fullname, email=email)

    @run_in_db_thread
    def get_user_data(self, nickname):
        try:
            user = Users.selectBy(nickname=nickname)[0]
        except IndexError:
            raise DatabaseError("User %s doesn't exist" % nickname)
        return user


def main():
    def got_result(user):
        print('User info:')
        print('\tNickname: %s' % user.nickname)
        print('\tFull name: %s' % user.full_name)
        print('\tEmail address: %s' % user.email)
    def got_error(error):
        print('Got error! %s' % error.getErrorMessage())
    db = Database()
    db.initialize()
    db.create_user('saghul', 'saghul', '')
    db.create_user('saghul2', 'saghul2', '')
    db.create_user('saghu3', 'saghul3', '')
    db.create_user('saghul4', 'saghul4', '')
    d = db.get_user_data('saghul')
    d.addCallbacks(got_result, got_error)


def run_reactor():
    # Signal handlers can only be installed from the main thread
    reactor.run(installSignalHandlers=False)

def reactor_stop():
    reactor.stop()

if __name__ == '__main__':
    # Driver code (assumed): run the reactor in a helper thread,
    # schedule the DB calls on it, give them a moment, then stop it
    from time import sleep
    t = Thread(target=run_reactor)
    t.start()
    reactor.callFromThread(main)
    sleep(2)
    reactor.callFromThread(reactor_stop)
    t.join()

Hope you find it useful!


Creating a “smart” Twitter bot

A while ago I was bored at home and decided to play a bit with the Twitter API. As a result I created a simple Twitter bot which retweeted everything with the *#asterisk* hashtag.

Its functionality was very simple and I didn’t even touch the (crappy) code for months, but Twitter decided to drop basic authentication support in favor of OAuth, so it was the right time to update the bot.

Since the bot was created, lots of other bots have appeared, and it is annoying to get the same tweet over and over again, so I thought I’d try to make the bot a bit *smarter*. The main goal was to detect whether a tweet had already been tweeted (or RTd), so as not to repeat things. AFAIK there are 2 types of retweets:

  • “some text RT @user1 RT @user2 some super cool tweet here!”
  • “some super cool tweet! (via @user)”

The current bot detects both formats by using regular expressions and extracts the tweet itself. However, I felt this might not be enough, because we could get the retweet *before* the real tweet, so it’d be great to save the entire tweet and then search for it.
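The bot’s actual expressions are not shown in this post. As a rough sketch of how the two formats above could be handled, the following uses two hypothetical patterns (RT_RE and VIA_RE) and a hypothetical helper, extract_original, which returns the text unchanged when neither format matches:

```python
import re

# "some text RT @user1 RT @user2 tweet": keep what follows the last "RT @user"
RT_RE = re.compile(r'.*\bRT\s+@\w+:?\s+(?P<tweet>.+)$')
# "tweet (via @user)": keep what precedes the trailing "(via @user)"
VIA_RE = re.compile(r'^(?P<tweet>.+?)\s*\(via\s+@\w+\)\s*$')


def extract_original(text):
    """Return the original tweet hidden inside a retweet, if any."""
    text = text.strip()
    for pattern in (RT_RE, VIA_RE):
        match = pattern.match(text)
        if match:
            return match.group('tweet').strip()
    return text  # not a retweet in either known format


first = extract_original('some text RT @user1 RT @user2 some super cool tweet here!')
second = extract_original('some super cool tweet! (via @user)')
```

Real tweets are messier than these two samples (for instance a retweet that mixes both formats), so patterns like these would need tuning against real data.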

As the bot was using a SQLite database I decided to use its *Full Text Search* (fts3) capability, inspired by a colleague. With FTS, searching for an entire string is dramatically faster than doing a regular SELECT query with LIKE. Here is an example taken from the SQLite website:

CREATE VIRTUAL TABLE enrondata1 USING fts3(content TEXT); /* FTS3 table */
CREATE TABLE enrondata2(content TEXT); /* Ordinary table */

SELECT count(*) FROM enrondata1 WHERE content MATCH 'linux'; /* 0.03 seconds */
SELECT count(*) FROM enrondata2 WHERE content LIKE '%linux%'; /* 22.5 seconds */
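To show how this could be used for duplicate detection, here is a minimal sketch with Python’s standard sqlite3 module. It assumes the SQLite build has FTS3 enabled, and the table name (tweets) and the helpers remember and already_seen are made up for the example; this is not the bot’s actual code. The tweet is reduced to plain ASCII words and searched as an FTS phrase, so an original tweet is found even if only a retweet of it was stored earlier.

```python
import re
import sqlite3


def open_store(path=':memory:'):
    conn = sqlite3.connect(path)
    conn.execute('CREATE VIRTUAL TABLE tweets USING fts3(content TEXT)')
    return conn


def remember(conn, text):
    """Store the full text of a tweet."""
    with conn:
        conn.execute('INSERT INTO tweets(content) VALUES (?)', (text,))


def already_seen(conn, text):
    """True if some stored tweet contains the words of text as a phrase."""
    # Keep only ASCII alphanumeric words so FTS query syntax can't interfere
    words = re.findall(r'[A-Za-z0-9]+', text)
    if not words:
        return False
    phrase = '"%s"' % ' '.join(words)  # double quotes make it a phrase query
    row = conn.execute(
        'SELECT 1 FROM tweets WHERE content MATCH ? LIMIT 1', (phrase,)
    ).fetchone()
    return row is not None


conn = open_store()
# The retweet arrives first and is stored in full
remember(conn, 'some text RT @user1 RT @user2 some super cool tweet here!')
seen = already_seen(conn, 'some super cool tweet here!')
unseen = already_seen(conn, 'a completely different tweet')
```

Here the original tweet is reported as seen because its words appear, in order, inside the stored retweet.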

The (silly) code for this bot is available on GitHub.

Happy tweeting!