Dominic Williams

Occasionally useful posts about RIAs, Web scale computing & miscellanea

Fight My Monster Architecture in 5 mins!

with 2 comments

We computing engineers live in exciting times. The Internet and its supporting infrastructures really have come of age, and all of a sudden things can, and most importantly should, be done in new ways. Some small teams are successfully scaling enterprises to massive scale on tiny budgets, and the new generation of applications and games can be delivered across multiple platform formats. With these opportunities come responsibilities and we need to choose our technical strategies and platforms wisely, as these decisions have deep impact on developing tech businesses.

Fight My Monster has grown rapidly and will soon pass 1 million user accounts. But although we recently completed a funding round with some fantastic investors, the business was originally bootstrapped on a very limited budget. For those following the project, and for those who are considering working for Fight My Monster  — as you may have heard, Fight My Monster is expanding and has some great opportunities for talented engineers right now — I wanted to provide a quick overview of our architecture, and the reasons we chose it. As you will see, it’s not that conventional.

We’ve gone for a simple but scalable three level architecture, which we host in the cloud. I hope this overview of how our system works proves interesting.

RIA Client

We have a Flash client with a difference. It is full screen, uses a flowed layout, and looks and behaves like a normal website, except in those areas where highly interactive game sparkle is required. The client implements an underused technique called deep linking which allows individual virtual pages within the “site” to be addressed by URL, and browser integration via Javascript allows the simulated site to respond to clicks on the browser’s forward and back buttons and change page. The client has a complex state, but if the browser window is refreshed, the client is able to restore its previous state from the server. We developed the client mainly in Flex using a framework we developed in house. It’s one of the more sophisticated Flex applications out there at the moment.

Application Layer (Starburst)

Our client communicates with the application layer using a proprietary remote method calling framework built on top of RTMP (Real Time Messaging Protocol), which is supported by the Flash player. RTMP allows for multiplexing of a remote method call stream, video and other data over a single TCP/IP connection, in which remote calls pack their parameters using AMF (Action Message Format). The protocol also allows server code to call into functions that the client has registered. The framework we created allows call results to be returned asynchronously, in a different order from that in which the calls were made, such that server methods taking a little longer to generate their result do not block the return of results for following calls, thereby keeping client applications responsive. The framework also handles things like automatic reconnection, the queuing of events while a connection is broken, and the resetting of client state where data has been lost after cluster events or prolonged disconnection.
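To make the out-of-order result idea concrete, here is a minimal sketch of how results can be matched back to their calls by id. It is not our framework's code: the PendingCalls class and its method names are hypothetical, and it assumes Java 8 or later for CompletableFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: matching results that arrive in any order to their calls.
class PendingCalls {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending =
            new ConcurrentHashMap<Long, CompletableFuture<Object>>();

    // Register a call and return its id; the id would travel with the request.
    long register(CompletableFuture<Object> result) {
        long id = nextId.incrementAndGet();
        pending.put(id, result);
        return id;
    }

    // Invoked when a result arrives; returns false for unknown or repeated ids.
    boolean complete(long id, Object value) {
        CompletableFuture<Object> f = pending.remove(id);
        return f != null && f.complete(value);
    }
}
```

Because each result carries the id of its call, a slow server method never holds up the results of calls made after it.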

The application layer itself comprises a horizontally scalable cluster of servers running a proprietary Java software framework called Starburst, which we may eventually open source. Starburst uses the Red5 server as a container, which provides the base RTMP functionality and contains Tomcat too, which enables us to serve JSP pages (we only serve a few HTML pages, but it is a requirement nonetheless). The current setup may change, and we are looking at using Netty and even replacing RTMP with a bespoke protocol, but that is another discussion.

I am limited in what I can say about how Starburst works right now, but basically a client connects by making an initial HTTP “directory” request to obtain the IP address of the cluster node it should connect to. The HTTP request is made to a load balancer, and can be forwarded to any running Starburst node in the cluster, which is completely symmetric in the sense that all nodes are the same and none play special roles. Once the targeted cluster node’s IP address is returned, the client makes a direct RTMP connection to that node. The node the client is directed to is chosen using consistent hashing (a really useful approach that you should Google if you have not come across before). The consistent hashing algorithm evenly distributes clients over the cluster nodes and, if a cluster node is lost, automatically evenly redistributes clients that were connected to it among the remaining nodes, thereby preventing load spikes and hot spots.
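For readers who have not met consistent hashing before, the following is a rough sketch of the general technique, not Starburst's implementation. The class name, the use of MD5 and the number of ring points per node are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a consistent hash ring; not Starburst code.
class ConsistentHashRing {
    private static final int POINTS_PER_NODE = 100; // assumed value
    private final TreeMap<Long, String> ring = new TreeMap<Long, String>();

    // Map a string to a ring position using the first 8 bytes of its MD5 digest.
    private static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Each node owns many points on the ring so that load spreads evenly.
    void addNode(String nodeIp) {
        for (int i = 0; i < POINTS_PER_NODE; i++) {
            ring.put(hash(nodeIp + "#" + i), nodeIp);
        }
    }

    void removeNode(String nodeIp) {
        for (int i = 0; i < POINTS_PER_NODE; i++) {
            ring.remove(hash(nodeIp + "#" + i));
        }
    }

    // The first point at or after the client's hash owns the client (wrapping around).
    String nodeFor(String clientId) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes in ring");
        }
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(clientId));
        return (e != null ? e : ring.firstEntry()).getValue();
    }
}
```

When a node is removed, only the clients that mapped to its points move, and they scatter across the remaining nodes rather than all landing on one neighbour.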

The cool bit about Starburst is that it completely abstracts away the multiplicity of client and server nodes for application code writers. Clients and server side connection contexts may register to receive events, and server code written to Starburst APIs may generate events on paths or for specific users that may or may not be online, but it is only ever necessary to write business/game logic, never to consider how different parts of the system are connected. I can’t go into the means by which this is achieved at scale now, but needless to say it makes a real difference to what we do.

Database (Cassandra)

There used to be only one way to make complex apps that scale: shard your database. Sharding is the process of dividing up responsibility for different ranges of your data amongst your servers. A simple sharding strategy might be, for example, to have 27 servers and then use the first letter of usernames to map responsibility for users to the servers. In practice, many organizations also shard functionality, so different servers have different types of function too. For example, Facebook works this way with MySQL databases. The problem with this approach is that your systems become very, very complex, and system complexity has a tendency to scale up with app complexity.
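As a small illustration of the first-letter scheme just described, here is a hypothetical sketch. The class name is invented, and the choice of a 27th catch-all shard for usernames that do not start with a letter is an assumption.

```java
// Hypothetical sketch of first-letter sharding across 27 servers.
class FirstLetterSharding {
    static final int SHARD_COUNT = 27; // assumed: 26 letters plus one catch-all

    // Return the index of the server responsible for this username.
    static int shardFor(String username) {
        if (username == null || username.isEmpty()) {
            return SHARD_COUNT - 1;
        }
        char c = Character.toLowerCase(username.charAt(0));
        return (c >= 'a' && c <= 'z') ? c - 'a' : SHARD_COUNT - 1;
    }
}
```

Even in this toy form, every piece of application code that touches user data has to know about the mapping.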

When we first started with Fight My Monster, we also began by thinking about a sharded architecture using MySQL for storage. However, Fight My Monster is a pretty complex app, and it quickly became apparent that sharding was going to require too much work given the resources we had at that time. We then experimented with different ideas, and it was about this time that the NoSQL movement first came to our attention. For all the described reasons, swapping the relational database model for a less complex one that could scale without sharding immediately stood out as a bargain. We began using HBase, which is a very solid system that offers a single logical database that is horizontally scalable, but which is actually quite complex to maintain and administer. Because of that complexity, and for other reasons including throughput, we moved over to Cassandra, which has proven to be a good decision for us.

Without wishing to stir up NoSQL flame wars, Cassandra has amazing qualities. Some of the highlights are as follows:-

  • Clusters provide a single logical database that is almost infinitely scalable
  • The cluster is symmetric, i.e. there are no special types of node to manage
  • Nodes organize themselves using a P2P system, and adding new nodes is simple
  • Performance is best in category, with write performance better than read performance
  • Data is safely replicated across nodes (we use a Replication Factor of 3)
  • A number of nodes can go down without affecting cluster operation

We access Cassandra using an open source Java library we originally created called Pelops. Because our data processing is highly transactional, and Cassandra does not provide locking mechanisms itself, we need to handle the distributed serialization of database operations initiated from the various Starburst nodes. For this we use another open source library that we also created called Cages in conjunction with Hadoop’s ZooKeeper system.

Using Cages and ZooKeeper has worked very well for us, but if there is a single theme running through Fight My Monster’s architecture it’s simplification! For this reason, I recently developed the Wait Chain Algorithm, which provides a way for Cassandra itself to be used as a distributed lock server, for example using the Pelops or Hector clients, thus providing a way for us to eventually do without ZooKeeper. Watch this blog for news of the first system using this algorithm.

Written by dominicwilliams

February 28, 2012 at 8:32 pm

ConcurrentHashMap – avoid a common misuse!

with 15 comments

If you program systems with Java, you have probably long been using ConcurrentHashMap. This post explores a caveat.

ConcurrentHashMap is often introduced to simplify code and application logic. For example:

HashMap<String, MyClass> m = new HashMap<String, MyClass>();
synchronized (m) {
    for (Map.Entry<String, MyClass> e : m.entrySet()) {
        // work with e while holding the lock on m
    }
}

might be replaced with the following so long as a consistent snapshot is not required:

ConcurrentHashMap<String, MyClass> m = new ConcurrentHashMap<String, MyClass>();
for (Map.Entry<String, MyClass> e : m.entrySet()) {
    // work with e; no lock is needed, but the view is not a consistent snapshot
}

More often though, ConcurrentHashMap is introduced to improve performance: internally ConcurrentHashMap allows concurrent threads to read values without locking at all, except in a minority of cases, and for concurrent writers to add and update values while only acquiring locks over localised segments of the internal data structure. Read operations are therefore mostly synchronization free and have greatly improved performance, and if there are many concurrent writers then lock contention will be reduced improving performance even further.

The rub though is that the benefits on offer mean that ConcurrentHashMap is now often used willy-nilly! As it happens, that is often not such a bad idea, but there is unfortunately a big caveat that 99% have missed: the designers of ConcurrentHashMap originally conceived the class for occasional use in high concurrency areas at the center of systems, and the default construction options reflect this!!

Specifically, the fully parameterized constructor of ConcurrentHashMap has 3 parameters, initialCapacity, loadFactor and concurrencyLevel, and this last one can cause problems.

Recall that ConcurrentHashMap shards its data into segments to reduce writer lock contention. Well, in actual fact the concurrencyLevel parameter directly specifies the number of shards that are created by the class internally. The idea is that the number of shards should equal the number of concurrent writer threads that you normally see. And, the default value for concurrencyLevel is 16!

So, if like most people you simply use the parameterless constructor, and accept the default configuration, your map will be instantiating the objects needed for 16 shards before you even add your first value…

Before proceeding to see how pernicious this can be, consider how unnecessary this is for most cases. The likelihood that 16 threads will actually be simultaneously spinning on your map trying to write to its data structures implies a huge level of concurrency given the fact that readers don’t even block writers. In most cases, even if several threads are accessing and writing the structure, a single shard will be completely sufficient to get most of the benefits.

But you say, it’s only a few extra objects, why should I care…

Well, once you start using ConcurrentHashMap as a matter of course, the number of instances allocated can grow dramatically. Consider for instance that you are building a game server that maintains contextual data for each connected user. You might associate a ConcurrentHashMap with each connected user, allowing multiple threads to query and update online users’ contextual information with a high degree of performance and safety.

In principle this works fine, but in practice you may find your game server has several thousand users connected, which drives the allocation of several thousand ConcurrentHashMap instances. Now, when you do a heap dump of your server and have a poke around, you notice there are millions and zillions of ConcurrentHashMap$Segment, ConcurrentHashMap$HashEntry[] and ReentrantLock$NonfairSync objects. This is because, in fact, the creation of a single ConcurrentHashMap instance for each of say 5000 users minimally results in 240,000 of these objects being allocated even before any values are added to the maps.
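The 240,000 figure can be checked with a few lines of arithmetic. The sketch below assumes the segment-based implementation discussed in this post, where each segment brings three objects with it (the Segment, its HashEntry[] table and its lock's NonfairSync). The class and method names are hypothetical. Later JDKs (Java 8 onwards) reimplemented ConcurrentHashMap without segments, so this count applies to the older implementation only.

```java
// Worked arithmetic for empty-map overhead in the segment-based ConcurrentHashMap.
class SegmentOverhead {
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    // Per segment: one Segment, one HashEntry[] table, one ReentrantLock$NonfairSync.
    static final int OBJECTS_PER_SEGMENT = 3;

    // Objects allocated by a number of empty maps before any value is added.
    static long emptyMapObjects(int maps, int concurrencyLevel) {
        return (long) maps * concurrencyLevel * OBJECTS_PER_SEGMENT;
    }

    public static void main(String[] args) {
        // 5000 users, one default-constructed map each: 5000 * 16 * 3
        System.out.println(emptyMapObjects(5000, DEFAULT_CONCURRENCY_LEVEL)); // 240000
        // The same maps built with concurrencyLevel = 1: 5000 * 1 * 3
        System.out.println(emptyMapObjects(5000, 1)); // 15000
    }
}
```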

Depending upon how widespread your usage of ConcurrentHashMap is, how many maps are created per user and the manner in which third party authors have been using ConcurrentHashMap, usage of ConcurrentHashMap can end up adding serious memory and GC load to your system.


Because you often never really know how your classes might be re-used and therefore how many instances might be created in production, by default developers need to get in the habit of creating their ConcurrentHashMap instances with parameters something like this:

ConcurrentHashMap<String, MyClass> m =
new ConcurrentHashMap<String, MyClass>(8, 0.9f, 1);

In the above, only a single shard segment is created internally, which allocates an initial capacity for its HashEntry[] table of 8. This allows for some reasonable number of values to be added before reallocation, and the load factor of 0.9 ensures reasonably dense packing. The single shard offers full read benefits and, unless you have very high concurrency, sufficient write throughput without risking crazy unnecessary memory loading.

For completeness it is worth pointing out that there are indeed places where you might want to increase segment numbers, but such applications are fairly unusual and specialised. For example,  we have an EventRoutingManager class in our Starburst platform (an application layer platform that provides a horizontally scalable event publisher/consumer model) and it’s constructed something like as follows:

ConcurrentHashMap<String, EventPathContext> m =
new ConcurrentHashMap<String, EventPathContext>(524288, 0.75f, 32);

But to be clear, unfortunately for most usage the default ConcurrentHashMap parameters should be the exception, not the rule!

Written by dominicwilliams

December 12, 2011 at 2:18 pm

Posted in Uncategorized

Fight My Monster in TechCrunch

with one comment

Fight My Monster was in TechCrunch today, which deserves a big hooray and thank you!!! We’re working hard and hope we can fulfill expectations.

Thanks too to Moshi for setting an inspiring example. Please note we want to be as big as Moshi Monsters, not imitate them! The game design and vision are very different.

If you haven’t checked Fight My Monster out yet, I recommend you have a try – there really is nothing quite like it on the Web. We are incubating in the UK, and the best time to experience the site is 4pm-8pm BST weekdays (after school) or BST daytime weekends.

Fight My Monster wants to be a Moshi Monsters for boys, Dylan Collins joins as Angel

Written by dominicwilliams

June 1, 2011 at 8:18 pm

Posted in Uncategorized

PayPal recurring payments and bonkers docs

leave a comment »

If you’ve ever tried to integrate with PayPal recurring payments, you’ll know that the documentation only covers most of the API.

I started a thread to try and clarify the key issues, here

Written by dominicwilliams

April 17, 2011 at 3:06 pm

Posted in E-commerce


10 steps to upgrade a Cassandra node

with 2 comments

The following steps might prove interesting to those with a new Cassandra cluster who are wondering how to upgrade, or to those investigating Cassandra who are interested in what an online upgrade looks like. The good news for those in production, is that if your cluster has a replication factor (RF) of at least 3, read and write operations (with the exception of rarely used ConsistencyLevel.All operations) will be able to continue uninterrupted throughout the upgrade process i.e. you don’t need to put up a “down for maintenance” sign!!

This process should be performed on every node in the cluster in sequence to effect a rolling upgrade. Once you are comfortable with the process, and the number of nodes in your cluster starts to climb, you will want to script much of it.

1. Open an SSH session to the Cassandra node.
$ ssh

2. Visit the Cassandra homepage, which has a link to the latest tar.gz package download. Copy the URL of that link.

3. Change folder to /opt and then download the upgrade package.
$ cd /opt
$ wget…cassandra/0.7.4/apache-cassandra-0.7.4-bin.tar.gz

4. Unpack the package and enter its configuration folder
$ tar xvf apache-cassandra-0.7.4-bin.tar.gz
$ cd /opt/apache-cassandra-0.7.4/conf

5. Backup the default configuration (sometimes the format changes). Copy in the existing configuration.
$ mv cassandra.yaml cassandra.yaml.bak
$ cp /opt/cassandra/conf/cassandra.yaml .

6. Edit the environment configuration script. This often changes so don’t copy in the existing script. You need to configure the max heap size to be ~50% of the RAM on the server and the new generation heap size to be 100MB * no. of CPUs
$ vim cassandra-env.sh
…then, for example, for a server with 6GB RAM and 6 CPUs
MAX_HEAP_SIZE="3G"
HEAP_NEWSIZE="600M"

7. Create a link to jna.jar from Cassandra’s lib folder (JNA or “Java Native Access” enables Cassandra to directly access native libraries. Without this capability, certain operations such as snapshotting using nodetool are more likely to fail with OOM).
$ cd /opt/apache-cassandra-0.7.4/lib
$ ln -s /usr/share/java/jna.jar .

8. Shutdown the Cassandra software running on the current node. This will not interrupt the cluster in any way.
$ pkill -f 'java.*cassandra'

9. Change to the “opt” folder and change the soft link to point to the new Cassandra distribution
$ cd /opt
$ rm cassandra
$ ln -s /opt/apache-cassandra-0.7.4 cassandra

10. Restart Cassandra
$ cd /opt/cassandra/bin
$ ./cassandra

After restarting Cassandra, you should watch the startup messages to ensure the node comes up without difficulty.

Written by dominicwilliams

March 26, 2011 at 9:23 pm

New UK “prospective entrepreneur” Tier 1 visa. Much ado?

leave a comment »

The UK is trying hard to be more enterprise-friendly and is loosening bureaucracy and introducing legislation designed to spur growth. Some legislation coming into effect 6 April 2011 is designed specifically with the tech startup sector in mind, and has some similar aims to the Startup Visa Act 2011 proposed by John Kerry in the US.

John Kerry’s Startup Visa Act 2011 is some way off becoming law, and getting something like this out of the door before the Yanks has created much excitement. Like many London-based entrepreneurs I’ve been investigating the benefits.

The general impression a lot of articles give is that this will make it easier to bring in overseas talent (with the often cited benefit being the ability to bring in foreign talent for key engineering roles such as CTO). I too was really excited, but having dug deeper I don’t think the first impressions were right.

As I understand it, the key points about the legislation are:

  • A new “Prospective Entrepreneur” visa will become available, which in principle enables a foreign “entrepreneur” to enter the UK for six months to finalize a funding deal for their venture, so long as their visa application is supported by accredited prospective investors who agree to reach a final funding decision within that timeframe. At the end of that six months, the entrepreneur must meet the criteria to transfer to a standard Tier 1 Entrepreneur visa.
  • Whereas before an entrepreneur was required to invest at least £200,000 of their own money in a UK venture to receive a Tier 1 Entrepreneur visa, this is now possible if an FSA registered venture capitalist, an endorsed entrepreneurial seed funding competition, or a UK Government Department invests £50,000 in the UK venture on their behalf.
  • A single investment can enable an entrepreneurial team of 2, such as a CEO and CTO, to apply for a Tier 1 visa

For reference, the details are here:

The first thing that should be apparent is that this primarily appears to be aimed at foreign entrepreneurs wishing to come to the UK, rather than helping frustrated UK tech entrepreneurs who want to bring in talent from abroad. Secondly, since entrepreneurs from inside the EU can already come to the UK to set up businesses, and American entrepreneurs are unlikely to want to leave the West Coast, this is aimed at quite a narrow community e.g. Indian and Russian (inc. old Soviet states) entrepreneurs.

Where I believe many may have become excited, is in wording that suggests that the new rules can be bent to the needs of existing tech startups. Personally, I believe this looks tenuous – and I will explain why – but here is how it might work if you wanted to quickly bring a CTO to London:

  • Get an accredited investor to promise £50,000 funding contingent upon the CTO joining your company (thus the foreign talent you bring in will trigger the investment)
  • Provide the CTO with autonomous “access” to those funds as part of their R&D budget
  • Make sure they get the obligatory English language qualifications, and pay them a decent salary so they pass the maintenance test

Hopefully this should get them into the UK, but one should take careful note of the statement in the guide: “The Tier 1 (Entrepreneur) category is for those investing in the United Kingdom by setting up or taking over, and being actively involved in the running of one or more businesses in the United Kingdom”. I can easily imagine that the relevant agencies will contest that a foreign CTO, for example, joining a startup meets these criteria.

The ROI has to be compared to the existing conventional route of bringing someone in on a Tier 2 “skilled worker” visa. This is the route my company has just committed to. The requirements are as follows:-

  • You must pass the “resident labour market test” to prove you were not able to fill your vacancy here, which includes advertising the job for at least a month (unless it is categorized under the Shortage Occupations List, which is currently changing, but includes for example software engineers specializing in animation in the video games industry)
  • Your company must acquire a Tier 2 Sponsor License, which it can apply for if it adheres to various requirements and practices
  • Once you have a license, you must apply to the Border Agency for “certificates of sponsorship” for each of your prospective hires
  • Assuming you get your certificates, these must then be given to your prospective hires
  • Finally, your prospective hires may use the certificates to apply for Tier 2 visas (for which, of course, there are minimum requirements such as English language qualifications and salary)
  • Your prospective hires may or may not get their visas, depending upon the number of points they score and the governmental monthly limit then in force, which will be 4200 visas in April 2011, and only 1500 per month thereafter (points are awarded for things like salary and educational background).

The process takes a minimum of 2-3 months, and since you must compete under the points system for one of the 1500 visas going there is no guaranteed timeframe in which you will be able to bring your talent over despite the effort (in large part because in London plenty of talented foreigners are imported for financial jobs, and because those with high salaries are prioritized). However, at least you can bring in a team and the application itself should not be open to contest.

In summary, in practice both processes are still nightmares if you want to bring in talent from outside the EU. Neither can be completed quickly: in relation to the Prospective Entrepreneur visa, the specified classes of UK investors generally don’t move quickly, and that will likely hold even if they only need to agree to “sponsorship” and can back off investment later. For that reason in practice you are probably better off using Tier 2, since there is no ambiguity about what is possible, you will not become beholden to an outside investor (assuming you have one), and the timeframes are likely similar.

So what’s to be excited about? If you want to bring in star talent to your tech startup you are looking at a 2-3 month process. Time you wanted them to be doing their thing, and when they could be tempted by other jobs!

As for the UK getting one over on the US with its yet-to-be-passed Startup Visa Act, I’m really not so sure. The US act will only require the involvement of a “qualified” investor. This means that you will be able to deal with business angels who can move more quickly. In any case, US venture capitalists are less risk averse and move more quickly, and the West Coast on balance is a better environment too.

If we really want to get one over on the US, we should make it really easy for UK tech entrepreneurs to import talent. There is a massive shortage of truly experienced and talented engineers on the market in the UK as everywhere, and addressing this area would give a real boost.

As a simple suggestion, why not allow high tech startups to bring in foreign stars and then apply for their Tier 2 visas while they are here (subject of course to anti-abuse criteria)? Silicon Valley has been built on immigration, so let’s at least make it easier to bring talented people in. That’s a guaranteed way to boost the tech sector. Simple practical solutions like that wouldn’t require massive legislative adjustment but would be fantastic.

Written by dominicwilliams

March 20, 2011 at 7:21 pm, again

with 2 comments

If you followed my blog you might recall how we use the URL when we advertise Fight My Monster. The idea is that it is an easy to remember redirector for

Here’s an update, and some additional shameless plugging of the link. Many kids remember the url, but as with everything these days, most type “monster fun” into Google’s search box rather than into the browser address bar. This went swimmingly, as searching on “monster fun” returned a link to Fight My Monster. But – you guessed it – some days ago this stopped working for reasons known only to Google (ouch, very big ouch!).

Could this be due to a complaint by other sites also into virtual monsters? The reality is that the vast majority of people typing in “monster fun” want to get to our site because of our ads…

We’ve modified the index page HTML to include “Monster Fun” in the page title, in the description and in the meta keywords. Let’s see if that brings it back!

If anyone has any great White Hat ideas I’d appreciate hearing them. And Google, if you read this article, please reinstate the link and be nice to kids that want to find us 😉

Update: Good news. We are now back up to 2/3 in Google’s search results for “monster fun”.

Written by dominicwilliams

February 22, 2011 at 4:23 pm

Posted in Uncategorized

Cassandra: the importance of system clocks, avoiding OOM and how to escape (unnecessary) OOM meltdown!

with 2 comments

First of all, big thanks to jbellis and driftx (Jonathan Ellis and Brandon Williams of DataStax). Without them, I’d still be up trying to fix a problem node I’ll talk about later. They were no doubt horrified at an issue created through wanton misconfiguration, but they helped out amazingly anyway. For the benefit of other people as reckless as us, and who are equally unskilled in the black arts, here are some cautionary tales and a hard core tip for recovering from a serious OOM situation.

Before going any further, I’ll just give you some background to Fight My Monster, which went live late last year. We are a self-funded London startup. We have had enough funds to develop a great product, promote it using TV commercials, but everything else has to be done on a shoestring. Cassandra is the primary workhorse for processing data, and we ask a LOT of it.

The game itself is an MMO, social networking and minigames hybrid. Since launching it has grown quite fast, with most traffic being concentrated 3pm -> 10pm GMT weekdays and weekends (basically when kids in the UK are free, since we haven’t started with international markets yet). The site is characterized by a huge number of transactions, many of which require consistency. Furthermore, we write and rewrite huge amounts of data. We log instant chat messages for the security of kids, we copy kids’ room (aka “Wall”) posts to all their friends’ feeds, and we let kids grow, fight and trade unlimited numbers of monsters for nuggets, with every transaction being recorded, generating a storm of writes. And that’s just the start of it.

My only experience of highly loaded SQL systems is from running large user forums through another company. What I can tell you is that SQL systems just wouldn’t give Fight My Monster anywhere near comparable performance and reliability at the same cost. To some extent, Cassandra makes FMM possible. That said, there are a couple of pitfalls you really need to avoid…

Beware Virtualization: Your Clocks Need To Work!

Before discussing column family configuration and OOM, I will outline an equally serious problem that can be caused by virtualization. I decided to give up managing server racks a long time ago. Fight My Monster combines a CDN with a cloud service where you can actually provision *dedicated* machines on demand. I recommend anyone to do the same if you can, because it simplifies resource management.

However, fantastic as the cloud service we use is, in the early days aspects of our cloud computing infrastructure caused problems with our data. The problem was this: although we are able to provision dedicated machines, our cluster nodes still run Ubuntu on top of a VMware hypervisor that enables features like snapshotting. Unfortunately, what we did not realize is that system clock drift is a very common issue with virtualized systems, and this is very bad in Cassandra setups because timestamps affect the serialization of writes.

Unbeknownst to us, when our virtual servers were under load, their system clocks were drifting in a dramatic way. Ntpd would try to keep the clocks in sync with the Internet NTP servers through periodic tiny adjustments as normal, but as we found out, when drift is too rapid ntpd temporarily gives up. Total drift would thus continue growing until system load abated, at which point ntpd, upon deciding it could now keep everything in sync again, would suddenly bring the clock into line with the network time servers, sometimes setting the clock back even 20 minutes or so into the past!!! Bizarrely, this was also happening to 1000s of LAMP servers using the service, but nobody had noticed.

In our case, running a distributed database, we certainly noticed the effects. A lot of our processing is transactional. For example, each time a monster fight happens, the monsters involved change and have their histories updated, and the monster owners have their monster lists and nugget bank balances adjusted. Often transactions stack up against each other with users engaging in multiple fights and trades in a short space of time. Where necessary, we write with consistency level QUORUM and use the Cages + ZooKeeper distributed locking system to prevent lost updates and ensure everything is serialized properly.

But we were constantly suffering inexplicable data states as a consequence of broken serialization. We were convinced the problem lay inside Cages or Cassandra (an upside of the situation was that Cages got many extra hours of code review). Eventually though, I noticed a fight record stored in Cassandra where a monster went into a fight with attributes that it was to gain from a fight in the future. This led to the realization that the problem had something to do with clocks!

The key point: a Cassandra cluster serializes write operations using the timestamp you send with them (the timestamp is often created for you if you use a library like Pelops). In fact, timestamps enable Cassandra to use the clock as a serialization mechanism among its loosely connected nodes. If the clocks on your application servers jump about, so that the timestamps you send with write operations are wrong, you’re going to have problems.
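To see why a clock that jumps backwards is so damaging, consider this deliberately simplified sketch of timestamp-based conflict resolution for a single column. It is not Cassandra code: the class is hypothetical, and real Cassandra has additional rules (for example for equal timestamps) that are ignored here.

```java
// Hypothetical sketch of "highest timestamp wins" for one column value.
class TimestampedColumn {
    private String value;
    private long timestamp = Long.MIN_VALUE;

    // Apply a write only if its timestamp is newer than the stored one.
    synchronized boolean write(String newValue, long writeTimestamp) {
        if (writeTimestamp <= timestamp) {
            return false; // treated as an older write and discarded
        }
        value = newValue;
        timestamp = writeTimestamp;
        return true;
    }

    synchronized String read() {
        return value;
    }
}
```

If an application server writes "health=90" with timestamp 1000, then has its clock set back and writes "health=70" with timestamp 400, the second write is discarded even though it happened later in real time, and the column still reads "health=90".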

Moral: if you are using virtual servers for your Cassandra setup, pay very close attention to how the system clocks are working and configure ntpd carefully.

Each case will be different, but for those interested this is what we did:

  1. Added “noapictimer irqpoll” to end of kernel line in /boot/menu.lst
  2. Added following line to /etc/rc.local
    echo "jiffies" > /sys/devices/system/clocksource/clocksource0/current_clocksource
  3. Configured ntp better e.g. ntp.conf

# don’t give up so easily, even though this should never happen
tinker panic 0
# log your drift and stats and review them periodically
driftfile /var/lib/ntp/ntp.drift
statsdir /var/log/ntpstats/
statistics loopstats peerstats clockstats
filegen loopstats file loopstats type day enable
filegen peerstats file peerstats type day enable
filegen clockstats file clockstats type day enable
# use lots of NTP servers to get closest synchronization to actual time

Our drift is now max 1-2ms under load and Cages/ZooKeeper/Cassandra is now serializing millions of transactions just fine.

Avoiding OOM: Pay Close Attention to Column Family Configuration

When you create new column families in a keyspace, Cassandra automatically configures them with sensible defaults for you. These defaults control things like the size of associated caches, which are actually very important to operation. Because the defaults are sensible for new installations, or for projects with relatively small numbers of column families, it is easy to forget these parameters and concentrate purely on data design as you evolve. That’s what we did, so we had to learn the following the hard way!

Several Cassandra aficionados have expressed shock and horror at the number of column families we use in Fight My Monster: we now have more than 50. But this isn’t down to bad design. Really! Because Fight My Monster is a destination site, it provides wide-ranging functionality, and while some of the column families belong to legacy code and are being deleted or consolidated, the simple fact is that we need a lot of them. Unless you are a Web 2.0 site addressing a single very specific need, I would say the number of column families you use will increase inevitably. And anyway, you should still make sure you know about this…

So what’s the problem? The problem is that Cassandra is bootstrapped with a fixed maximum heap size. When it is running, there are therefore limits on how much data it can cache in memory at once, and if you misconfigure it sufficiently so that it reaches a situation where it needs more memory than is available, it will crash and dump core with an Out of Memory exception. Furthermore, if you have not made reasonable attempts at setting appropriate column family parameters, you can find yourself in a situation where a Cassandra node dies with OOM when you try to restart it, which is what happened to us.

First the basics: a Cassandra node sends new writes to a column family to its commit log, and then on to an in-memory data structure called a memtable where they are aggregated before being written to disk (memtables absorb overwrites and also act as read caches). The memtable associated with a column family has three key parameters which affect memory consumption. There are some others too, which you can see by examining a column family configuration.

These three key parameters are 1. memtable_operations_in_millions, 2. memtable_throughput_in_mb and 3. memtable_flush_after_mins. The first parameter controls the maximum number of writes that the memtable will hold in memory before flushing them to disk. So for example if this number is 0.75, it will hold 750,000 writes in memory before flushing them. The second parameter is the maximum total size of column values that can be written before the memtable is flushed from memory. However, it only counts the size of the column values, not their names, which can also be significant. The third parameter is the maximum time in minutes a memtable should reside in memory before it is flushed.
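As a rough sketch of how the three thresholds interact, assuming a flush is triggered as soon as any one of them is reached, the logic looks something like the following. The class and method are hypothetical and only model the description above; they are not taken from the Cassandra source.

```java
/** Hypothetical model of the three memtable flush thresholds described above. */
final class MemtableThresholds {
    final double operationsInMillions; // memtable_operations_in_millions
    final int throughputInMb;          // memtable_throughput_in_mb
    final int flushAfterMins;          // memtable_flush_after_mins

    MemtableThresholds(double operationsInMillions, int throughputInMb, int flushAfterMins) {
        this.operationsInMillions = operationsInMillions;
        this.throughputInMb = throughputInMb;
        this.flushAfterMins = flushAfterMins;
    }

    /**
     * Returns true once any one of the three limits has been reached.
     * valueBytes counts column values only, so memory used by column names
     * and per-write object overhead is invisible to this check.
     */
    boolean shouldFlush(long writes, long valueBytes, long ageMins) {
        return writes >= Math.round(operationsInMillions * 1_000_000)
            || valueBytes >= throughputInMb * 1024L * 1024L
            || ageMins >= flushAfterMins;
    }

    public static void main(String[] args) {
        MemtableThresholds legacy = new MemtableThresholds(0.75, 160, 60);
        System.out.println(legacy.shouldFlush(749_999, 1_000, 5)); // false: no limit reached
        System.out.println(legacy.shouldFlush(750_000, 1_000, 5)); // true: operation count reached
        System.out.println(legacy.shouldFlush(10, 10, 60));        // true: age limit reached
    }
}
```

Note how a column family of tiny values can sit just under 750,000 writes while reporting almost no throughput, even though each of those writes still costs heap.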

Configuring these parameters requires care and thought, and you should use nodetool to examine statistics for your column families to work out the best values. Each column family must be treated on a case by case basis. For example, if a column family stores lots of small columns, then the total calculated size of the values will be small in memory. But simple Java objects representing operations/column writes in memory have an overhead, and if you set memtable_operations_in_millions too high the actual maximum memory consumed by the memtable will be able to greatly exceed the memtable_throughput_in_mb value. It must be possible for Cassandra to allocate the maximum memory that all of your memtables might consume from its heap, while still leaving memory available for other functions. A suggested calculation has been:

heap size = memtable_throughput_in_mb * 3 * number of hot CFs + 1G + internal caches
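Plugging numbers into that formula is a useful sanity check. The sketch below is just the arithmetic, assuming every hot column family uses the same memtable_throughput_in_mb; the figure of 10 hot column families and the zero allowance for internal caches are illustrative values, not measurements from our cluster.

```java
/** Back-of-envelope heap estimate using the suggested formula above. */
final class HeapEstimate {

    /**
     * heap = memtable_throughput_in_mb * 3 * number of hot CFs + 1G + internal caches,
     * with everything expressed in megabytes.
     */
    static long requiredHeapMb(int memtableThroughputMb, int hotColumnFamilies, long internalCachesMb) {
        return (long) memtableThroughputMb * 3 * hotColumnFamilies + 1024 + internalCachesMb;
    }

    public static void main(String[] args) {
        // Legacy default of 160 MB throughput across an assumed 10 hot column families.
        System.out.println(requiredHeapMb(160, 10, 0) + " MB"); // prints "5824 MB"
        // The same column families with throughput reduced to 16 MB each.
        System.out.println(requiredHeapMb(16, 10, 0) + " MB");  // prints "1504 MB"
    }
}
```

Even with only 10 hot column families, the legacy defaults ask for nearly 6 GB of heap before any caches are counted.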

Although in a perfect world database systems would be clever enough to configure these thresholds themselves, what you cannot do is leave a system with a large number of column families with the defaults.

In our case, we had legacy defaults which meant memtable_operations_in_millions=0.75, memtable_throughput_in_mb=160 and memtable_flush_after_mins=60 across all our column families. Since many of our hot column families store small pieces of data, the total memory overhead of many memtables would have been 400MB or more. Memtables fill up when you are under load, exactly when you don’t want OOM to raise its head!!! Under load, our memtables wanted more memory than the heap could provide.

I found a special way of triggering the problem. I forgot to restart a node after a rolling upgrade. By the time I realized it was down, there was a storm of hinted writes waiting for it from the other nodes (writes that the other nodes saved on its behalf for later application because it was not available to receive them at the time, a bit like your neighbor holding a package from DHL for you that arrived when you were out). When the node came up, these writes stormed into the commit logs, and then into the memtables, which quickly filled up. Total memory was quickly exhausted before they could be flushed, leading the node to crash with OOM!

The Black Art of Recovering from OOM Meltdown

If you have not done so, you need to check your column family configurations now, because the situation can be worse than a node simply dying with an OOM exception. You can find, as we did with our badly misconfigured system, that a node cannot be restarted! Very possibly, many of the writes that caused your node to crash will remain in your commit logs. When you bring the node back up, Cassandra will try to replay these logs, filling up the memtables again and, if you’re unlucky, causing another OOM core dump!

If anyone reaches this situation, I hope for their sake they find this post!!! Caveat emptor: This is not advice, it merely reflects what we did to solve our problems. I am not in any way responsible for the consequences if this doesn’t work 😉 That said, this is what you do…

Firstly you need to bring your service *down* (yes, your *site* or *game* needs to be turned off). In our case we use RF 3 so only the performance of our service had suffered up until this point. But when you start messing with the commit logs, you need to make sure no new writes are occurring…

Once no new writes can take place, proceed as follows. Firstly increase the heap allocated to Cassandra to the maximum value your server will support (i.e. using the -Xmx JVM parameter). Try restarting Cassandra again. If you are successful, it should replay all the logs successfully and flush the memtables to disk. In this case, your original commit logs will be deleted and new ones created. You can now stop Cassandra, reset the heap size, and restart. You can then reconfigure your column family parameters, and hopefully be in the clear.

If on the other hand this does not work you need to do the following:

  1. Move your commit logs to a backup directory e.g. mv /var/opt/cassandra/commitlogs/* /var/opt/cassandra/logsbackup
  2. Connect to a working Cassandra node using cassandra-cli and reconfigure your column family parameters appropriately. Make sure the heap on your down node can supply your maximum memtable needs.
  3. Restart the Cassandra node without its commit logs. It will now receive the schema updates from the other nodes. Because its memtables will be empty, it won’t crash and will be able to reconfigure its own copy of the schema.
  4. Shut down the Cassandra node, copy the commit logs back and restart it again. Unfortunately, if you have a sufficiently large number of column families, you have not been conservative enough and your commit log backlog is large enough, you will bomb out with OOM again!
  5. Copy out all your commit logs. Then copy back a single commit log and restart. Once this log is successfully replayed and deleted, you should then shut down the node and repeat the process with the next commit log (for the purposes of speed, Cassandra processes commit logs in parallel so minimum load is created when you process your commit logs one by one).
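The final step is tedious by hand, so it is tempting to script it. Here is a minimal sketch of the loop, under some loud assumptions: NodeCycle is a hypothetical hook that you would have to implement yourself (start the node, wait until the log has been replayed and deleted, shut the node down again), the logs are fed back in file name order purely for determinism, and the directories in main are throwaway temporary ones with a simulated node rather than a real Cassandra install.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

/** Sketch of feeding backed-up commit logs to a node one at a time. */
final class CommitLogReplayer {

    /** Hypothetical hook: start the node, wait for replay to finish, then shut it down. */
    interface NodeCycle {
        void restartAndWaitForReplay() throws IOException;
    }

    /** Lists the regular files in a directory, sorted by name. */
    static List<Path> sortedFiles(Path dir) throws IOException {
        List<Path> files = new ArrayList<>();
        try (Stream<Path> entries = Files.list(dir)) {
            entries.filter(Files::isRegularFile).forEach(files::add);
        }
        Collections.sort(files);
        return files;
    }

    /** Moves each backed-up log into the live directory, cycling the node after each one. */
    static int replayOneByOne(Path backupDir, Path commitLogDir, NodeCycle node) throws IOException {
        List<Path> logs = sortedFiles(backupDir);
        for (Path log : logs) {
            Files.move(log, commitLogDir.resolve(log.getFileName()));
            node.restartAndWaitForReplay();
        }
        return logs.size();
    }

    public static void main(String[] args) throws IOException {
        Path backup = Files.createTempDirectory("logsbackup");
        Path live = Files.createTempDirectory("commitlogs");
        Files.createFile(backup.resolve("CommitLog-1.log"));
        Files.createFile(backup.resolve("CommitLog-2.log"));
        // Simulated node: "replaying" a log simply deletes it from the live directory.
        int done = replayOneByOne(backup, live, () -> {
            for (Path replayed : sortedFiles(live)) {
                Files.delete(replayed);
            }
        });
        System.out.println("replayed " + done + " commit logs"); // prints "replayed 2 commit logs"
    }
}
```

The important property is that the live commit log directory never holds more than one backed-up log at a time, so the memtables only ever have to absorb one log's worth of writes per restart.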

Hopefully you will now be feeling better, you will have fixed your broken node and you can restart your service. You will now be a wiser person and never forget to configure your column families properly 🙂

Written by dominicwilliams

February 8, 2011 at 8:13 pm

Posted in Uncategorized, our redirector

with 2 comments

Fight My Monster is a fantastic brand name, but is quite a handful for kids to type as a domain name. What we don’t want is kids googling “Fight My monster” and then clicking on our own search ads (that could get expensive!!). The best solution, especially for use with TV commercials, is to get yourself a redirection domain, as we have!

Next step, start blogging about the domain so Google picks it up!.. if anyone wants to talk about our domain strategy, please feel free?? And link to this post 🙂

We’ve already registered all of the likely misspellings. Any other ideas?

Written by dominicwilliams

August 3, 2010 at 11:12 am

Posted in Fight My Monster


Cassandra: Up and running quickly in Java using Pelops

with 52 comments


In Greek mythology Cassandra is captured by the triumphant king Agamemnon after the fall of Troy, with whom she has two sons, Pelops and Teledamus. This Java client library is Pelops’s namesake, nicknamed “Cassandra’s beautiful son” because it offers a beautiful way to code against the Cassandra database. This is a quick introduction to the library.

You can find the open source code here


Pelops was born to improve the quality of Cassandra code across a complex commercial project that makes extensive use of the database. The main objectives of the library are:

  • To faithfully expose Cassandra’s API in a manner that is immediately understandable to anyone:
    simple, but beautiful
  • To completely separate low-level concerns such as connection pooling from data processing code
  • To eliminate “dressing code”, so that the semantics of data processing stand clear and obvious
  • To accelerate development through intellisense, function overloading and powerful high-level methods
  • To implement strategies like load balancing based upon the per node running operation count
  • To include robust error handling and recovery that does not mask application-level logic problems
  • To track the latest Cassandra releases and features without causing breaking changes
  • To define a long-lasting paradigm for those writing client code

Up and running in 5 minutes

To start working with Pelops and Cassandra, you need to know three things:

  1. How to create a connection pool, typically once at startup
  2. How to write data using the Mutator class
  3. How to read data using the Selector class.

It’s that easy!

Creating a connection pool

To work with a Cassandra cluster, you need to start off by defining a connection pool. This is typically done once in the startup code of your application. Sometimes you will define more than one connection pool. For example, in our project, we use two Cassandra database clusters, one which uses random partitioning for data storage, and one which uses order preserving partitioning for indexes. You can create as many connection pools as you need.

To create a pool, you need to specify a name, a list of known contact nodes (the library can automatically detect further nodes in the cluster, but see notes at the end), the network port that the nodes are listening on, and a policy which controls things like the number of connections in your pool.

Here a pool is created with default policies:

    new String[] { "", "", ""},
    new Policy());

Using a Mutator

The Mutator class is used to make mutations to a keyspace (which in SQL speak translates as making changes to a database). You ask Pelops for a new mutator, and then specify the mutations you wish to make. These are sent to Cassandra in a single batch when you call its execute method.

To create a mutator, you must specify the name of the connection pool you will use and the name of the keyspace you wish to mutate. Note that the pool determines what database cluster you are talking to.

Mutator mutator = Pelops.createMutator("Main", "SupportTickets");

Once you have the mutator, you start specifying changes.

/**
 * Write multiple sub-column values to a super column...
 * @param rowKey                    The key of the row to modify
 * @param colFamily                 The name of the super column family to operate on
 * @param colName                   The name of the super column
 * @param subColumns                A list of the sub-columns to write
 */
mutator.writeSubColumns(
    UuidHelper.newTimeUuidBytes(), // using a UUID value that sorts by time
        mutator.newColumn("category", "videoPhone"),
        mutator.newColumn("reportType", "POOR_PICTURE"),
        mutator.newColumn("createdDate", NumberHelper.toBytes(System.currentTimeMillis())),
        mutator.newColumn("capture", jpegBytes),
        mutator.newColumn("comment") ));

/**
 * Delete a list of columns or super columns...
 * @param rowKey                    The key of the row to modify
 * @param colFamily                 The name of the column family to operate on
 * @param colNames                  The column and/or super column names to delete
 */

After specifying the changes, you send them to Cassandra in a single batch by calling execute. This takes the Cassandra consistency level as a parameter.


Note that if you need to know a particular mutation operation has completed successfully before initiating some subsequent operation, then you should not batch your mutations together. Since you cannot re-use a mutator after it has been executed, you should create two or more mutators, and execute them with at least a QUORUM consistency level.

Browse the Mutator class to see the methods and overloads that are available

Using a Selector

The Selector class is used to read data from a keyspace. You ask Pelops for a new selector, and then read data by calling its methods.

Selector selector = Pelops.createSelector("Main", "SupportTickets");

Once you have a selector instance, you can start reading data using its many overloads.

/**
 * Retrieve a super column from a row...
 * @param rowKey                        The key of the row
 * @param columnFamily                  The name of the column family containing the super column
 * @param superColName                  The name of the super column to retrieve
 * @param cLevel                        The Cassandra consistency level with which to perform the operation
 * @return                              The requested SuperColumn
 */
SuperColumn ticket = selector.getSuperColumnFromRow(

assert ticketId.equals(

// enumerate sub-columns
for (Column data : ticket.columns) {
    byte[] name = data.name;   // Thrift column names are raw bytes
    byte[] value = data.value;
}

/**
 * Retrieve super columns from a row
 * @param rowKey                        The key of the row
 * @param columnFamily                  The name of the column family containing the super columns
 * @param colPredicate                  The super column selector predicate
 * @param cLevel                        The Cassandra consistency level with which to perform the operation
 * @return                              A list of matching columns
 */
List<SuperColumn> allTickets = selector.getSuperColumnsFromRow(
    Selector.newColumnsPredicateAll(true, 10000),

/**
 * Retrieve super columns from a set of rows.
 * @param rowKeys                        The keys of the rows
 * @param columnFamily                   The name of the column family containing the super columns
 * @param colPredicate                   The super column selector predicate
 * @param cLevel                         The Cassandra consistency level with which to perform the operation
 * @return                               A map from row keys to the matching lists of super columns
 */
Map<String, List<SuperColumn>> allTicketsForFriends = selector.getSuperColumnsFromRows(
    Arrays.asList("matt", "james", "dom"), // the friends
    Selector.newColumnsPredicateAll(true, 10000),

/**
 * Retrieve a page of super columns composed from a segment of the sequence of super columns in a row.
 * @param rowKey                        The key of the row
 * @param columnFamily                  The name of the column family containing the super columns
 * @param startBeyondName               The sequence of super columns must begin with the smallest super column name greater than this value. Pass null to start at the beginning of the sequence.
 * @param orderType                     The scheme used to determine how the column names are ordered
 * @param reversed                      Whether the scan should proceed in descending super column name order
 * @param count                         The maximum number of super columns that can be retrieved by the scan
 * @param cLevel                        The Cassandra consistency level with which to perform the operation
 * @return                              A page of super columns
 */
List<SuperColumn> pageTickets = selector.getPageOfSuperColumnsFromRow(
    lastIdOfPrevPage, // null for first page
    Selector.OrderType.TimeUUIDType, // ordering defined in this super column family
    true, // blog order
    10, // count shown per page

There are a huge number of selector methods and overloads which expose the full power of Cassandra, and others like the paginator methods that make otherwise complex tasks simple. Browse the Selector class to see what is available here

Other stuff

All the main things you need to start using Pelops have been covered, and with your current knowledge you can easily feel your way around Pelops inside your IDE using intellisense. Some final points it will be useful to keep in mind if you want to work with Pelops:

  • If you need to perform deletions at the row key level, use an instance of the KeyDeletor class (call Pelops.createKeyDeletor).
  • If you need metrics from a Cassandra cluster, use an instance of the Metrics class (call Pelops.createMetrics).
  • To work with Time UUIDs, which are globally unique identifiers that can be sorted by time – which you will find to be very useful throughout your Cassandra code – use the UuidHelper class.
  • To work with numbers stored as binary values, use the NumberHelper class.
  • To work with strings stored as binary values, use the StringHelper class.
  • Methods in the Pelops library that cause interaction with Cassandra throw the standard
    Cassandra exceptions defined here.

The Pelops design secret

One of the key design decisions that, at the time of writing, distinguishes Pelops is that the data processing code written by developers does not involve connection pooling or management. Instead, classes like Mutator and Selector borrow connections to Cassandra from a Pelops pool for just the periods that they need to read and write to the underlying Thrift API. This has two advantages.

Firstly, obviously, code becomes cleaner and developers are freed from connection management concerns. But also more subtly this enables the Pelops library to completely manage connection pooling itself, and for example keep track of how many outstanding operations are currently running against each cluster node.

This, for example, enables Pelops to perform more effective client load balancing by ensuring that new operations are performed against the node to which it currently has the fewest outstanding operations running. Because of this architectural choice, it will even be possible to offer strategies in the future where, for example, nodes are actually queried to determine their load.
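The idea is simple enough to sketch in a few lines. What follows is not the Pelops source: the class, its methods and the node names are hypothetical, and it only illustrates the general "least outstanding operations" strategy, breaking ties by the order in which nodes were supplied.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of client-side balancing by fewest outstanding operations per node. */
final class LeastLoadedBalancer {
    private final Map<String, Integer> outstanding = new LinkedHashMap<>();

    LeastLoadedBalancer(String... nodes) {
        if (nodes.length == 0) {
            throw new IllegalArgumentException("at least one node is required");
        }
        for (String node : nodes) {
            outstanding.put(node, 0);
        }
    }

    /** Picks the node with the fewest running operations and counts the new one against it. */
    synchronized String acquire() {
        String best = null;
        int bestCount = Integer.MAX_VALUE;
        for (Map.Entry<String, Integer> entry : outstanding.entrySet()) {
            if (entry.getValue() < bestCount) {
                best = entry.getKey();
                bestCount = entry.getValue();
            }
        }
        outstanding.put(best, bestCount + 1);
        return best;
    }

    /** Call when the operation has finished, whether it succeeded or failed. */
    synchronized void release(String node) {
        outstanding.put(node, outstanding.get(node) - 1);
    }

    public static void main(String[] args) {
        LeastLoadedBalancer balancer = new LeastLoadedBalancer("node1", "node2", "node3");
        System.out.println(balancer.acquire()); // prints "node1"
        System.out.println(balancer.acquire()); // prints "node2"
        System.out.println(balancer.acquire()); // prints "node3"
        balancer.release("node2");              // node2 finishes its operation first
        System.out.println(balancer.acquire()); // prints "node2"
    }
}
```

This only works because the library, rather than application code, sees every operation start and finish. If developers held connections themselves, the pool would have no reliable way to know how busy each node was.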

To see how the library abstracts connection pooling away from the semantics of data processing, take a look at the execute method of Mutator and the tryOperation method of Operand. This is the foundation upon which Pelops greatly improves over existing libraries that have modelled connection management on pre-existing SQL database client libraries.


That’s all. I hope you get the same benefits from Pelops that we did.

Written by dominicwilliams

June 11, 2010 at 12:31 pm