This article is part of a series. You do not have to read them in order but I will be referring to topics and explanations in previous articles:

  1. Implementing Your Own Transactions With MVCC
  2. SQL Transaction Isolation Levels Explained
  3. Implementing Repeatable Read and Serializable Transaction Isolation



Repeatable read and serializable are usually higher than most databases use by default (if they are available at all). You can read a full explanation of the differences in the levels in the previous article, SQL Transaction Isolation Levels Explained.

This article will focus on a real Python implementation of all four levels but mainly focused on repeatable read and serializable.

I’m going to use the code from the original article on MVCC which implemented read-committed and refactor it to allow us to set the transaction isolation. You should understand how MVCC works an how it is implemented for read committed in that article before proceeding.

You can view the entire program here.

Implementing the Isolation Levels as Classes

The majority of the logic of a transaction will remain in the Transaction class. There have been some new methods added:

class Transaction:
def __init__(self, table, xid):
self.table = table
self.xid = xid
self.rollback_actions = []

def add_record(self, id, name):
record = {
'id': id,
'name': name,
'created_xid': self.xid,
'expired_xid': 0
}
self.rollback_actions.append(["delete", len(self.table.records)])
self.table.records.append(record)

def delete_record(self, id):
for i, record in enumerate(self.table.records):
if self.record_is_visible(record) and record['id'] == id:
if self.record_is_locked(record):
raise RollbackException("Row locked by another transaction.")
else:
record['expired_xid'] = self.xid
self.rollback_actions.append(["add", i])

def update_record(self, id, name):
self.delete_record(id)
self.add_record(id, name)

def fetch_record(self, id):
for record in self.table.records:
if self.record_is_visible(record) and record['id'] is id:
return record

return None

def count_records(self, min_id, max_id):
count = 0
for record in self.table.records:
if self.record_is_visible(record) and \
min_id <= record['id'] <= max_id:
count += 1

return count

def fetch_all_records(self):
visible_records = []
for record in self.table.records:
if self.record_is_visible(record):
visible_records.append(record)

return visible_records

def fetch(self, expr):
visible_records = []
for record in self.table.records:
if self.record_is_visible(record) and expr(record):
visible_records.append(record)

return visible_records

def commit(self):
self.table.active_xids.discard(self.xid)

def rollback(self):
for action in reversed(self.rollback_actions):
if action[0] == 'add':
self.table.records[action[1]]['expired_xid'] = 0
elif action[0] == 'delete':
self.table.records[action[1]]['expired_xid'] = self.xid

self.table.active_xids.discard(self.xid)



We also introduce a new type of error, the RollbackException. We will use this to indicate that some race condition has occurred, that if allowed to continue would break the requirements of the chosen isolation level. This type of error becomes more common as the isolation level increases and is caught by the DBMS so that the transaction can be safely rolled back and the client notified.

It is really just an alias for Exception. However, we want to differentiate this type of error so that other errors are not mistakenly caught:

class RollbackException(Exception):
pass



It’s not easy to demonstrate interfaces or abstract classes in Python. Two methods that are missing from the Transaction class above are record_is_locked(record) and record_is_visible(record) and are implemented by the child class. As long as we implement these two methods we can control the isolation behaviour of the transaction.

For example the most simple isolation level is read uncommitted:

class ReadUncommittedTransaction(Transaction):
def record_is_locked(self, record):
return record['expired_xid'] != 0

def record_is_visible(self, record):
return record['expired_xid'] == 0

It may seem like an anti-transaction pattern to read data that is not yet confirmed by another transaction. Read uncommitted is useful for large reporting queries where you really don’t want to impose any locking that would affect in-flight or new transactions. In exchange for this your totals may be a little off. Sometimes this is insignificant enough (such as processing millions of rows) that it makes sense to trade accuracy for concurrency.



Here is the implementation for read committed (if you have read the previous article this should look very familiar):

class ReadCommittedTransaction(Transaction):
def record_is_locked(self, record):
return record['expired_xid'] != 0 and \
row['expired_xid'] in self.table.active_xids

def record_is_visible(self, record):
# The record was created in active transaction that is not our
# own.
if record['created_xid'] in self.table.active_xids and \
record['created_xid'] != self.xid:
return False

# The record is expired or and no transaction holds it that is
# our own.
if record['expired_xid'] != 0 and \
(record['expired_xid'] not in self.table.active_xids or \
record['expired_xid'] == self.xid):
return False

return True

I won’t explain this now as there was already a whole article dedicated to it.



Now we can move onto the higher isolation levels that most databases do not use by default because concurrency and performance start to suffer in exchange for greater accuracy and isolations of the transactions:

class RepeatableReadTransaction(ReadCommittedTransaction):
def record_is_locked(self, record):
return ReadCommittedTransaction.record_is_locked(self, record) or \
self.table.locks.exists(self, record['id'])

def record_is_visible(self, record):
is_visible = ReadCommittedTransaction.record_is_visible(self, record)

if is_visible:
self.table.locks.add(self, record['id'])

return is_visible

To achieve repeatable read we use the same visibility (and locking) checks as read committed (ReadCommittedTransaction), but we add a read lock on every record that is read (and hence, visible) by us. It’s important we don’t add read locks to records that would otherwise not be visible to us, so that another transaction cannot remove it if we need to rescan the data.

Here is a very crude lock manager, it simply keeps track of which transactions hold a read lock on any particular row. How this lock manager performs isn’t important, and there many ways to make this mechanism work better and faster. For the purpose of this demonstration just know that there is something that remembers when transactions have seen a row:

class LockManager:
def __init__(self):
self.locks = []

def add(self, transaction, record_id):
if not self.exists(transaction, record_id):
self.locks.append([transaction, record_id])

def exists(self, transaction, record_id):
return any(lock[0] is transaction and lock[1] == record_id \
for lock in self.locks)



Finally, we can implement serializable:

class SerializableTransaction(RepeatableReadTransaction):
def __init__(self, table, xid):
Transaction.__init__(self, table, xid)
self.existing_xids = self.table.active_xids.copy()

def record_is_visible(self, record):
is_visible = ReadCommittedTransaction.record_is_visible(self, record) \
and record['created_xid'] <= self.xid \
and record['created_xid'] in self.existing_xids

if is_visible:
self.table.locks.add(self, record['id'])

return is_visible

Once again it uses the logic of ReadCommittedTransaction (but not RepeatableReadTransaction as you might originally suspect). Serializable primarily enforces one main new restriction to prevent phantom reads. That is, we must ignore any record that is added or updated in a transaction that was created after the current transaction.

Bring On the Tests

To really demonstrate the isolation levels we need to create test cases. All of the tests start with the same fixture data:

id
name
1
Joe
3
Jill

A transaction test is implemented as a class. It will setup the fixture data (in the table above) and clients on which the test will be performed. It also runs the test and returns a pretty tick or cross for the outcome (we will use this later).

class TransactionTest:
def __init__(self, transaction_type):
self.table = Table()
client = self.table.new_transaction(ReadCommittedTransaction)
client.add_record(id=1, name="Joe")
client.add_record(id=3, name="Jill")
client.commit()

self.client1 = self.table.new_transaction(transaction_type)
self.client2 = self.table.new_transaction(transaction_type)

def run_test(self):
try:
return self.run()
except RollbackException:
return False

def result(self):
if self.run_test():
return u'✔'
return u'✘'

Individual error scenarios (dirty reads, non-repeatable reads and phantom reads) are implemented by extending the TransactionTest and providing the run() method:

class DirtyRead(TransactionTest):
def run(self):
result1 = self.client1.fetch_record(id=1)
self.client2.update_record(id=1, name="Joe 2")
result2 = self.client1.fetch_record(id=1)

return result1 != result2

class NonRepeatableRead(TransactionTest):
def run(self):
result1 = self.client1.fetch_record(id=1)
self.client2.update_record(id=1, name="Joe 2")
self.client2.commit()
result2 = self.client1.fetch_record(id=1)

return result1 != result2

class PhantomRead(TransactionTest):
def run(self):
result1 = len(self.client1.fetch(lambda r: 1 <= r['id'] <= 3))
self.client2.add_record(id=2, name="John")
self.client2.commit()
result2 = self.client1.count_records(min_id=1, max_id=3)

return result1 != result2

Running the complete program prints a table of results. The means that it could replicate that kind of error (a little counterintuitive, I know):

Dirty Repeat Phantom
read uncommitted ✔ ✔ ✔
read committed ✘ ✔ ✔
repeatable read ✘ ✘ ✔
serializable ✘ ✘ ✘


comments powered by Disqus