Dominic Williams

Occasionally useful posts about RIAs, Web scale computing & miscellanea

Posts Tagged ‘Cassandra’

10 steps to upgrade a Cassandra node

with 2 comments

The following steps might prove interesting to those with a new Cassandra cluster who are wondering how to upgrade, or to those investigating Cassandra who are interested in what an online upgrade looks like. The good news for those in production is that if your cluster has a replication factor (RF) of at least 3, read and write operations (with the exception of rarely used ConsistencyLevel.ALL operations) will be able to continue uninterrupted throughout the upgrade process, i.e. you don’t need to put up a “down for maintenance” sign!

This process should be performed on every node in the cluster in sequence to effect a rolling upgrade. Once you are comfortable with the process, and the number of nodes in your cluster starts to climb, you will want to script much of it.
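
To see why RF 3 matters here, the following is a rough sketch of the replica arithmetic only (it does not talk to Cassandra). It assumes each replica of a key lives on a different node and that only one node is down at a time; the class and method names are made up for illustration.

```java
// Sketch: which consistency levels survive one replica being offline during a rolling upgrade.
// Assumption: replicas of a key live on distinct nodes and one node is down at a time.
class RollingUpgradeAvailability {

    // Replicas that must respond for a QUORUM operation: a strict majority of RF.
    static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1;
    }

    // True if an operation needing `required` replica responses can still succeed
    // when `replicasDown` of the replicas for a key are offline.
    static boolean canSatisfy(int replicationFactor, int replicasDown, int required) {
        return replicationFactor - replicasDown >= required;
    }

    public static void main(String[] args) {
        int rf = 3;
        int down = 1; // the node currently being upgraded
        System.out.println("ONE:    " + canSatisfy(rf, down, 1));          // true
        System.out.println("QUORUM: " + canSatisfy(rf, down, quorum(rf))); // true
        System.out.println("ALL:    " + canSatisfy(rf, down, rf));         // false
    }
}
```

With RF 2 the quorum is also 2, so taking a single replica offline would already block QUORUM operations, which is why the guarantee above starts at RF 3.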

1. Open a ssh window to the Cassandra node.
$ ssh root@node1.cassandra.my.org

2. Visit http://cassandra.apache.org. The homepage has a link to the latest tar.gz package download. Copy the URL of that link.

3. Change folder to /opt and then download the upgrade package.
$ cd /opt
$ wget http://www.apache.org…cassandra/0.7.4/apache-cassandra-0.7.4-bin.tar.gz

4. Unpack the package and enter its configuration folder
$ tar xvf apache-cassandra-0.7.4-bin.tar.gz
$ cd /opt/apache-cassandra-0.7.4/conf

5. Back up the default configuration (sometimes the format changes). Copy in the existing configuration.
$ mv cassandra.yaml cassandra.yaml.bak
$ cp /opt/cassandra/conf/cassandra.yaml .

6. Edit the environment configuration script. This often changes so don’t copy in the existing script. You need to configure the heap size to be ~50% of the server’s RAM and the new heap size to be 100MB * no. CPUs
$ vim cassandra-env.sh
…then, for example, for a server with 6GB RAM and 6 CPUs
MAX_HEAP_SIZE="3G"
HEAP_NEWSIZE="600M"

7. Create a link to jna.jar from Cassandra’s lib folder (JNA or “Java Native Access” enables Cassandra to directly access native libraries. Without this capability, certain operations such as snapshotting using nodetool are more likely to fail with OOM).
$ cd /opt/apache-cassandra-0.7.4/lib
$ ln -s /usr/share/java/jna.jar .

8. Shut down the Cassandra software running on the current node. This will not interrupt the cluster in any way.
$ pkill -f 'java.*cassandra'

9. Change to the /opt folder and repoint the soft link at the new Cassandra distribution
$ cd /opt
$ rm cassandra
$ ln -s /opt/apache-cassandra-0.7.4 cassandra

10. Restart Cassandra
$ cd /opt/cassandra/bin
$ ./cassandra

After restarting Cassandra, you should watch the startup messages to ensure the node comes up without difficulty.
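
Once you start scripting the upgrade, the sizing rule from step 6 is the kind of thing worth computing rather than typing by hand. Here is a small sketch of the arithmetic, assuming you already know the server’s RAM in megabytes and its CPU count; the class and method names are hypothetical, and the output is expressed in megabytes (3072M is the same as 3G).

```java
// Sketch of the step 6 sizing rule: heap ~50% of RAM, new heap 100MB per CPU.
// The names here are illustrative only; nothing in Cassandra provides this class.
class HeapSizing {

    static long maxHeapMb(long ramMb) {
        return ramMb / 2;
    }

    static long newHeapMb(int cpus) {
        return 100L * cpus;
    }

    // Renders the two variable assignments in the form used by the environment script.
    static String envLines(long ramMb, int cpus) {
        return "MAX_HEAP_SIZE=\"" + maxHeapMb(ramMb) + "M\"\n"
             + "HEAP_NEWSIZE=\"" + newHeapMb(cpus) + "M\"";
    }

    public static void main(String[] args) {
        // The example server from step 6: 6GB RAM and 6 CPUs.
        System.out.println(envLines(6 * 1024, 6));
    }
}
```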

Written by dominicwilliams

March 26, 2011 at 9:23 pm

Cassandra: Up and running quickly in Java using Pelops

with 52 comments

Pelops

In Greek mythology Cassandra is captured by the triumphant king Agamemnon after the fall of Troy, with whom she has two sons, Pelops and Teledamus. This Java client library is Pelops’ namesake, nicknamed “Cassandra’s beautiful son” because it offers a beautiful way to code against the Cassandra database. This is a quick introduction to the library.

You can find the open source code here http://pelops.googlecode.com/

Objectives

Pelops was born to improve the quality of Cassandra code across a complex commercial project that makes extensive use of the database. The main objectives of the library are:

  • To faithfully expose Cassandra’s API in a manner that is immediately understandable to anyone:
    simple, but beautiful
  • To completely separate low-level concerns such as connection pooling from data processing code
  • To eliminate “dressing code”, so that the semantics of data processing stand clear and obvious
  • To accelerate development through intellisense, function overloading and powerful high-level methods
  • To implement strategies like load balancing based upon the per node running operation count
  • To include robust error handling and recovery that does not mask application-level logic problems
  • To track the latest Cassandra releases and features without causing breaking changes
  • To define a long-lasting paradigm for those writing client code

Up and running in 5 minutes

To start working with Pelops and Cassandra, you need to know three things:

  1. How to create a connection pool, typically once at startup
  2. How to write data using the Mutator class
  3. How to read data using the Selector class.

It’s that easy!

Creating a connection pool

To work with a Cassandra cluster, you need to start off by defining a connection pool. This is typically done once in the startup code of your application. Sometimes you will define more than one connection pool. For example, in our project, we use two Cassandra database clusters, one which uses random partitioning for data storage, and one which uses order preserving partitioning for indexes. You can create as many connection pools as you need.

To create a pool, you need to specify a name, a list of known contact nodes (the library can automatically detect further nodes in the cluster, but see notes at the end), the network port that the nodes are listening on, and a policy which controls things like the number of connections in your pool.

Here a pool is created with default policies:

Pelops.addPool(
    "Main",
    new String[] { "cass1.database.com", "cass2.database.com", "cass3.database.com"},
    9160,
    new Policy());

Using a Mutator

The Mutator class is used to make mutations to a keyspace (which in SQL speak translates as making changes to a database). You ask Pelops for a new mutator, and then specify the mutations you wish to make. These are sent to Cassandra in a single batch when you call its execute method.

To create a mutator, you must specify the name of the connection pool you will use and the name of the keyspace you wish to mutate. Note that the pool determines what database cluster you are talking to.

Mutator mutator = Pelops.createMutator("Main", "SupportTickets");

Once you have the mutator, you start specifying changes.

/**
 * Write multiple sub-column values to a super column...
 * @param rowKey                    The key of the row to modify
 * @param colFamily                 The name of the super column family to operate on
 * @param colName                   The name of the super column
 * @param subColumns                A list of the sub-columns to write
 */
mutator.writeSubColumns(
    userId,
    "L1Tickets",
    UuidHelper.newTimeUuidBytes(), // using a UUID value that sorts by time
    mutator.newColumnList(
        mutator.newColumn("category", "videoPhone"),
        mutator.newColumn("reportType", "POOR_PICTURE"),
        mutator.newColumn("createdDate", NumberHelper.toBytes(System.currentTimeMillis())),
        mutator.newColumn("capture", jpegBytes),
        mutator.newColumn("comment") ));

/**
 * Delete a list of columns or super columns...
 * @param rowKey                    The key of the row to modify
 * @param colFamily                 The name of the column family to operate on
 * @param colNames                  The column and/or super column names to delete
 */
mutator.deleteColumns(
    userId,
    "L1Tickets",
    resolvedList);

After specifying the changes, you send them to Cassandra in a single batch by calling execute. This takes the Cassandra consistency level as a parameter.

mutator.execute(ConsistencyLevel.ONE);

Note that if you need to know a particular mutation operation has completed successfully before initiating some subsequent operation, then you should not batch your mutations together. Since you cannot re-use a mutator after it has been executed, you should create two or more mutators, and execute them with at least a QUORUM consistency level.
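
If the “batch, then execute once” pattern is new to you, this toy class may help. It is only a stand-in to show the behaviour described above (changes are queued, sent together on execute, and the object cannot be re-used afterwards); it is not the Pelops Mutator, and every name in it is invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a batching mutator. NOT the Pelops implementation:
// the "database" is just a list that receives the queued changes.
class BatchSketch {
    private final List<String> pending = new ArrayList<>();
    private final List<String> sink; // stands in for the database
    private boolean executed = false;

    BatchSketch(List<String> sink) {
        this.sink = sink;
    }

    // Queue a change; nothing reaches the sink yet.
    BatchSketch write(String rowKey, String column, String value) {
        ensureNotExecuted();
        pending.add(rowKey + "/" + column + "=" + value);
        return this;
    }

    // Send all queued changes in one go and return how many were sent.
    int execute() {
        ensureNotExecuted();
        executed = true;
        sink.addAll(pending);
        return pending.size();
    }

    private void ensureNotExecuted() {
        if (executed) {
            throw new IllegalStateException("batch already executed; create a new one");
        }
    }

    public static void main(String[] args) {
        List<String> db = new ArrayList<>();
        // Two dependent steps use two batches, executed one after the other.
        new BatchSketch(db).write("dom", "category", "videoPhone").execute();
        new BatchSketch(db).write("dom", "status", "open").execute();
        System.out.println(db);
    }
}
```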

Browse the Mutator class to see the methods and overloads that are available here.

Using a Selector

The Selector class is used to read data from a keyspace. You ask Pelops for a new selector, and then read data by calling its methods.

Selector selector = Pelops.createSelector("Main", "SupportTickets");

Once you have a selector instance, you can start reading data using its many overloads.

/**
 * Retrieve a super column from a row...
 * @param rowKey                        The key of the row
 * @param columnFamily                  The name of the column family containing the super column
 * @param superColName                  The name of the super column to retrieve
 * @param cLevel                        The Cassandra consistency level with which to perform the operation
 * @return                              The requested SuperColumn
 */
SuperColumn ticket = selector.getSuperColumnFromRow(
    userId,
    "L1Tickets",
    ticketId,
    ConsistencyLevel.ONE);

assert ticketId.equals(ticket.name);

// enumerate sub-columns
for (Column data : ticket.columns) {
    String name = data.name;
    byte[] value = data.value;
}

/**
 * Retrieve super columns from a row
 * @param rowKey                        The key of the row
 * @param columnFamily                  The name of the column family containing the super columns
 * @param colPredicate                  The super column selector predicate
 * @param cLevel                        The Cassandra consistency level with which to perform the operation
 * @return                              A list of matching columns
 */
List<SuperColumn> allTickets = selector.getSuperColumnsFromRow(
    userId,
    "L1Tickets",
    Selector.newColumnsPredicateAll(true, 10000),
    ConsistencyLevel.ONE);

/**
 * Retrieve super columns from a set of rows.
 * @param rowKeys                        The keys of the rows
 * @param columnFamily                   The name of the column family containing the super columns
 * @param colPredicate                   The super column selector predicate
 * @param cLevel                         The Cassandra consistency level with which to perform the operation
 * @return                               A map from row keys to the matching lists of super columns
 */
Map<String, List<SuperColumn>> allTicketsForFriends = selector.getSuperColumnsFromRows(
    Arrays.asList(new String[] { "matt", "james", "dom" }), // the friends
    "L1Tickets",
    Selector.newColumnsPredicateAll(true, 10000),
    ConsistencyLevel.ONE);

/**
 * Retrieve a page of super columns composed from a segment of the sequence of super columns in a row.
 * @param rowKey                        The key of the row
 * @param columnFamily                  The name of the column family containing the super columns
 * @param startBeyondName               The sequence of super columns must begin with the smallest super column name greater than this value. Pass null to start at the beginning of the sequence.
 * @param orderType                     The scheme used to determine how the column names are ordered
 * @param reversed                      Whether the scan should proceed in descending super column name order
 * @param count                         The maximum number of super columns that can be retrieved by the scan
 * @param cLevel                        The Cassandra consistency level with which to perform the operation
 * @return                              A page of super columns
 */
List<SuperColumn> pageTickets = selector.getPageOfSuperColumnsFromRow(
    userId,
    "L1Tickets",
    lastIdOfPrevPage, // null for first page
    Selector.OrderType.TimeUUIDType, // ordering defined in this super column family
    true, // blog order
    10, // count shown per page
    ConsistencyLevel.ONE);

There are a huge number of selector methods and overloads which expose the full power of Cassandra, and others like the paginator methods that make otherwise complex tasks simple. Browse the Selector class to see what is available here

Other stuff

All the main things you need to start using Pelops have been covered, and with your current knowledge you can easily feel your way around Pelops inside your IDE using intellisense. Some final points it will be useful to keep in mind if you want to work with Pelops:

  • If you need to perform deletions at the row key level, use an instance of the KeyDeletor class (call Pelops.createKeyDeletor).
  • If you need metrics from a Cassandra cluster, use an instance of the Metrics class (call Pelops.createMetrics).
  • To work with Time UUIDs, which are globally unique identifiers that can be sorted by time – which you will find to be very useful throughout your Cassandra code – use the UuidHelper class.
  • To work with numbers stored as binary values, use the NumberHelper class.
  • To work with strings stored as binary values, use the StringHelper class.
  • Methods in the Pelops library that cause interaction with Cassandra throw the standard
    Cassandra exceptions defined here.
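
For a feel of what “numbers and strings stored as binary values” means, here is a plain-JDK sketch of the round trip. It assumes big-endian 8-byte longs and UTF-8 strings; I am not claiming these are the exact encodings NumberHelper and StringHelper use, and the class below is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Plain-JDK sketch of converting values to and from byte arrays.
// Assumptions: longs as 8 big-endian bytes, strings as UTF-8.
class BinaryValues {

    static byte[] longToBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long bytesToLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    static byte[] stringToBytes(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    static String bytesToString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] created = longToBytes(System.currentTimeMillis());
        System.out.println(created.length + " bytes, round trip: " + bytesToLong(created));
        System.out.println(bytesToString(stringToBytes("POOR_PICTURE")));
    }
}
```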

The Pelops design secret

One of the key design decisions that, at the time of writing, distinguishes Pelops is that the data processing code written by developers does not involve connection pooling or management. Instead, classes like Mutator and Selector borrow connections to Cassandra from a Pelops pool for just the periods that they need to read and write to the underlying Thrift API. This has two advantages.

Firstly, obviously, code becomes cleaner and developers are freed from connection management concerns. But also more subtly this enables the Pelops library to completely manage connection pooling itself, and for example keep track of how many outstanding operations are currently running against each cluster node.

This, for example, enables Pelops to perform more effective client load balancing by ensuring that new operations are performed against the node on which it currently has the fewest outstanding operations running. Because of this architectural choice, it will even be possible to offer strategies in the future where, for example, nodes are actually queried to determine their load.
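
The idea of balancing on outstanding operations can be sketched in a few lines. This is a simplified illustration, not the code inside Pelops: the class name is invented, ties are broken by the order in which nodes were listed, and there is no handling of failed nodes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch of "send the next operation to the node with the fewest running operations".
// Not the Pelops implementation; names and tie-breaking are assumptions.
class LeastBusyNodeSketch {
    private final Map<String, Integer> outstanding = new LinkedHashMap<>();

    LeastBusyNodeSketch(String... nodes) {
        if (nodes.length == 0) {
            throw new IllegalArgumentException("at least one node is required");
        }
        for (String node : nodes) {
            outstanding.put(node, 0);
        }
    }

    // Pick the least busy node and count the new operation against it.
    synchronized String begin() {
        String best = null;
        for (Map.Entry<String, Integer> entry : outstanding.entrySet()) {
            if (best == null || entry.getValue() < outstanding.get(best)) {
                best = entry.getKey();
            }
        }
        outstanding.merge(best, 1, Integer::sum);
        return best;
    }

    // Call when the operation finishes so the node's count drops again.
    synchronized void end(String node) {
        outstanding.merge(node, -1, Integer::sum);
    }

    public static void main(String[] args) {
        LeastBusyNodeSketch pool = new LeastBusyNodeSketch("cass1", "cass2", "cass3");
        String first = pool.begin();
        String second = pool.begin();
        pool.end(first);
        System.out.println(first + ", " + second + ", then " + pool.begin());
    }
}
```

Because the counting happens where connections are borrowed and returned, the data processing code never has to know it is going on.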

To see how the library abstracts connection pooling away from the semantics of data processing, take a look at the execute method of Mutator and the tryOperation method of Operand. This is the foundation upon which Pelops greatly improves over existing libraries that have modelled connection management on pre-existing SQL database client libraries.

*–

That’s all. I hope you get the same benefits from Pelops that we did.

Written by dominicwilliams

June 11, 2010 at 12:31 pm

Locking and transactions over Cassandra using Cages

with 43 comments

Introduction

Anyone following my occasional posts will know that my team and I are working on a new kids game / social network called http://www.FightMyMonster.com. We are trying to break new ground with this project in many ways, and to support the data intensive nature of what we are trying to do we eventually selected the Cassandra database after working with several others.

This post is about a library we are using with Cassandra called Cages. Using Cages, you can perform much more advanced data manipulation and storage over a Cassandra database. This post explains why and gives some more information.

You can find Cages here http://cages.googlecode.com.

Brief Background

For those that aren’t already familiar with Cassandra (skip this if you are), it can be described as the best representative of a new breed of fast, easily scalable databases. Write operations are evenly spread across a cluster of machines, removing the bottleneck found in traditional SQL database clusters, and it can continue operating even when some nodes are lost or partitioned. The cluster is symmetric in the sense there is no master node, nodes communicate with each other using a P2P protocol and can be easily added and removed by an administrator.

In order to deliver these characteristics, which are particularly valuable to Web 2.0 enterprises but will likely prove useful in other industries too, Cassandra offers what is known as a NoSQL model. This model is significantly different to a traditional SQL model, and many coming from more traditional database backgrounds will more easily understand Cassandra as a highly scalable, highly resilient distributed structured storage engine. While NoSQL offers some unique advantages to developers when compared to SQL, it is also the case that whereas in SQL complex operations can be specified in a single statement that is either executed or not (i.e. that have ACID properties), in Cassandra complex operations on data must usually be composed of several different operations, which can only be made reliable individually.

What is Cages for?

In many cases, websites and systems can be built against Cassandra without regard to ACID issues. Data storage and manipulation can be limited to operations against single rows (and for those that don’t know, rows in NoSQL models are really like multi-level hash tables which can contain hierarchical “ready-joined” data, and generally offer many more possibilities than SQL rows). Where a mutation of these rows must be reliable, or immediately seen by other clients of the database, Cassandra allows the developer to choose from a range of consistency levels that specify the tradeoff between performance, safety of storage and the timeliness with which data becomes visible to all.

This system is undeniably very effective, but when the systems you are building involve complex data structures and manipulation, you can still quickly reach situations where your logical operations necessarily involve several individual Cassandra read and write operations across multiple rows. Cassandra does not get involved in managing the safety and reliability of operations at the higher logical level, which means guaranteeing the logical consistency of your data can require some extra work. Some people, particularly those wedded to SQL databases, advocate storing some parts of your data in traditional SQL databases. For us though, it is most definitely preferable to develop and use Cages!

What is Cages?

Cages is a new Java library that provides distributed synchronization functionality, and soon additional functionality for things like transactions, by using the services of a ZooKeeper server or cluster. ZooKeeper is a very active project and the system is currently widely used. It started life as a Yahoo Research project, see here http://research.yahoo.com/project/1849 and is now an important Apache project, see http://hadoop.apache.org/zookeeper/. Cages has wide application, but its development will be very much driven by needs in relation to Cassandra.

Using Cages for locking

Cages offers three locking classes, ZkReadLock, ZkWriteLock and ZkMultiLock.

Single path locking

The simplest application of Cages can be to enforce correct updates on data values inside Cassandra (or some other NoSQL database). For example, you may have issues with that old chestnut, the Lost Update Problem. This happens where you read the data with one operation, modify the data and then write it back with a second operation. Problems occur when another client performs the same operation simultaneously, such that the last client to write back the modified value will overwrite the modifications made by the other.

Thus in its most simple form, two clients wish to donate some money to a bank balance. Both simultaneously read the same bank balance value B1. The first client adds donation D1, and writes back (B1 + D1). The second client adds donation D2, and writes back (B1 + D2). Unfortunately bank balance B2 = B1 + D2, and donation D1 has been lost.
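
The arithmetic is easy to replay in code. The sketch below is a deterministic, single-threaded simulation of the two orderings (no database and no real concurrency involved); the class and method names are mine.

```java
// Deterministic replay of the Lost Update Problem described above.
// No threads or database: the two "clients" are just interleaved statements.
class LostUpdateDemo {

    // Both clients read B1 before either writes, so the second write wins.
    static int interleaved(int b1, int d1, int d2) {
        int balance = b1;
        int readByClient1 = balance;
        int readByClient2 = balance;
        balance = readByClient1 + d1; // client 1 writes B1 + D1
        balance = readByClient2 + d2; // client 2 overwrites with B1 + D2
        return balance;
    }

    // Each client's read-modify-write completes before the other starts,
    // which is the ordering a lock around the operation enforces.
    static int serialized(int b1, int d1, int d2) {
        int balance = b1;
        balance = balance + d1;
        balance = balance + d2;
        return balance;
    }

    public static void main(String[] args) {
        System.out.println("interleaved: " + interleaved(100, 10, 5)); // 105, D1 is lost
        System.out.println("serialized:  " + serialized(100, 10, 5));  // 115
    }
}
```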

Cages provides an easy fix:

    void depositMoney(int amount) {
        ZkWriteLock lock = new ZkWriteLock("/accounts/" + accountId + "/balance");
        lock.acquire();
        try {
            // 1. Read the balance
            // 2. Update the balance
            // 3. Write the balance back
        } finally {
            lock.release();
        }
    }

Note that the paths passed to the lock classes can represent actual data paths within a NoSQL model, or can simply represent logical control over parts of a wider data model (so long as your application faithfully adheres to the rules you set).

Multi path locking

The Lost Update Problem is the most simple locking scenario where Cages can be applied. In our case, while many parts of our system even use Cassandra without locking, often with low consistency levels for maximum performance, there are several areas where we necessarily perform complex operations over contended data that involve numerous individual read and write operations. To begin with, we decided to treat the cases by nesting the ZkReadLock and ZkWriteLock single path locking primitives. However, there is a problem doing this in a distributed environment.

It is a simple fact that in a distributed environment, many situations where you acquire single path locks in a nested manner can result in deadlock. For example, if one operation sequentially tries to acquire R(P1) then W(P2), and a second operation simultaneously tries to acquire R(P2) then W(P1), deadlock will likely result: the first operation will acquire R(P1) and the second operation will acquire R(P2), but then the first operation will block waiting to acquire W(P2) and the second operation will block waiting to acquire W(P1).

Avoiding these problems with single path locks is no simple matter. For a start, the logical detection of closed wait graphs (deadlock) in the distributed environment is difficult and expensive to perform. The simplest approach to solving the problem is to try to acquire locks with a timeout, such that if you get into a deadlock situation, your acquire() calls throw an exception and you abandon your attempt. The problem here though is that your code has to handle the exception, and possibly rollback parts of the operation performed earlier under the protection of the outer lock.
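
To make “closed wait graph” concrete, here is a small sketch that checks a wait-for graph for a cycle using a depth-first search. It runs on a map held in one process, so it sidesteps exactly the part that is hard in the distributed case (collecting a consistent view of who is waiting for whom). The names are invented for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch: detect deadlock as a cycle in a wait-for graph held in local memory.
class WaitGraphSketch {

    // waitsFor.get(a) lists the operations that a is blocked behind.
    static boolean hasDeadlock(Map<String, List<String>> waitsFor) {
        Set<String> finished = new HashSet<>();
        Set<String> onPath = new HashSet<>();
        for (String op : waitsFor.keySet()) {
            if (cycleFrom(op, waitsFor, finished, onPath)) {
                return true;
            }
        }
        return false;
    }

    private static boolean cycleFrom(String op, Map<String, List<String>> waitsFor,
                                     Set<String> finished, Set<String> onPath) {
        if (onPath.contains(op)) {
            return true; // we came back to an operation on the current path
        }
        if (finished.contains(op)) {
            return false; // already explored, no cycle through here
        }
        onPath.add(op);
        for (String next : waitsFor.getOrDefault(op, Collections.<String>emptyList())) {
            if (cycleFrom(next, waitsFor, finished, onPath)) {
                return true;
            }
        }
        onPath.remove(op);
        finished.add(op);
        return false;
    }

    public static void main(String[] args) {
        // The example above: op1 holds R(P1) and waits for W(P2),
        // op2 holds R(P2) and waits for W(P1).
        Map<String, List<String>> waitsFor = new HashMap<>();
        waitsFor.put("op1", Arrays.asList("op2"));
        waitsFor.put("op2", Arrays.asList("op1"));
        System.out.println("deadlock: " + hasDeadlock(waitsFor)); // true
    }
}
```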

For all these reasons, when an operation needs to acquire locks over multiple paths in the distributed environment, ZkMultiLock is the class to use.

ZkMultiLock allows you to specify any number of read and write locks over paths, which may then all be acquired “simultaneously”. If your operation can acquire all the locks it needs together at the outset using ZkMultiLock, this avoids any possibility of deadlock. This provides slightly worse performance where multiple paths are specified and locks on the paths are highly contended. But in practice, locks are rarely that highly contended, and you just need to guard against the disaster of simultaneously running operations interfering with each other and corrupting data. Because of the dangers of deadlock, in the Fight My Monster project we have mandated that only ZkMultiLock can be used unless there are very special reasons, a situation we have not yet encountered.

    void executeTrade(long lotId, String sellerId, String buyerId) {
        // In the following we need to hold write locks over both the seller and buyer's account balances
        // so they can be checked and updated correctly. We also want a lock over the lot, since the value
        // of lots owned might be used in conjunction with the bank balance by code considering the
        // total worth of the owner. Acquiring the required locks simultaneously using ZkMultiLock avoids
        // the possibility of accidental deadlock occurring between this client code and other client code
        // contending access to the same data / lock paths.
        ZkMultiLock mlock = new ZkMultiLock();
        mlock.addWriteLock("/Bank/accounts/" + sellerId);
        mlock.addWriteLock("/Bank/accounts/" + buyerId);
        mlock.addWriteLock("/Warehouse/" + lotId);
        mlock.acquire();
        try {
            // 1. check buyer has sufficient funds
            // 2. debit buyer's account
            // 3. credit seller's account
            // 4. change ownership of goods
        } finally {
            mlock.release();
        }
    }
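
For intuition about why declaring every lock up front removes the deadlock risk, here is an in-process analogy using plain JDK locks. It relies on a standard technique, acquiring a known set of locks in one canonical (sorted) order, and models write locks only. I am not describing how ZkMultiLock is implemented over ZooKeeper; the class below is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// In-process analogy only: all paths are declared first, then locked in sorted order,
// so two callers can never hold locks in conflicting orders.
class OrderedMultiLockSketch {
    private static final Map<String, ReentrantLock> LOCKS = new ConcurrentHashMap<>();

    private final TreeSet<String> paths = new TreeSet<>(); // sorted and de-duplicated
    private final List<ReentrantLock> held = new ArrayList<>();

    void addWriteLock(String path) {
        paths.add(path);
    }

    // Locks every declared path and returns the order in which they were taken.
    List<String> acquire() {
        List<String> order = new ArrayList<>();
        for (String path : paths) {
            ReentrantLock lock = LOCKS.computeIfAbsent(path, k -> new ReentrantLock());
            lock.lock();
            held.add(lock);
            order.add(path);
        }
        return order;
    }

    // Unlocks in reverse order of acquisition.
    void release() {
        for (int i = held.size() - 1; i >= 0; i--) {
            held.get(i).unlock();
        }
        held.clear();
    }

    public static void main(String[] args) {
        OrderedMultiLockSketch mlock = new OrderedMultiLockSketch();
        mlock.addWriteLock("/Bank/accounts/seller");
        mlock.addWriteLock("/Bank/accounts/buyer");
        mlock.addWriteLock("/Warehouse/42");
        try {
            System.out.println(mlock.acquire());
        } finally {
            mlock.release();
        }
    }
}
```

Whatever order the caller adds paths in, the locks are taken in the same order, so the R(P1)/W(P2) versus R(P2)/W(P1) interleaving from the previous section cannot arise.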

Transactions for Cassandra

Transactions are a planned feature at the time of writing, 12/5/2010. It should not be too long before they make it into the library, so I will explain a bit about them here.

Locking allows you to synchronize sequences of read and write (mutation) operations across rows stored on your Cassandra cluster. However, the locking classes do not solve the problem that occurs when part way through a complex operation your client machine expires, leaving the data inside the distributed database in a logically inconsistent state. For many applications the likelihood of this occurring is low enough for the locking classes alone to be sufficient. But there may be a small number of operations within applications for which data simply must be logically consistent, and even a very rare failure is unacceptable. This is where transactions come in.

For those that are interested, the following explains how they will work.

A new ZkTransaction class will provide the functionality, and it will need to be used in conjunction with the ZkMultiLock class. ZkTransaction will provide a simplified version of the Cassandra Thrift API that allows a series of data mutation operations to be specified. Client operations will proceed by first specifying the necessary locks that must be held, and then specifying the set of data mutations that must be performed by the transaction. When the transaction has been specified, its commit() method must be called passing the ZkMultiLock instance as a parameter.

At this point, internally Cages will add a reference to a transaction node created on ZooKeeper from each single path lock node held. The ZkTransaction instance reads from Cassandra the current values of the data it is required to modify, and writes it into the transaction node as a “before” state. Once this is done, it sets about applying the data mutations specified in the necessary sequence of individual Cassandra read and write (mutate) operations. Once all operations are performed, the references to the transaction node from within the locks are removed, and then finally the transaction node itself is deleted – the transaction has now been committed, and the developer can release() the ZkMultiLock.

ZkTransaction can provide a guarantee of consistency for Cages clients because if during the execution of the sequence of individual Cassandra mutation operations the client machine suddenly dies, Cages will immediately revoke the locks the client holds. From this point any instances of ZkReadLock, ZkWriteLock or ZkMultiLock wishing to acquire the released paths must first roll back the transaction by returning the relevant data to the original “before” state recorded in the transaction node. The key point is that any processes that need to see the data in a logically consistent state, and therefore always acquire locks referencing the data in question before accessing it, will always see it as such. This provides a form of ACID for complex operations against a Cassandra database.

    void executeTrade(long lotId, String sellerId, String buyerId) {
        ZkMultiLock mlock = new ZkMultiLock();
        mlock.addWriteLock("/Bank/accounts/" + sellerId);
        mlock.addWriteLock("/Bank/accounts/" + buyerId);
        mlock.addWriteLock("/Warehouse/" + lotId);
        mlock.acquire();
        try {
            // 1. check that buyer has sufficient funds
            // ....

            // 2. perform mutations using transaction object
            ZkTransaction transaction = new ZkTransaction(NoSQL.Cassandra);
            transaction.begin(mlock);
            try {
                // 2. debit buyer's account
                transaction.insert(buyerId, "accounts", bytes("balance"), bytes(newBalance));
                // 3. credit seller's account
                // ...
                // 4. change ownership of goods
                // ...
            } finally {
                transaction.commit();
            }
        } finally {
            mlock.release();
        }
    }
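
Since ZkTransaction does not exist yet, the following is only a toy model of the “before” state idea, using in-memory maps in place of Cassandra and the ZooKeeper transaction node. Every name is hypothetical, and the failAfter parameter exists purely to simulate a client dying part way through.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of recording a "before" state and rolling back after a crash.
// In-memory maps stand in for Cassandra and for the transaction node.
class BeforeStateSketch {
    private final Map<String, String> store;                           // stands in for Cassandra
    private final Map<String, String> before = new LinkedHashMap<>();  // stands in for the transaction node
    private final Map<String, String> mutations = new LinkedHashMap<>();

    BeforeStateSketch(Map<String, String> store) {
        this.store = store;
    }

    void insert(String key, String value) {
        mutations.put(key, value);
    }

    // Records current values, then applies the mutations one at a time.
    // failAfter simulates the client dying after that many writes (negative means no crash).
    boolean commit(int failAfter) {
        for (String key : mutations.keySet()) {
            before.put(key, store.get(key)); // null means the key did not exist
        }
        int applied = 0;
        for (Map.Entry<String, String> mutation : mutations.entrySet()) {
            if (applied == failAfter) {
                return false; // "crash": the before state is left behind
            }
            store.put(mutation.getKey(), mutation.getValue());
            applied++;
        }
        before.clear(); // transaction node deleted: committed
        return true;
    }

    // What the next lock holder would do on finding a leftover before state.
    void rollback() {
        for (Map.Entry<String, String> entry : before.entrySet()) {
            if (entry.getValue() == null) {
                store.remove(entry.getKey());
            } else {
                store.put(entry.getKey(), entry.getValue());
            }
        }
        before.clear();
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        store.put("buyer/balance", "100");
        store.put("seller/balance", "50");
        BeforeStateSketch tx = new BeforeStateSketch(store);
        tx.insert("buyer/balance", "70");
        tx.insert("seller/balance", "80");
        tx.commit(1);   // dies after debiting the buyer
        tx.rollback();  // the buyer's balance is restored
        System.out.println(store.get("buyer/balance") + " / " + store.get("seller/balance"));
    }
}
```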

Scalability and hashing ZooKeeper clusters

It is worth saying first off that a three node ZooKeeper cluster using powerful machines should be able to handle a considerable workload, and that where usage of locking and transactions is limited on an as-needed basis, such a setup will be able to provide for the needs of many Internet scale applications. However, it is easy to conceive of Cassandra being applied more widely outside of typical Web 2.0 norms where usage of locking and transactions is much heavier, and therefore the scalability of ZooKeeper must be examined.

The main issue is that for the purposes described it is not desirable to scale ZooKeeper clusters beyond three nodes. The reason for this is that while adding nodes scales up read performance, write performance actually starts degrading because of the need to synchronize write operations across all members, and therefore clustering really offers availability rather than performance. A good overview of the actual performance parameters can be found here http://hadoop.apache.org/zookeeper/docs/r3.3.0/zookeeperOver.html. The question then, is what to do where ZooKeeper becomes a bottleneck.

The solution we suggest is simply to run more than one ZooKeeper cluster for the purposes of locking and transactions, and simply to hash locks and transactions onto particular clusters. This will be the final feature added to Cages.
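
Since this feature has not been written yet, treat the following as one possible shape for it rather than what Cages will do: a minimal sketch that maps a lock path onto one of N clusters by hashing the path. The class and method names are hypothetical.

```java
// Sketch: choose a ZooKeeper cluster for a lock path by hashing the path.
// One possible approach only; not taken from the Cages source.
class ClusterHashingSketch {

    // Returns an index in [0, clusterCount) that is stable for a given path.
    static int clusterFor(String lockPath, int clusterCount) {
        if (clusterCount <= 0) {
            throw new IllegalArgumentException("clusterCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(lockPath.hashCode(), clusterCount);
    }

    public static void main(String[] args) {
        String[] paths = { "/Bank/accounts/dom", "/Bank/accounts/matt", "/Warehouse/42" };
        for (String path : paths) {
            System.out.println(path + " -> cluster " + clusterFor(path, 3));
        }
    }
}
```

Every client has to use the same function and the same cluster count, otherwise two clients could look for the same lock on different clusters.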

Note: since I wrote the above Eric Hauser kindly drew my attention to the new “Observers” feature in ZooKeeper 3.3. This may greatly raise the limit at which hashing to separate 3 node clusters becomes necessary. I am hoping to collate performance information and tests in the near future so people have more of an idea what to expect. See http://hadoop.apache.org/zookeeper/docs/r3.3.0/zookeeperObservers.html

That’s it. Hope it was interesting. Please bear with me as Cages develops further over the coming weeks and feel free to test and report.

Final note

Check out the comments too because there are already several useful clarifications and expansions there.

Written by dominicwilliams

May 12, 2010 at 10:10 pm

HBase vs Cassandra: why we moved

with 94 comments

My team is currently working on a brand new product – the forthcoming MMO www.FightMyMonster.com. This has given us the luxury of building against a NOSQL database, which means we can put the horrors of MySQL sharding and expensive scalability behind us. Recently a few people have been asking why we seem to have changed our preference from HBase to Cassandra. I can confirm the change is true and that we have in fact almost completed porting our code to Cassandra, and here I will seek to provide an explanation.

For those that are new to NOSQL, in a following post I will write about why I think we will see a seismic shift from SQL to NOSQL over the coming years, which will be just as important as the move to cloud computing. That post will also seek to explain why I think NOSQL might be the right choice for your company. But for now I will simply relay the reasons why we have chosen Cassandra as our NOSQL solution.

Caveat Emptor – if you’re looking for a shortcut to engaging your neurons be aware this isn’t an exhaustive critical comparison, it just summarizes the logic of just another startup in a hurry with limited time and resources!!

Did Cassandra’s bloodline foretell the future?

One of my favourite tuppences for engineers struggling to find a bug is “breadth first not depth first”. This can be annoying for someone working through complex technical details, because it implies that the solution is actually much simpler if they only looked (advice: only use this saying with established colleagues who will forgive you). I coined this saying because in software matters I find that if we force ourselves to examine the top level considerations first, before tunnelling down into the detail of a particular line of enquiry, we can save enormous time.

So before getting technical, I’ll mention I might have heeded my motto better when we were making our initial choice between HBase and Cassandra. The technical conclusions behind our eventual switch might have been predicted: HBase and Cassandra have dramatically different bloodlines and genes, and I think this influenced their applicability within our business.

Loosely speaking, HBase and its required supporting systems are derived from what is known of the original Google BigTable and Google File System designs (as known from the Google File System paper Google published in 2003, and the BigTable paper published in 2006). Cassandra on the other hand is a recent open source fork of a standalone database system initially coded by Facebook, which while implementing the BigTable data model, uses a system inspired by Amazon’s Dynamo for storing data (in fact much of the initial development work on Cassandra was performed by two Dynamo engineers recruited to Facebook from Amazon).

In my opinion, these differing histories have resulted in HBase being more suitable for data warehousing, and large scale data processing and analysis (for example, such as that involved when indexing the Web) and Cassandra being more suitable for real time transaction processing and the serving of interactive data. Writing a proper study of that hypothesis is well beyond this post, but I believe you will be able to detect this theme recurring when considering the databases.

NOTE: if you are looking for lightweight validation you’ll find the current makeup of the key committers interesting: the primary committers to HBase work for Bing (M$ bought their search company last year, and gave them permission to continue submitting open source code after a couple of months). By contrast the primary committers on Cassandra work for Rackspace, which supports the idea of an advanced general purpose NOSQL solution being freely available to counter the threat of companies becoming locked in to the proprietary NOSQL solutions offered by the likes of Google, Yahoo and Amazon EC2.

Malcolm Gladwell would say my unconscious brain would have known immediately that my business would eventually prefer Cassandra based upon these differing backgrounds. It is horses for courses. But of course, justifying a business decision made in the blink of an eye is difficult…

Which NOSQL database has the most momentum?

Another consideration that has persuaded us to move to Cassandra is a belief that it now has the most general momentum in our community. As you know, in the business of software platforms the bigger you get the bigger you get – where platforms are perceived as similar, people tend to aggregate around the platform that is going to offer the best supporting ecosystem in the long term (i.e. where the most supporting software is available from the community, and where the most developers are available for hire). This effect is self-reinforcing.

When starting with HBase, my impression then was that it had the greatest community momentum behind it, but I now believe that Cassandra is coming through much stronger. The original impression was partly created by two very persuasive and excellently delivered presentations given by the CTOs of StumbleUpon and Streamy, two big players in the Web industry who committed to HBase some time before Cassandra was really an option, and also from a quick reading of an article entitled “HBase vs Cassandra: NoSQL Battle!” (much of which has now been widely debunked).

Proving momentum comprehensively is difficult to do, and you will have to poke about for yourself, but one simple pointer I offer you is the developer activity on IRC. If you connect to freenode.org and compare the #hbase and #cassandra developer channels, you will find Cassandra typically has twice the number of developers online at any time.

If you consider Cassandra has been around for half as long as HBase, you can see why this is quite a clear indication of the accelerating momentum behind Cassandra. You might also take note of the big names coming on board, such as Twitter, where they plan broad usage (see here).

Note: Cassandra’s supporting website looks much lovelier than HBase’s, but seriously, this could be a trend driven by more than the marketing. Read on!

Deep down and technical: CAP and the myth of CA vs AP

There is a very powerful theorem that applies to the development of distributed systems (and here we are talking about distributed databases, as I’m sure you’ve noticed). This is known as the CAP Theorem, and was developed by Professor Eric Brewer, Co-founder and Chief Scientist of Inktomi.

The theorem states, that a distributed (or “shared data”) system design, can offer at most two out of three desirable properties – Consistency, Availability and tolerance to network Partitions. Very basically, “consistency” means that if someone writes a value to a database, thereafter other users will immediately be able to read the same value back, “availability” means that if some number of nodes fail in your cluster the distributed system can remain operational, and “tolerance to partitions” means that if the nodes in your cluster are divided into two groups that can no longer communicate by a network failure, again the system remains operational.

Professor Brewer is an eminent man and many developers, including many in the HBase community, have taken it to heart that their systems can only support two of these properties and have accordingly worked to this design principle. Indeed, if you search online posts related to HBase and Cassandra comparisons, you will regularly find the HBase community explaining that they have chosen CP, while Cassandra has chosen AP – no doubt mindful of the fact that most developers need consistency (the C) at some level.

However I need to draw your attention to the fact that these claims are based on a complete non sequitur. The CAP theorem only applies to a single distributed algorithm (and here I hope Professor Brewer would agree). But there is no reason why you cannot design a single system where for any given operation, the underlying algorithm and thus the trade-off achieved is selectable. Thus while it is true that a system may only offer two of these properties per operation, what has been widely missed is that a system can be designed that allows a caller to choose which properties they want when any given operation is performed. Not only that, reality is not nearly so black and white, and it is possible to offer differing degrees of balance between consistency, availability and tolerance to partition. This is Cassandra.

This is such an important point I will reiterate: the beauty of Cassandra is that you can choose the trade-offs you want on a case by case basis such that they best match the requirements of the particular operation you are performing. Cassandra proves you can go beyond the popular interpretation of the CAP Theorem and the world keeps on spinning!

For example, let’s look at two different extremes. Let us say that I must read a value from the database with very high consistency – that is, where I will be 100% sure to receive the last copy of that data which was previously written. In this case, I can read the value from Cassandra specifying consistency level “ALL”, which requires that all the nodes that hold replicated copies of that data agree on its value. In this case, I have zero tolerance to either node failure, or network partition. At the other extreme, if I do not care about consistency particularly, and simply want the maximum possible performance, I can read the value from Cassandra using consistency level “ONE”. In this case, a copy is simply taken from a random node amongst those holding the replicas – and in this case, if the data is replicated three times, it does not matter if either of the two other nodes holding copies have failed or been partitioned from us, although now of course it is also possible that such conditions may mean the data I read is stale.

And better still, you are not forced to live in a black and white world. For example, in our particular application important read/write operations typically use consistency level “QUORUM”, which basically means – and I simplify so please research before writing your Cassandra app – that a majority of nodes in the replication factor agree. From our perspective, this provides both a reasonable degree of resilience to node failure and network partition, while still delivering an extremely high level of consistency. In the general case, we typically use the aforementioned consistency level of “ONE”, which provides maximum performance. Nice!
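To make the arithmetic behind these levels concrete, here is a minimal sketch. It assumes the usual definition of a quorum as a strict majority of the replicas (RF // 2 + 1) and the common rule of thumb that a read is guaranteed to overlap the latest write when the read and write replica counts add up to more than RF. The function names are made up for illustration and are not part of any Cassandra client.

```python
def replicas_required(level: str, replication_factor: int) -> int:
    """How many replicas must respond for the given consistency level."""
    if level == "ONE":
        return 1
    if level == "QUORUM":
        # Assumed definition: a strict majority of the replicas.
        return replication_factor // 2 + 1
    if level == "ALL":
        return replication_factor
    raise ValueError(f"unknown consistency level: {level}")


def failures_tolerated(level: str, replication_factor: int) -> int:
    """How many replicas can be down or unreachable before the operation fails."""
    return replication_factor - replicas_required(level, replication_factor)


def read_overlaps_write(read_level: str, write_level: str, replication_factor: int) -> bool:
    """True when every read is certain to touch a replica that took the write."""
    r = replicas_required(read_level, replication_factor)
    w = replicas_required(write_level, replication_factor)
    return r + w > replication_factor


if __name__ == "__main__":
    rf = 3
    for level in ("ONE", "QUORUM", "ALL"):
        print(level, replicas_required(level, rf), failures_tolerated(level, rf))
```

With a replication factor of 3 this gives the picture described above: ONE survives two unreachable replicas but may return stale data, ALL survives none, and QUORUM reads paired with QUORUM writes survive one while still overlapping.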

For us this is a very big plus for Cassandra. Not only can we now easily tune our system, we can also design it so that, for example, when a certain number of nodes fail, or the network connecting those nodes falters, our service continues operating in many respects, and only those aspects that require data consistency fail. HBase is not nearly so flexible, and the pursuit of a single approach within the system (CP) reminds me of the wall that exists between SQL developers and the query optimizer – something it is good to get beyond!

In our project then, Cassandra has proven by far the most flexible system, although you may find your brain at first loses consistency when considering your QUORUMs.

When is monolithic better than modular?

An important distinction between Cassandra and HBase, is that while Cassandra comes as a single Java process to be run per node, a complete HBase solution is really comprised of several parts: you have the database process itself, which may run in several modes, a properly configured and operational hadoop HDFS distributed file system setup, and a Zookeeper system to coordinate the different HBase processes. Does this mean then that this is a modularity win for HBase?

Although it is true that such a setup might promise to leverage the collective benefits of different development teams, in terms of systems administration the modularity of HBase cannot be considered a plus. In fact, especially for a smaller startup company, the modularity of HBase might be a big negative. Let me explain…

The underpinnings of HBase are pretty complex, and anyone in doubt of this should read the original Google File System and BigTable papers. Even setting up HBase in pseudo distributed mode on a single server is difficult – so difficult in fact that I did my best to write a guide that takes you past all the various gotchas in the minimum time (see https://ria101.wordpress.com/2010/01/28/setup-hbase-in-pseudo-distributed-mode-and-connect-java-client/ if you wish to try it). As you will see from that guide, getting HBase up and running in this mode actually involves setting up two different systems manually: first hadoop HDFS, then HBase itself.

Now to the point: the HBase configuration files are monsters, and your setup is vulnerable to the quirks in default network configurations (in which I include both the default networking setups on Ubuntu boxes, and the subtleties of Elastic IPs and internally assigned domain names on EC2). When things go wrong, you will be presented with reams of output in the log file. All the information you need to fix things is in there, and if you are a skilled admin you are going to get through it.

But what happens if it goes wrong in production and you need to fix it in a hurry? And what happens if, like us, you have a small team of developers with big ambitions and can’t afford a team of crack admins to be on standby 24/7?

Look seriously, if you’re an advanced db admin wanting to learn a NOSQL system, choose HBase. It’s so damn complex that safe pairs of hands are going to get paid well.

But if you’re a small team just trying to get to the end of the tunnel like us, wait ’til you hear the Gossip…

It’s Gossip talk dude, Gossip!

Cassandra is a completely symmetric system. That is to say, there are no master nodes or region servers like in HBase – every node plays a completely equal role in the system. Rather than any particular node or entity taking on a coordination role, the nodes in your cluster coordinate their activities using a pure P2P communication protocol called “Gossip”.

A description of Gossip and the model using it is beyond this post, but the application of P2P communication within Cassandra has been mathematically modelled to show that, for example, the time taken for the detection of node failure to be propagated across the system, or for a client request to be routed to the node(s) holding the data, occur deterministically within well bounded timeframes that are surprisingly small. Personally I believe that Cassandra represents one of the most exciting uses of P2P technology to date, but of course this idea is not relevant to choosing your NOSQL database!

What is relevant are the real benefits that the Gossip-based architecture gives to Cassandra’s users. Firstly, continuing with the theme of systems administration, life becomes much simpler. For example, adding a new node to the system becomes as simple as bootstrapping its Cassandra process and pointing it at a seed node (an existing node within your cluster). When you think of the underlying complexity of a distributed database running across, potentially, hundreds of nodes, the ability to add new nodes to scale up with such ease is incredible. Furthermore, when things go wrong you no longer have to consider what kind of nodes you are dealing with – everything is the same, which can make debugging a more progressive and repeatable process.

Secondly I have come to the conclusion that Cassandra’s P2P architecture provides it with performance and availability advantages. Load can be very evenly balanced across system nodes thus maximizing the potential for parallelism, the ability to continue seamlessly in the face of network partitions or node failures is greatly increased, and the symmetry between nodes prevents the temporary instabilities in performance that have been reported with HBase when nodes are added and removed (Cassandra boots quickly, and its performance scales smoothly as new nodes are added).

If you are looking for more evidence, you will be interested to read a report from a team with a vested interest in hadoop (i.e. which should favor HBase)…

A report is worth a thousand words. I mean graph right?

The first comprehensive benchmarking of NOSQL systems performed by Yahoo! Research now seems to bear out the general performance advantage that Cassandra enjoys, and on the face of it the figures do currently look very good for Cassandra.

At the time of writing these papers are in draft form and you can check them out here:
http://www.brianfrankcooper.net/pubs/ycsb-v4.pdf
http://www.brianfrankcooper.net/pubs/ycsb.pdf

NOTE: in this report HBase performs better than Cassandra only in respect of range scans over records. Although the Cassandra team believes they will quickly approach the HBase times, it is also worth pointing out that in a common configuration of Cassandra range scans aren’t even possible. I recommend this to you as being of no matter, because actually in practice you should implement your indexes on top of Cassandra, rather than seek to use range scans. If you are interested in issues relating to range scans and storing indexes in Cassandra, see my post here: https://ria101.wordpress.com/2010/02/22/cassandra-randompartitioner-vs-orderpreservingpartitioner/

FINAL POINT OF INTEREST: the Yahoo! Research team behind this paper are trying to get their benchmarking application past their legal department and make it available to the community. If they succeed, and I hope they do, we will be treated to an ongoing speed competition galore, and both HBase and Cassandra will doubtless be improving their times further.

A word on locking, and useful modularity

You may no doubt hear from the HBase camp that their more complex architecture is able to give you things that Cassandra’s P2P architecture can’t. An example that may be raised is the fact that HBase provides the developer with row locking facilities whereas Cassandra cannot (in HBase row locking can be controlled by a region server since data replication occurs within the hadoop layer below, whereas in Cassandra’s P2P architecture all nodes are equal, and therefore none can act as a gateway that takes responsibility for locking replicated data).

However, I would reflect this back as an argument about modularity, which actually favours Cassandra. Cassandra implements the BigTable data model but uses a design where data storage is distributed over symmetric nodes. It does that, and that’s all, but in the most flexible and performant manner possible. But if you need locking, transactions or any other functionality then that can be added to your system in a modular manner – for example we have found scalable locking quite simple to add to our application using Zookeeper and its associated recipes (and other systems such as Hazelcast might also exist for these purposes, although we have not explored them).

By minimizing its function to a narrower purpose, it seems to me that Cassandra manages to implement a design that executes that purpose better – as indicated for example by its selectable CAP tradeoffs. This modularity means you can build a system as you need it – want locking, grab yourself Zookeeper, want to store a full text index, grab yourself Lucandra, and so on. For developers like us, this means we don’t have to take on board more complexity than we actually need, and ultimately provides us with a more flexible route to building the application we want.

MapReduce, don’t mention MapReduce!

One thing Cassandra can’t do well yet is MapReduce! For those not versed in this technology, it is a system for the parallel processing of vast amounts of data, such as the extraction of statistics from millions of pages that have been downloaded from the Web. MapReduce and related systems such as Pig and Hive work well with HBase because it uses hadoop HDFS to store its data, which is the platform these systems were primarily designed to work with. If you need to do that kind of data crunching and analysis, HBase may currently be your best option.

Remember, it’s horses for courses!

Therefore as I finish off my impassioned extolation of Cassandra’s relative virtues, I should point out HBase and Cassandra should not necessarily be viewed as out and out competitors. While it is true that they may often be used for the same purpose, in much the same way as MySQL and Postgres, what I believe will likely emerge is that they will become preferred solutions for different applications. For example, as I understand it, StumbleUpon has been using HBase with the associated hadoop MapReduce technologies to crunch the vast amounts of data added to its service. Twitter is now using Cassandra for real time interactive community posts. Our needs fit better with the interactive serving and processing of data and so we are using Cassandra, and probably to some degree there you have it.

As a controversial parting shot though the gloves are off for the next point!

NOTE: before I continue I should point out Cassandra has hadoop support in 0.6, so its MapReduce integration may be about to get a whole load better.

O boy, I can’t afford to lose that data…

Perhaps as a result of the early CAP Theorem debates, an impression has grown that data is somehow safer in HBase than Cassandra. This is a final myth that I wish to debunk: in Cassandra, when you write new data it is actually immediately written to the commit log on one of the nodes in the quorum that will hold the replicas, as well as being replicated across the memory of the nodes. This means that if you have a complete power failure across your cluster, you will likely lose little data. Furthermore once in the system, data entropy is prevented using Merkle trees, which further add to the security of your data 🙂

In truth I am not clear exactly what the situation with HBase is – and I will endeavour to update this post as soon as possible with details – but my current understanding is that because hadoop does not yet support append, HBase cannot efficiently regularly flush its modified blocks of data to HDFS (whereupon the new mutations to data will be replicated and persisted). This means that there is a much larger window where your latest changes are vulnerable (if I am wrong, as I may be, please tell me and I will update the post).

So while the Cassandra of Greek mythology had a rather terrible time, the data inside your Cassandra shouldn’t.

NOTE: Wade Arnold points out below that (at the time of writing this) hadoop .21 is about to be released, which will solve this problem with HBase.

Written by dominicwilliams

February 24, 2010 at 7:27 pm

Cassandra: RandomPartitioner vs OrderPreservingPartitioner

with 20 comments

When building a Cassandra cluster, the “key” question (sorry, that’s weak) is whether to use the RandomPartitioner (RP), or the OrderPreservingPartitioner (OPP). These control how your data is distributed over your nodes. Once you have chosen your partitioner, you cannot change without wiping your data, so think carefully!

For Cassandra newbies, like me and my team of HBasers wanting to try a quick port of our project (more on why in another post) nailing the exact issues is quite daunting. So here is a quick summary.

What OPP gives you

Using OPP provides you with three obvious advantages over RP:
1. You can perform range slices. That is you can scan over ranges of your rows as though you were moving a cursor through a traditional index. For example, if you are using user ids as your keys, you could scan over the rows for users whose names begin with J e.g. jake, james, jamie etc
2. You can store real time full text indexes inside Cassandra, which are built using the aforementioned feature e.g. see Lucandra
3. If you screw up, you can scan over your data to recover/delete orphaned keys

***UPDATE*** Since v0.6 you *can* now scan your keys when using RP, although obviously not in any particular order. Typically you request a page of rows starting with the empty/”” key, and then use the apparently random end key from the page as the start key when you request another page. At the time of writing, this method only seems to work with KeyRange not TokenRing. If you are using Java to access Cassandra read the change log for v0.804 of Pelops.
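The paging idiom described in the update can be sketched as follows. This is only an illustration against an in-memory stand-in: FakeStore and get_range_page are hypothetical names, not part of Cassandra or Pelops, and the sketch assumes the start key of a page is inclusive, so every page after the first repeats the last key already seen and that repeat has to be skipped.

```python
import hashlib


def token(key: str) -> int:
    """Stand-in for the hashed position of a key; rows come back in this order."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest(), "big")


class FakeStore:
    """In-memory stand-in for a cluster using RP (hypothetical, for illustration)."""

    def __init__(self, keys):
        self._keys = sorted(keys, key=token)

    def get_range_page(self, start_key: str, count: int):
        # Hypothetical call: up to `count` keys beginning at start_key (inclusive).
        # The empty key means "start from the beginning".
        start = 0 if start_key == "" else self._keys.index(start_key)
        return self._keys[start:start + count]


def scan_all_keys(store, page_size: int = 3):
    """Yield every key exactly once, one page at a time."""
    if page_size < 2:
        raise ValueError("page_size must be at least 2 to make progress")
    start_key = ""
    first_page = True
    while True:
        page = store.get_range_page(start_key, page_size)
        # Later pages begin with the key that ended the previous page.
        for key in (page if first_page else page[1:]):
            yield key
        if len(page) < page_size:
            return  # a short page means we have reached the end
        start_key = page[-1]
        first_page = False


if __name__ == "__main__":
    store = FakeStore(["jake", "james", "jamie", "alice", "bob", "carol", "dave"])
    for key in scan_all_keys(store):
        print(key)
```

The keys come out in an order that looks random, which is the point: the scan is complete but not sorted.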

Given that Web applications typically need/benefit from the above, the question is why would you *not* use OPP. The answer is a nuanced one about load balancing.

The problem with OPP

With both RP and OPP, by default Cassandra will tend to evenly distribute individual keys and their corresponding rows over the nodes in the cluster. The default algorithm is nice and simple: every time you add a new node, it will assign a range of keys to that node such that it takes responsibility for half the keys stored on the node that currently stores the most keys (more on options for overriding the default behaviour later).
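As a rough illustration of that default behaviour, the toy model below treats a cluster as a mapping from node name to the sorted keys it holds, and lets each new node take half the keys from whichever node currently holds the most. The names and data layout are assumptions made for this sketch; the real system works on token ranges rather than lists of keys, and which half moves to the new node is an arbitrary choice here.

```python
def add_node(cluster: dict, new_node: str) -> None:
    """Give new_node half the keys of the node that currently stores the most."""
    busiest = max(cluster, key=lambda name: len(cluster[name]))
    keys = sorted(cluster[busiest])
    mid = len(keys) // 2
    cluster[new_node] = keys[:mid]   # the new node takes over one half of the range
    cluster[busiest] = keys[mid:]    # the busiest node keeps the other half


if __name__ == "__main__":
    cluster = {"node1": [f"key{i:02d}" for i in range(8)]}
    for name in ("node2", "node3", "node4"):
        add_node(cluster, name)
    for name, keys in cluster.items():
        print(name, len(keys))
```

Note that the model only balances the number of keys per node. It says nothing about how much data sits behind each key or how hot it is, which is where the trouble with OPP starts.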

The nuance is, that this simple default algorithm will tend to lead to good load balancing when RP is used, but not necessarily when OPP is used.

The reason is that although the algorithm succeeds in assigning key ranges such that as your cluster scales nodes receive roughly similar numbers of keys, with OPP on any given node those keys are unlikely to be drawn equally from the different column families present within your database…

If the distribution of keys used by individual column families is different, their sets of keys will not fall evenly across the ranges assigned to nodes. Thus nodes will end up storing preponderances of keys (and the associated data) corresponding to one column family or another. If as is likely column families store differing quantities of data with their keys, or store data accessed according to differing usage patterns, then some nodes will end up with disproportionately more data than others, or serving more “hot” data than others. <yikes!>

By contrast, when using RP the distribution of the keys occurring within individual column families does not matter. This is because an MD5 hash of keys is used as the “real” key by the system for the purposes of locating the key and data on nodes (the MD5 hashes randomly map any input key to a point in the 0..2**127 range). The result is that the keys from each individual column family are spread evenly across the ranges/nodes, meaning that data and access corresponding to those column families is evenly distributed across the cluster.
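Here is a small sketch of the idea. It assumes the token is obtained by reading the 128-bit MD5 digest as a signed integer and taking its absolute value, which gives the 0..2**127 range mentioned above, and that each node owns the stretch of the ring ending at its own token. The function names and the evenly spaced node tokens are illustrative assumptions, not Cassandra's actual code.

```python
import bisect
import hashlib

RING_SIZE = 2 ** 127


def rp_token(key: str) -> int:
    """Map a key to a point in 0..2**127 via MD5 (assumed derivation, see lead-in)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return abs(int.from_bytes(digest, "big", signed=True))


def owner(node_tokens: list, key: str) -> int:
    """Index of the node owning the key: the first node token >= the key's token."""
    i = bisect.bisect_left(node_tokens, rp_token(key))
    return i % len(node_tokens)  # past the last token, wrap around to the first node


if __name__ == "__main__":
    nodes = 4
    node_tokens = [i * RING_SIZE // nodes for i in range(nodes)]
    # Two "column families" with very different key shapes still spread evenly.
    for prefix in ("user:", "zzz-post-"):
        counts = [0] * nodes
        for i in range(4000):
            counts[owner(node_tokens, f"{prefix}{i}")] += 1
        print(prefix, counts)
```

Running it shows each node receiving roughly a quarter of the keys for both prefixes, regardless of how the raw keys themselves are distributed.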

If you must have OPP

You may quite reasonably feel that you must have the range scan features that come with OPP, for example because you want to use Lucandra. The question then becomes how you can ameliorate the aforementioned problems with load balancing.

The best you can do, is to identify the data upon which you do not need to perform range scans. This data can then be randomly distributed across your cluster using a simple idiom where the key is actually written as <MD5(ROWKEY)>.<ROWKEY>
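A minimal sketch of that key decoration idiom follows. The helper names are made up for illustration; the only assumption is that the hex form of the MD5 digest is used as the prefix, which never contains a dot, so the original key can always be recovered by splitting on the first dot.

```python
import hashlib


def decorate(row_key: str) -> str:
    """Build <MD5(ROWKEY)>.<ROWKEY> so that OPP places the row pseudo-randomly."""
    prefix = hashlib.md5(row_key.encode("utf-8")).hexdigest()
    return f"{prefix}.{row_key}"


def undecorate(stored_key: str) -> str:
    """Recover the original row key; the hex prefix cannot contain a dot."""
    return stored_key.split(".", 1)[1]


if __name__ == "__main__":
    stored = decorate("jake.smith")
    print(stored)
    print(undecorate(stored))
```

Because the caller can always recompute the prefix from the row key, point lookups still work. What you give up on decorated rows is the ability to range scan them in key order.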

But be clear, the items whose keys must be undecorated (because you wish to perform range scans over them), may still not map evenly onto the key ranges held by the nodes. The only recourse you have then, is to consider manually specifying the key ranges assigned to nodes. This is typically done when you bootstrap a new node, but you can also rebalance an existing cluster by simply decommissioning nodes, deleting their data, and then bootstrapping them back in. To do this safely, you obviously have to do this one at a time, but then I’m sure I didn’t have to tell you that…

You can see where this is going now right? You’ve just made a whole load of work for yourself, and anyway, even if you have the time, if you have lots of different column families with widely differing key distributions then getting load balancing right is going to be a nightmare.

This is the basic reason that fully seasoned Cassandra heads, in my experience, seem to prefer RP *unless* a mono use setup is proposed, for example where a cluster is used simply to store a full-text index with Lucandra.

If you have a database with a seriously heterogeneous set of column families, and need range scans, you might now be thinking you should actually be using HBase, which is designed for this. That would not be a bad choice (!), but there are good reasons for hanging with Cassandra if you can, which I will cover in a future post. Read on…

If you must use RP (very likely)

So having delved a little more deeply into the implications of OPP, you decide you really should go with RP. But, what to do with those indexes you need?

Well, first of all there is a really simple if brutal solution: simply store your index inside a single column family row as a series of columns. Since Cassandra can in principle cope with millions of columns, this is perfectly possible. Although it is true each index won’t be distributed across your whole cluster, the load will at the least be distributed across the nodes holding the replicas. If you use a typical replication factor (RF) of 3 the load associated with each index will be shared by 3 nodes etc.

In the vast majority of cases, this will be enough, and it will be sufficient that the rest of your data is properly balanced across your cluster.

But, I hear you saying, this is too brutal. Your index is too massive to fit on 3 nodes, is extremely hot and this just won’t work. You moved to Cassandra because you want your load distributed across your entire cluster. Period.

This is a perfectly reasonable point of view.

The only solution in this case is to build an index system over the top of the simple hashmap provided. We are taking this approach, and it will be elaborated with some sample code in a later post.

Basic indexing strategy for RP

For those that need to know the basic strategy now, here it is: you need to start off with the simple approach where you store your entire index using columns under a single key. As the number of columns grows past some threshold you define, the columns should be split such that half the entries are migrated to a new key/row. Thus the index is split across the cluster evenly.

Each range can be stored under a key named in a predictable way, for example <INDEX>.<SPLIT NO.>. The start and end index entries stored in each split should themselves be stored in a dedicated column family that is used to record index meta information using the same key name, ensuring that the meta information is also distributed.

You can then progressively test the existence of splits simply by attempting to open the key for the meta that would be used to describe the split. If you can retrieve the meta information, you know that the split also exists. It won’t be necessary to cache this information to make the process reasonably performant – Cassandra already caches data in memory, and also uses Bloom filters to determine whether or not a requested row exists (Bloom filters enable a Cassandra node to rapidly determine whether it holds a key without traversing its list of keys).

There you have it, an index offering range scans fully distributed over your cluster!
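The strategy can be sketched with two plain dictionaries standing in for the index column family and the meta column family. Everything here is an assumption made for illustration: the names, the tiny split threshold and the in-memory storage. A real implementation would issue reads and writes against Cassandra and would have to deal with concurrent writers, which this sketch ignores.

```python
SPLIT_THRESHOLD = 4  # assumed; tiny so the example splits quickly

index_cf = {}  # row key -> {index entry: value}; stands in for the index column family
meta_cf = {}   # row key -> (first entry, last entry); stands in for the meta column family


def split_key(index: str, split_no: int) -> str:
    """Predictable row key of the form <INDEX>.<SPLIT NO.>"""
    return f"{index}.{split_no}"


def existing_splits(index: str) -> list:
    """Probe the meta rows for split 0, 1, 2... until one is missing."""
    keys, n = [], 0
    while split_key(index, n) in meta_cf:
        keys.append(split_key(index, n))
        n += 1
    return keys


def _refresh_meta(key: str) -> None:
    cols = sorted(index_cf[key])
    meta_cf[key] = (cols[0], cols[-1])


def add_entry(index: str, entry: str, value: str) -> None:
    splits = sorted(existing_splits(index), key=lambda k: meta_cf[k][0])
    if not splits:
        target = split_key(index, 0)
        index_cf[target] = {}
    else:
        # Use the split with the greatest start <= entry, else the lowest split.
        target = splits[0]
        for k in splits:
            if meta_cf[k][0] <= entry:
                target = k
    index_cf[target][entry] = value
    _refresh_meta(target)
    if len(index_cf[target]) > SPLIT_THRESHOLD:
        # Migrate the upper half of the entries to a new row.
        cols = sorted(index_cf[target])
        upper = cols[len(cols) // 2:]
        new_key = split_key(index, len(existing_splits(index)))
        index_cf[new_key] = {c: index_cf[target].pop(c) for c in upper}
        _refresh_meta(target)
        _refresh_meta(new_key)


def range_scan(index: str, start: str, end: str) -> list:
    """Return entries in [start, end], reading only the splits whose meta overlaps."""
    found = []
    for key in sorted(existing_splits(index), key=lambda k: meta_cf[k][0]):
        first, last = meta_cf[key]
        if last < start or first > end:
            continue  # the meta row alone tells us this split is out of range
        found.extend(c for c in sorted(index_cf[key]) if start <= c <= end)
    return found


if __name__ == "__main__":
    for name in ["jake", "james", "jamie", "alice", "bob",
                 "carol", "dave", "erin", "zoe", "jill"]:
        add_entry("users", name, "")
    print(existing_splits("users"))
    print(range_scan("users", "j", "k"))
```

Split numbers record the order in which rows were created, not their position in the index, so the scan sorts the splits by the start entry recorded in their meta rows before walking them.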

Full text search sanity check

Implementing a full text index will of course involve more work than a simple left-side/ISAM style index, although the principles are the same. Given the existence of Lucandra though, I would suggest that before proceeding to create your full text index using the described approach, you first examine another possibility: running your full text searches off a dedicated cluster.

If you are running in the cloud, for example on EC2 or Rackspace Cloud, you can start your dedicated full text search cluster at low cost on small instances that can be scaled up if necessary later. Otherwise, consider virtualization or configuring Cassandra to run two clusters in parallel on the same nodes (more on this possibility in a later post).

The beauty of open source is that many problems have already been solved for you, and Lucandra is too good an opportunity to miss if you need full text search on Cassandra.

Written by dominicwilliams

February 22, 2010 at 10:56 pm

HBase vs Cassandra & comparing NOSQL solutions

with one comment

If, like me, you have moved your enterprise to the NOSQL database camp, you will be trying to figure out which NOSQL database will be the best long term option. This is no easy task, because we are still at the beginning of the cycle.

Our choice came down to HBase vs Cassandra. We chose HBase for its vibrant community, faithful Google BigTable-like design and because it can use hadoop for storage (which is already quite mature). Right now we’re quite far down the road with HBase but trying to keep our minds open.

Check out the first serious comparison of NOSQL solutions I found. Cassandra does nicely. hmm:
http://www.brianfrankcooper.net/pubs/ycsb-v4.pdf
http://www.brianfrankcooper.net/pubs/ycsb.pdf

Written by dominicwilliams

February 9, 2010 at 10:04 pm

Posted in Cassandra, HBase
