
Locking and transactions over Cassandra using Cages


Introduction

Anyone following my occasional posts will know that my team and I are working on a new kids game / social network called http://www.FightMyMonster.com. We are trying to break new ground with this project in many ways, and to support the data intensive nature of what we are trying to do we eventually selected the Cassandra database after working with several others.

This post is about Cages, a library we are using alongside Cassandra. Using Cages, you can perform much more advanced data manipulation and storage over a Cassandra database, and this post explains why and how.

You can find Cages here http://cages.googlecode.com.

Brief Background

For those that aren’t already familiar with Cassandra (skip this if you are), it can be described as the best representative of a new breed of fast, easily scalable databases. Write operations are evenly spread across a cluster of machines, removing the bottleneck found in traditional SQL database clusters, and the database can continue operating even when some nodes are lost or partitioned. The cluster is symmetric in the sense that there is no master node: nodes communicate with each other using a P2P protocol, and can easily be added and removed by an administrator.

In order to deliver these characteristics, which are particularly valuable to Web 2.0 enterprises but will likely prove useful in other industries too, Cassandra offers what is known as a NoSQL model. This model is significantly different to a traditional SQL model, and many coming from more traditional database backgrounds will more easily understand Cassandra as a highly scalable, highly resilient distributed structured storage engine. While NoSQL offers developers some unique advantages compared to SQL, there is a trade-off: whereas in SQL a complex operation can be specified in a single statement that either executes completely or not at all (i.e. has ACID properties), in Cassandra a complex operation must usually be composed from several individual operations, each of which can only be made reliable on its own.

What is Cages for?

In many cases, websites and systems can be built against Cassandra without regard to ACID issues. Data storage and manipulation can be limited to operations against single rows (and for those that don’t know, rows in NoSQL models are really like multi-level hash tables which can contain hierarchical “ready-joined” data, and generally offer many more possibilities than SQL rows). Where a mutation of these rows must be reliable, or immediately seen by other clients of the database, Cassandra allows the developer to choose from a range of consistency levels that specify the tradeoff between performance, safety of storage and the timeliness with which data becomes visible to all.
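
To make the idea concrete, something like the following sketch shows how a client picks a consistency level per operation using the raw Cassandra 0.6 Thrift client (the keyspace, column family, column and key names here are purely illustrative):

    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.cassandra.thrift.ColumnPath;
    import org.apache.cassandra.thrift.ConsistencyLevel;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TSocket;

    public class ConsistencyLevelSketch {
        public static void main(String[] args) throws Exception {
            // Connect to a Cassandra 0.6 node over Thrift (host and port are illustrative)
            TSocket socket = new TSocket("localhost", 9160);
            Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(socket));
            socket.open();

            ColumnPath path = new ColumnPath("Users");      // illustrative column family
            path.setColumn("lastLogin".getBytes("UTF-8"));  // illustrative column

            // Fast write: only one replica needs to acknowledge before the call returns
            client.insert("Keyspace1", "user123", path, "2010-05-12".getBytes("UTF-8"),
                    System.currentTimeMillis(), ConsistencyLevel.ONE);

            // Safer write: a majority (quorum) of replicas must acknowledge, so other
            // clients reading at QUORUM are guaranteed to see this value
            client.insert("Keyspace1", "user123", path, "2010-05-12".getBytes("UTF-8"),
                    System.currentTimeMillis(), ConsistencyLevel.QUORUM);

            socket.close();
        }
    }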

This system is undeniably very effective, but when the systems you are building involve complex data structures and manipulation, you can still quickly reach situations where your logical operations necessarily involve several individual Cassandra read and write operations across multiple rows. Cassandra does not get involved in managing the safety and reliability of operations at the higher logical level, which means guaranteeing the logical consistency of your data can require some extra work. Some people, particularly those wedded to SQL databases, advocate storing some parts of your data in traditional SQL databases. For us though, it is most definitely preferable to develop and use Cages!

What is Cages?

Cages is a new Java library that provides distributed synchronization functionality, and will soon add functionality for things like transactions, by using the services of a ZooKeeper server or cluster. ZooKeeper is a very active project and is already widely used. It started life as a Yahoo Research project (see http://research.yahoo.com/project/1849) and is now an important Apache project (see http://hadoop.apache.org/zookeeper/). Cages has wide application, but its development will be very much driven by our needs in relation to Cassandra.

Using Cages for locking

Cages offers three locking classes, ZkReadLock, ZkWriteLock and ZkMultiLock.

Single path locking

The simplest application of Cages is to enforce correct updates to data values inside Cassandra (or some other NoSQL database). For example, you may have issues with that old chestnut, the Lost Update Problem. This happens when you read the data with one operation, modify it, and then write it back with a second operation. Problems occur when another client performs the same sequence simultaneously, such that the last client to write back the modified value overwrites the modifications made by the other.

In its simplest form, two clients wish to donate some money to a bank balance. Both simultaneously read the same bank balance value B1. The first client adds donation D1 and writes back (B1 + D1). The second client adds donation D2 and writes back (B1 + D2). The final bank balance is therefore B2 = B1 + D2, and donation D1 has been lost.

Cages provides an easy fix:

    void depositMoney(int amount) {
        ZkWriteLock lock = new ZkWriteLock("/accounts/" + accountId + "/balance");
        lock.acquire();
        try {
            // 1. Read the balance
            // 2. Update the balance
            // 3. Write the balance back
        } finally {
            lock.release();
        }
    }

Note that the paths passed to the lock classes can represent actual data paths within a NoSQL model, or can simply represent logical control over parts of a wider data model (so long as your application faithfully adheres to the rules you set).
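
For completeness, the read side looks much the same, assuming ZkReadLock follows the acquire()/release() pattern used above: readers of a path do not block each other, but are excluded by any writer holding ZkWriteLock on that path.

    int readBalance() {
        // Shared lock: other readers of this path can proceed concurrently,
        // but a writer holding ZkWriteLock on the same path excludes us
        ZkReadLock lock = new ZkReadLock("/accounts/" + accountId + "/balance");
        lock.acquire();
        try {
            // 1. Read the balance from Cassandra
            // 2. Return it to the caller
            return 0; // placeholder
        } finally {
            lock.release();
        }
    }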

Multi path locking

The Lost Update Problem is the simplest locking scenario where Cages can be applied. In our case, while many parts of our system use Cassandra without locking at all, often with low consistency levels for maximum performance, there are several areas where we necessarily perform complex operations over contended data that involve numerous individual read and write operations. To begin with, we decided to handle these cases by nesting the ZkReadLock and ZkWriteLock single path locking primitives. However, there is a problem doing this in a distributed environment.

It is a simple fact that in a distributed environment, many situations where you acquire single path locks in a nested manner can result in deadlock. For example, if one operation sequentially tries to acquire R(P1) then W(P2), and a second operation simultaneously tries to acquire R(P2) then W(P1), deadlock will likely result: the first operation will acquire R(P1) and the second operation will acquire R(P2), but then the first operation will block waiting to acquire W(P2) and the second operation will block waiting to acquire W(P1).
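
To make the scenario concrete, the following sketch shows two such operations written with the single path lock classes (the paths and method names are illustrative). If they run at the same time on different clients, each can acquire its first lock and then block forever waiting for the other’s:

    // Operation one: acquires R(P1) then W(P2)
    void operationOne() {
        ZkReadLock r1 = new ZkReadLock("/P1");
        r1.acquire();
        try {
            ZkWriteLock w2 = new ZkWriteLock("/P2");
            w2.acquire();   // blocks forever if operation two already holds R(P2)
            try {
                // ... work on P1 and P2 ...
            } finally {
                w2.release();
            }
        } finally {
            r1.release();
        }
    }

    // Operation two: acquires R(P2) then W(P1) - the reverse order
    void operationTwo() {
        ZkReadLock r2 = new ZkReadLock("/P2");
        r2.acquire();
        try {
            ZkWriteLock w1 = new ZkWriteLock("/P1");
            w1.acquire();   // blocks forever if operation one already holds R(P1)
            try {
                // ... work on P2 and P1 ...
            } finally {
                w1.release();
            }
        } finally {
            r2.release();
        }
    }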

Avoiding these problems with single path locks is no simple matter. For a start, the logical detection of closed wait graphs (deadlock) in the distributed environment is difficult and expensive to perform. The simplest approach to solving the problem is to try to acquire locks with a timeout, such that if you get into a deadlock situation, your acquire() calls throw an exception and you abandon your attempt. The problem here though is that your code has to handle the exception, and possibly rollback parts of the operation performed earlier under the protection of the outer lock.

For all these reasons, when an operation needs to acquire locks over multiple paths in the distributed environment, ZkMultiLock is the class to use.

ZkMultiLock allows you to specify any number of read and write locks over paths, which may then all be acquired “simultaneously”. If your operation can acquire all the locks it needs together at the outset using ZkMultiLock, this avoids any possibility of deadlock. The trade-off is slightly worse performance where multiple paths are specified and locks on those paths are highly contended. But in practice, locks are rarely that highly contended, and you just need to guard against the disaster of simultaneously running operations interfering with each other and corrupting data. Because of the dangers of deadlock, in the Fight My Monster project we have mandated that only ZkMultiLock can be used unless there are very special reasons, a situation we have not yet encountered.

    void executeTrade(long lotId, String sellerId, String buyerId) {
        // In the following we need to hold write locks over both the seller and buyer's account balances
        // so they can be checked and updated correctly. We also want a lock over the lot, since the value
        // of lots owned might be used in conjunction with the bank balance by code considering the
        // total worth of the owner. Acquiring the required locks simultaneously using ZkMultiLock avoids
        // the possibility of accidental deadlock occurring between this client code and other client code
        // contending access to the same data / lock paths.
        ZkMultiLock mlock = new ZkMultiLock();
        mlock.addWriteLock("/Bank/accounts/" + sellerId);
        mlock.addWriteLock("/Bank/accounts/" + buyerId);
        mlock.addWriteLock("/Warehouse/" + lotId);
        mlock.acquire();
        try {
            // 1. check buyer has sufficient funds
            // 2. debit buyer's account
            // 3. credit seller's account
            // 4. change ownership of goods
        } finally {
            mlock.release();
        }
    }

Transactions for Cassandra

Transactions are a planned feature at the time of writing, 12/5/2010. It shouldn’t be too long before they make it into the library, so I will explain a bit about them here.

Locking allows you to synchronize sequences of read and write (mutation) operations across rows stored on your Cassandra cluster. However, the locking classes do not solve the problem that occurs when, part way through a complex operation, your client machine dies, leaving the data inside the distributed database in a logically inconsistent state. For many applications the likelihood of this occurring is low enough for the locking classes alone to be sufficient. But there may be a small number of operations within applications for which data simply must be logically consistent, and even a very rare failure is unacceptable. This is where transactions come in.

For those that are interested, the following explains how they will work.

A new ZkTransaction class will provide the functionality, and it will need to be used in conjunction with the ZkMultiLock class. ZkTransaction will provide a simplified version of the Cassandra Thrift API that allows a series of data mutation operations to be specified. Client operations will proceed by first specifying the necessary locks that must be held, and then specifying the set of data mutations that must be performed by the transaction. When the transaction has been specified, its commit() method must be called, passing the ZkMultiLock instance as a parameter.

At this point, internally Cages will add a reference to a transaction node created on ZooKeeper from each single path lock node held. The ZkTransaction instance reads from Cassandra the current values of the data it is required to modify, and writes them into the transaction node as a “before” state. Once this is done, it sets about applying the data mutations specified, as the necessary sequence of individual Cassandra read and write (mutate) operations. Once all operations are performed, the references to the transaction node from within the locks are removed, and then finally the transaction node itself is deleted – the transaction has now been committed, and the developer can release() the ZkMultiLock.

ZkTransaction can provide a guarantee of consistency for Cages clients because if, during the execution of the sequence of individual Cassandra mutation operations, the client machine suddenly dies, Cages will immediately revoke the locks the client holds. From this point, any instances of ZkReadLock, ZkWriteLock or ZkMultiLock wishing to acquire the released paths must first roll back the transaction by returning the relevant data to the original “before” state recorded in the transaction node. The key point is that any processes that need to see the data in a logically consistent state, and therefore always acquire locks referencing the data in question before accessing it, will always see it as such. This provides a form of ACID for complex operations against a Cassandra database.

    void executeTrade(long lotId, String sellerId, String buyerId) {
        ZkMultiLock mlock = new ZkMultiLock();
        mlock.addWriteLock("/Bank/accounts/" + sellerId);
        mlock.addWriteLock("/Bank/accounts/" + buyerId);
        mlock.addWriteLock("/Warehouse/" + lotId);
        mlock.acquire();
        try {
            // 1. check that buyer has sufficient funds
            // ....

            // 2. perform mutations using transaction object
            ZkTransaction transaction = new ZkTransaction(NoSQL.Cassandra);
            transaction.begin(mlock);
            try {
                // 2. debit buyer's account
                transaction.insert(buyerId, "accounts", bytes("balance"), bytes(newBalance));
                // 3. credit seller's account
                // ...
                // 4. change ownership of goods
                // ...
            } finally {
                transaction.commit();
            }
        } finally {
            mlock.release();
        }
    }

Scalability and hashing ZooKeeper clusters

It is worth saying first off that a three node ZooKeeper cluster using powerful machines should be able to handle a considerable workload, and that where locking and transactions are used sparingly, on an as-needed basis, such a setup will be able to provide for the needs of many Internet scale applications. However, it is easy to conceive of Cassandra being applied more widely outside of typical Web 2.0 norms where usage of locking and transactions is much heavier, and therefore the scalability of ZooKeeper must be examined.

The main issue is that for the purposes described it is not desirable to scale ZooKeeper clusters beyond three nodes. The reason is that while adding nodes scales up read performance, write performance actually starts degrading because of the need to synchronize write operations across all members, so clustering really offers availability rather than performance. A good overview of the actual performance parameters can be found here http://hadoop.apache.org/zookeeper/docs/r3.3.0/zookeeperOver.html. The question, then, is what to do when ZooKeeper becomes a bottleneck.

The solution we suggest is simply to run more than one ZooKeeper cluster for the purposes of locking and transactions, and to hash locks and transactions onto particular clusters. This will be the final feature added to Cages.
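
To give a rough idea of what this will look like, a lock path can simply be hashed onto one of a fixed list of ZooKeeper connection strings, so that any given path is always served by the same three node cluster while the overall locking load is spread across all of them. The class below is purely illustrative of the idea and is not part of Cages yet:

    import java.util.List;

    // Illustrative sketch: choose which ZooKeeper cluster serves a given lock path
    public class ZkClusterSelector {
        private final List<String> connectStrings;  // e.g. "zk1a:2181,zk1b:2181,zk1c:2181"

        public ZkClusterSelector(List<String> connectStrings) {
            this.connectStrings = connectStrings;
        }

        // Deterministically map a lock path to one of the clusters
        public String clusterFor(String lockPath) {
            int bucket = (lockPath.hashCode() & 0x7fffffff) % connectStrings.size();
            return connectStrings.get(bucket);
        }
    }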

Note: since I wrote the above, Eric Hauser kindly drew my attention to the new “Observers” feature in ZooKeeper 3.3. This may greatly raise the limit at which hashing to separate 3 node clusters becomes necessary. I am hoping to collate performance information and tests in the near future so people have more of an idea what to expect. See http://hadoop.apache.org/zookeeper/docs/r3.3.0/zookeeperObservers.html

That’s it. Hope it was interesting. Please bear with me as Cages develops further over the coming weeks and feel free to test and report.

Final note

Check out the comments too because there are already several useful clarifications and expansions there.

Written by dominicwilliams

May 12, 2010 at 10:10 pm