Chapter 5. The Cassandra Architecture

In this chapter, we examine several aspects of Cassandra’s internal design in order to understand how it does its job. We consider the peer-to-peer design and its corresponding gossip protocol, as well as what Cassandra does on read and write requests, and examine how these choices affect architectural considerations such as scalability, durability, availability, manageability, and more. We also discuss Cassandra’s adoption of a Staged Event-Driven Architecture, which acts as the platform for request delegation.

The Cassandra architecture is very sophisticated and relies on the use of several different theoretical constructs. It is hard to discuss any one new term without referencing other terms we probably also haven’t met yet. This can be frustrating, which is why I’ve included the Glossary in the back of the book for you to refer to.

System Keyspace

Cassandra has an internal keyspace called system that it uses to store metadata about the cluster to aid in operations. In Microsoft SQL Server, two meta-databases are maintained: master and tempdb. The master is used to keep information about disk space, usage, system settings, and general server installation notes; the tempdb is used as a workspace to store intermediate results and perform general tasks. The Oracle database always has a tablespace called SYSTEM, used for similar purposes. The Cassandra system keyspace is used much like these.

Specifically, the system keyspace stores metadata for the local node, as well as hinted handoff information. The metadata includes:

  • The node’s token

  • The cluster name

  • Keyspace and schema definitions to support dynamic loading

  • Migration data

  • Whether or not the node is bootstrapped

The schema definitions are stored in two column families: the Schema column family holds user keyspace and schema definitions, and the Migrations column family records the changes made to a keyspace.

You cannot modify the system keyspace.

Peer-to-Peer

In traditional databases that can be deployed on multiple machines (such as MySQL), and even in newer models such as Google’s Bigtable, some nodes are designated masters and some slaves. They have different roles in the overall cluster: the master acts as the authoritative source for data, and slaves synchronize their data from the master. Any changes written to the master are passed on to slaves. This model is optimized for reading data, as it allows data to be read from any slave. But the replication is one-way, from master to slave. This has an important ramification: all writes must be sent to the master, which means that it is a potential single point of failure. In a master/slave setup, losing the master node can have far-reaching effects.

By contrast, Cassandra has a peer-to-peer distribution model, such that any given node is structurally identical to any other node; that is, there is no “master” node that acts differently than a “slave” node. The aim of Cassandra’s design is overall system availability and ease of scaling. The peer-to-peer design can improve general database availability, because while taking any given Cassandra node offline may reduce overall throughput, it is a graceful degradation that does not interrupt service. Assuming that you are using a reasonable replication strategy, the data on a failed node will still be available for reads and writes.

This design also makes it easier to scale Cassandra by adding new nodes. Because the behavior of each node is identical, in order to add a new server, you simply need to add it to the cluster. The new node will not immediately accept requests so that it has time to learn the topology of the ring and accept data that it may also be responsible for. After it does this, it can join the ring as a full member and begin accepting requests. This is largely automatic and requires minimal configuration. For this reason, the P2P design makes both scaling up and scaling down an easier task than in master/slave replication.

Gossip and Failure Detection

To support decentralization and partition tolerance, Cassandra uses a gossip protocol for intra-ring communication so that each node can have state information about other nodes. The gossiper runs every second on a timer. Hinted handoff is triggered by gossip, when a node notices that a node it has hints for has just come back online. Anti-entropy, on the other hand, is a manual process; it is not triggered by gossip.

Gossip protocols (sometimes called “epidemic protocols”) generally assume a faulty network, are commonly employed in very large, decentralized network systems, and are often used as an automatic mechanism for replication in distributed databases. They take their name from the concept of human gossip, a form of communication in which peers can choose with whom they want to exchange information.

Note

The term “gossip protocol” was originally coined in 1987 by Alan Demers, a researcher at Xerox’s Palo Alto Research Center, who was studying ways to route information through unreliable networks.

The gossip protocol in Cassandra is primarily implemented by the org.apache.cassandra.gms.Gossiper class, which is responsible for managing gossip for the local node. When a server node is started, it registers itself with the gossiper to receive endpoint state information.

Because Cassandra gossip is used for failure detection, the Gossiper class maintains a list of nodes that are alive and dead.

Here is how the gossiper works:

  1. Periodically (according to the settings in its TimerTask), the gossiper will choose a random node in the ring and initiate a gossip session with it. Each round of gossip requires three messages.

  2. The gossip initiator sends its chosen friend a GossipDigestSynMessage.

  3. When the friend receives this message, it returns a GossipDigestAckMessage.

  4. When the initiator receives the ack message from the friend, it sends the friend a GossipDigestAck2Message to complete the round of gossip.
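To make the three-message exchange concrete, here is a minimal sketch in Java. It is not Cassandra’s Gossiper: the GossipNode class and its methods are hypothetical, and each node’s state is reduced to a map from endpoint name to the highest heartbeat version it has seen (real digests also carry a generation number and application state). The Syn carries the initiator’s digests, the Ack returns whatever the friend knows that is newer plus a request for what it is missing, and the Ack2 answers that request.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified gossip participant (not Cassandra's Gossiper).
// State: endpoint name -> highest heartbeat version seen for that endpoint.
class GossipNode {
    final String name;
    final Map<String, Integer> versions = new HashMap<>();

    GossipNode(String name) {
        this.name = name;
        versions.put(name, 0);
    }

    // Called on the gossip timer: bump this node's own heartbeat version.
    void beat() {
        versions.merge(name, 1, Integer::sum);
    }

    // Contents of the reply to a Syn (message 2).
    static final class Ack {
        final Map<String, Integer> newerState = new HashMap<>(); // friend is ahead
        final Set<String> requested = new HashSet<>();           // initiator is ahead
    }

    // Friend side: compare the initiator's digests with local state.
    Ack onSyn(Map<String, Integer> digests) {
        Ack ack = new Ack();
        versions.forEach((ep, v) -> {
            if (v > digests.getOrDefault(ep, -1)) ack.newerState.put(ep, v);
        });
        digests.forEach((ep, v) -> {
            if (v > versions.getOrDefault(ep, -1)) ack.requested.add(ep);
        });
        return ack;
    }

    // Merge received state, keeping the higher version for each endpoint.
    void apply(Map<String, Integer> state) {
        state.forEach((ep, v) -> versions.merge(ep, v, Integer::max));
    }

    // Initiator side: one full round of Syn, Ack, Ack2 with a chosen friend.
    void gossipWith(GossipNode friend) {
        Ack ack = friend.onSyn(new HashMap<>(versions)); // Syn out, Ack back
        apply(ack.newerState);
        Map<String, Integer> ack2 = new HashMap<>();
        for (String ep : ack.requested) ack2.put(ep, versions.get(ep));
        friend.apply(ack2);                              // Ack2 completes the round
    }
}
```

After one round, both nodes hold the same view of each other’s heartbeats. Because every node repeats this with a random peer on each timer tick, a state change spreads epidemically through the ring.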

When the gossiper determines that another endpoint is dead, it “convicts” that endpoint by marking it as dead in its local list and logging that fact.

Cassandra has robust support for failure detection, based on a popular algorithm for distributed computing called Phi Accrual Failure Detection. This manner of failure detection originated at the Japan Advanced Institute of Science and Technology in 2004.

Accrual failure detection is based on two primary ideas. The first is that failure detection should be flexible, which is achieved by decoupling it from the application being monitored. The second and more novel idea challenges traditional failure detectors, which are implemented by simple “heartbeats” and decide whether a node is dead or alive based on whether a heartbeat is received. Accrual failure detection regards this binary approach as naive and instead finds a place between the extremes of dead and alive: a suspicion level.

Therefore, the failure monitoring system outputs a continuous level of “suspicion” regarding how confident it is that a node has failed. This is desirable because it can take into account fluctuations in the network environment. For example, just because one connection gets caught up doesn’t necessarily mean that the whole node is dead. So suspicion offers a more fluid and proactive indication of the weaker or stronger possibility of failure based on interpretation (the sampling of heartbeats), as opposed to a simple binary assessment.
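The Hayashibara paper defines the suspicion level, phi, in terms of how long it has been since the last heartbeat. Let T_last be the arrival time of the most recent heartbeat, t_now the current time, and P_later(t) the probability, estimated from recently sampled inter-arrival times, that the next heartbeat arrives more than t after the previous one. The paper models inter-arrival times with a normal distribution; the second line below instead assumes, as a simplification, an exponential distribution with mean μ, under which phi becomes a linear function of the delay:

```latex
\varphi(t_{\text{now}}) = -\log_{10} P_{\text{later}}\!\left(t_{\text{now}} - T_{\text{last}}\right)

P_{\text{later}}(t) = e^{-t/\mu}
\quad\Longrightarrow\quad
\varphi(t_{\text{now}}) = \frac{t_{\text{now}} - T_{\text{last}}}{\mu \ln 10}
```

So phi = 1 means there was about a 10% chance of a healthy node being this late, phi = 2 about 1%, and so on. For example, with μ = 1 second and 3 seconds of silence, phi = 3 / ln 10 ≈ 1.30.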

Note

You can read the original Phi Accrual Failure Detection paper by Naohiro Hayashibara et al. at http://ddg.jaist.ac.jp/pub/HDY+04.pdf.

Failure detection is implemented in Cassandra by the org.apache.cassandra.gms.FailureDetector class, which implements the org.apache.cassandra.gms.IFailureDetector interface. Together, they allow the following operations:

isAlive(InetAddress)

What the detector will report about a given node’s alive-ness.

interpret(InetAddress)

Used by the gossiper to help it decide whether a node is alive, based on the suspicion level obtained by calculating Phi (as described in the Hayashibara paper).

report(InetAddress)

When a node receives a heartbeat, it invokes this method.
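Here is a minimal phi accrual detector whose three methods loosely mirror the operations above. It is a sketch under stated assumptions, not the FailureDetector class itself: nodes are identified by name strings rather than InetAddress, inter-arrival times are assumed to be exponentially distributed, and the class name, window size, and conviction threshold are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical phi accrual failure detector (not Cassandra's FailureDetector).
// Assumes exponentially distributed heartbeat inter-arrival times.
// All times are in milliseconds.
class PhiAccrualDetector {
    private static final int WINDOW_SIZE = 1000;   // samples kept per node (assumed)
    private final double phiThreshold;             // convict at or above this (assumed)
    private final Map<String, Deque<Long>> intervals = new HashMap<>();
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    PhiAccrualDetector(double phiThreshold) {
        this.phiThreshold = phiThreshold;
    }

    // Like report(): called when a heartbeat from `node` arrives.
    void report(String node, long nowMillis) {
        Long last = lastHeartbeat.put(node, nowMillis);
        if (last == null) return; // first heartbeat: no interval yet
        Deque<Long> window = intervals.computeIfAbsent(node, k -> new ArrayDeque<>());
        window.addLast(nowMillis - last);
        if (window.size() > WINDOW_SIZE) window.removeFirst();
    }

    // Like interpret(): the suspicion level, phi, for `node` at time nowMillis.
    double phi(String node, long nowMillis) {
        Deque<Long> window = intervals.get(node);
        if (window == null || window.isEmpty()) return 0.0; // no data yet
        double mean = window.stream().mapToLong(Long::longValue).average().orElse(0.0);
        if (mean <= 0.0) return 0.0;
        long sinceLast = nowMillis - lastHeartbeat.get(node);
        // P_later(t) = exp(-t / mean), so phi = -log10(P_later) = t / (mean * ln 10)
        return sinceLast / (mean * Math.log(10.0));
    }

    // Like isAlive(): alive while phi stays below the threshold.
    boolean isAlive(String node, long nowMillis) {
        return phi(node, nowMillis) < phiThreshold;
    }
}
```

Because phi grows continuously with silence, the caller decides how cautious to be: a higher threshold means fewer false convictions but slower detection of real failures.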

Anti-Entropy and Read Repair

Where you find gossip protocols, you will often find their counterpart, anti-entropy, which is also based on an epidemic theory of computing. Anti-entropy is the replica synchronization mechanism in Cassandra for ensuring that data on different nodes is updated to the newest version.

Here’s how it works. During a major compaction, the server initiates a TreeRequest/TreeResponse conversation to exchange Merkle trees with neighboring nodes. A Merkle tree is a tree of hashes that summarizes the data in a column family: each leaf is a hash of a range of the data, and each parent is a hash of its children, so two replicas can locate their differences by comparing hashes rather than the data itself. If the trees from the different nodes don’t match, they have to be reconciled (or “repaired”) in order to determine the latest data values they should all be set to. This tree comparison validation is the responsibility of the org.apache.cassandra.service.AntiEntropyService class. AntiEntropyService implements the Singleton pattern and defines the static Differencer class as well, which is used to compare two trees; if it finds any differences, it launches a repair for the ranges that don’t agree.

Anti-entropy is used in Amazon’s Dynamo, and Cassandra’s implementation is modeled on that (see Section 4.7 of the Dynamo paper).

Dynamo uses a Merkle tree for anti-entropy (see the definition of Merkle tree in the Glossary). Cassandra uses them too, but the implementation is a little different. In Cassandra, each column family has its own Merkle tree; the tree is created as a snapshot during a major compaction operation (see “Compaction” in the Glossary), and is kept only as long as is required to send it to the neighboring nodes on the ring. The advantage of this implementation is that it reduces disk I/O.
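The tree comparison can be sketched as follows. This is a simplified, hypothetical model rather than AntiEntropyService code: rows are bucketed into a fixed, power-of-two number of leaf ranges by the key’s hashCode (Cassandra divides the token range instead), leaves hash their rows with SHA-256, and each inner node hashes its two children. The difference method plays the role of the Differencer, descending only into subtrees whose hashes disagree.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical Merkle tree over a fixed number of leaf ranges.
class MerkleTree {
    final int leaves;      // number of leaf ranges; must be a power of two
    final byte[][] nodes;  // heap layout: children of node i are 2i+1 and 2i+2

    MerkleTree(Map<String, String> rows, int leaves) {
        this.leaves = leaves;
        this.nodes = new byte[2 * leaves - 1][];
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < leaves; i++) buckets.add(new ArrayList<>());
        for (String key : rows.keySet()) {
            buckets.get(Math.floorMod(key.hashCode(), leaves)).add(key);
        }
        for (int i = 0; i < leaves; i++) {           // hash each leaf's rows
            MessageDigest md = sha256();
            List<String> keys = buckets.get(i);
            keys.sort(null); // natural order, so equal data always hashes equally
            for (String k : keys) {
                md.update((k + "=" + rows.get(k) + ";").getBytes(StandardCharsets.UTF_8));
            }
            nodes[leaves - 1 + i] = md.digest();
        }
        for (int i = leaves - 2; i >= 0; i--) {      // each parent hashes its children
            MessageDigest md = sha256();
            md.update(nodes[2 * i + 1]);
            md.update(nodes[2 * i + 2]);
            nodes[i] = md.digest();
        }
    }

    private static MessageDigest sha256() {
        try {
            return MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // every Java platform must provide SHA-256
        }
    }

    // Return the indexes of leaf ranges that differ and therefore need repair.
    static List<Integer> difference(MerkleTree a, MerkleTree b) {
        List<Integer> out = new ArrayList<>();
        diff(a, b, 0, out);
        return out;
    }

    private static void diff(MerkleTree a, MerkleTree b, int i, List<Integer> out) {
        if (Arrays.equals(a.nodes[i], b.nodes[i])) return; // whole subtree agrees
        if (i >= a.leaves - 1) {                            // reached a leaf
            out.add(i - (a.leaves - 1));
            return;
        }
        diff(a, b, 2 * i + 1, out);
        diff(a, b, 2 * i + 2, out);
    }
}
```

Only the ranges in the returned list need to be exchanged between replicas, which is why trees of hashes are sent rather than the data itself.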

After each update, the anti-entropy algorithm kicks in. This performs a checksum against the database and compares checksums of peers; if the checksums differ, then the data is exchanged. This requires using a time window to ensure that peers have had a chance to receive the most recent update so that the system is not constantly and unnecessarily executing anti-entropy. To keep the operation fast, nodes internally maintain an inverted index keyed by timestamp and only exchange the most recent updates.

In Cassandra, you have multiple nodes that make up your cluster, and one or more of the nodes act as replicas for a given piece of data. To read data, a client connects to any node in the cluster and, based on the consistency level specified by the client, a number of nodes are read. The read operation blocks until the client-specified consistency level is met. If it is detected that some of the nodes responded with an out-of-date value, Cassandra will return the most recent value to the client. After returning, Cassandra will perform what’s called a read repair in the background. This operation brings the replicas with stale values up to date.

This design is used by Cassandra as well as by straight key/value stores such as Project Voldemort and Riak. It improves performance because the client does not block until all nodes are read, while the read repair stage keeps the data fresh in the background. If you have lots of clients, it’s important to read from a quorum of nodes in order to ensure that at least one will have the most recent value.

If the client specifies a weak consistency level (such as ONE), then the read repair is performed in the background after returning to the client. If you are using one of the two stronger consistency levels (QUORUM or ALL), then the read repair happens before data is returned to the client.

If a read operation shows different values stored for the same timestamp, Cassandra will compare the values directly as a tie-breaking mechanism to ensure that read repairing doesn’t enter an infinite loop. This case should be exceedingly rare.
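The resolution step can be sketched as follows. The class and method names are hypothetical, each replica response is reduced to a single column value with its timestamp, and the tie-break (the greater value by String.compareTo wins) is an assumed stand-in for “compare the values directly”; what matters is that every node applies the same deterministic rule, so repairs converge instead of looping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical read resolution for one column across replica responses.
class ReadResolver {
    static final class Version {
        final String replica;
        final String value;
        final long timestamp;

        Version(String replica, String value, long timestamp) {
            this.replica = replica;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // The newest timestamp wins; on a tie, the values are compared directly
    // (greater wins, an assumed rule) so every node picks the same winner.
    static Version resolve(List<Version> responses) {
        Version best = null;
        for (Version v : responses) {
            if (best == null
                    || v.timestamp > best.timestamp
                    || (v.timestamp == best.timestamp && v.value.compareTo(best.value) > 0)) {
                best = v;
            }
        }
        return best;
    }

    // Replicas that answered with something other than the winner get repaired.
    static List<String> staleReplicas(List<Version> responses, Version winner) {
        List<String> stale = new ArrayList<>();
        for (Version v : responses) {
            if (v.timestamp != winner.timestamp || !v.value.equals(winner.value)) {
                stale.add(v.replica);
            }
        }
        return stale;
    }
}
```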

Memtables, SSTables, and Commit Logs

When you perform a write operation, it’s immediately written to the commit log. The commit log is a crash-recovery mechanism that supports Cassandra’s durability goals. A write will not count as successful until it’s written to the commit log, to ensure that if a write operation does not make it to the in-memory store (the memtable, discussed in a moment), it will still be possible to recover the data.

After it’s written to the commit log, the value is written to a memory-resident data structure called the memtable. When the number of objects stored in the memtable reaches a threshold, the contents of the memtable are flushed to disk in a file called an SSTable. A new memtable is then created. This flushing is a nonblocking operation; multiple memtables may exist for a single column family, one current and the rest waiting to be flushed. They typically should not have to wait very long, as the node should flush them very quickly unless it is overloaded.
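The write path described so far can be modeled in a few lines. This is a hypothetical, single-column-family sketch, not Cassandra’s Memtable or CommitLog classes: in-memory lists stand in for files, the flush threshold counts keys, and the flush happens inline rather than in the background. The read method follows the rule, described later in this section, that the memtable is checked first; as a further simplification it returns the newest copy it finds instead of reconciling timestamps across SSTables.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical single-column-family write path; lists stand in for disk files.
class WritePath {
    final List<String> commitLog = new ArrayList<>();                   // append-only
    final List<SortedMap<String, String>> sstables = new ArrayList<>(); // immutable
    private TreeMap<String, String> memtable = new TreeMap<>();
    private final int flushThreshold;

    WritePath(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    void write(String key, String value) {
        commitLog.add(key + "=" + value); // 1. durability first: append to the log
        memtable.put(key, value);         // 2. then update the in-memory memtable
        if (memtable.size() >= flushThreshold) flush();
    }

    // Freeze the memtable as a sorted, read-only SSTable and start a new one.
    void flush() {
        if (memtable.isEmpty()) return;
        sstables.add(Collections.unmodifiableSortedMap(memtable));
        memtable = new TreeMap<>();
    }

    // Memtable first, then SSTables from newest to oldest (simplified).
    String read(String key) {
        if (memtable.containsKey(key)) return memtable.get(key);
        for (int i = sstables.size() - 1; i >= 0; i--) {
            String v = sstables.get(i).get(key);
            if (v != null) return v;
        }
        return null;
    }
}
```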

Each commit log maintains an internal bit flag to indicate whether it needs flushing. When a write operation is first received, it is written to the commit log and its bit flag is set to 1. There is only one bit flag per column family, because only one commit log is ever being written to across the entire server. All writes to all column families will go into the same commit log, so the bit flag indicates whether a particular commit log contains anything that hasn’t been flushed for a particular column family. Once the memtable has been properly flushed to disk, the corresponding commit log’s bit flag is set to 0, indicating that the commit log no longer has to maintain that data for durability purposes. Like regular logfiles, commit logs have a configurable rollover threshold, and once this file size threshold is reached, the log will roll over, carrying with it any extant dirty bit flags.
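The dirty-flag bookkeeping can be sketched like this. The model is hypothetical: column families are identified by small integer ids, each commit log segment keeps one bit per column family in a java.util.BitSet, segments roll over after a fixed number of entries rather than at a file size, and a flush is assumed to cover everything written so far for that column family.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical commit log: one log for all column families, split into segments,
// each holding one dirty bit per column family id.
class CommitLog {
    static final class Segment {
        final BitSet dirty = new BitSet();
        int entries;
    }

    final List<Segment> segments = new ArrayList<>();
    private final int maxEntriesPerSegment;

    CommitLog(int maxEntriesPerSegment) {
        this.maxEntriesPerSegment = maxEntriesPerSegment;
        segments.add(new Segment());
    }

    void append(int cfId) {
        Segment active = segments.get(segments.size() - 1);
        if (active.entries >= maxEntriesPerSegment) { // roll over, carrying no flags
            active = new Segment();
            segments.add(active);
        }
        active.entries++;
        active.dirty.set(cfId);                       // unflushed data for this CF
    }

    // After a CF's memtable is flushed, clear its bit everywhere and discard
    // older segments that no longer hold unflushed data for any CF.
    void onFlush(int cfId) {
        for (Segment s : segments) s.dirty.clear(cfId);
        Segment active = segments.get(segments.size() - 1);
        segments.removeIf(s -> s != active && s.dirty.isEmpty());
    }
}
```

The segment that is still being written to is never discarded, even when clean, because new writes keep arriving in it.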

The SSTable is a concept borrowed from Google’s Bigtable. Once a memtable is flushed to disk as an SSTable, it is immutable and cannot be changed by the application. Despite the fact that SSTables are compacted, this compaction changes only their on-disk representation; it essentially performs the “merge” step of a mergesort into new files and removes the old files on success.

Note

Reading “SSTable” as short for “Sorted String Table” is somewhat misleading for Cassandra, because the data is not stored as strings on disk.

Each SSTable also has an associated Bloom filter, which is used as an additional performance enhancer (see Bloom Filters).

All writes are sequential, which is the primary reason that writes perform so well in Cassandra. No reads or seeks of any kind are required to write a value to Cassandra, because all writes are append operations. As a result, one key limit on write performance is the speed of your disk. Compaction is intended to amortize the reorganization of data, and it too uses sequential IO to do so. So the performance benefit comes from splitting the work: the write operation is just an immediate append, and compaction later organizes the data for better read performance. If Cassandra naively inserted values where they ultimately belonged, writing clients would pay for seeks up front.

On reads, Cassandra will check the memtable first to find the value. Memtables are implemented by the org.apache.cassandra.db.Memtable class.

Hinted Handoff

Consider the following scenario. A write request is sent to Cassandra, but the node where the write properly belongs is not available due to network partition, hardware failure, or some other reason. In order to ensure general availability of the ring in such a situation, Cassandra implements a feature called hinted handoff. You might think of a hint as a little post-it note that contains the information from the write request. If the node where the write belongs has failed, the Cassandra node that receives the write will create a hint, which is a small reminder that says, “I have the write information that is intended for node B. I’m going to hang onto this write, and I’ll notice when node B comes back online; when it does, I’ll send it the write request.” That is, node A will “hand off” to node B the “hint” regarding the write.

This allows Cassandra to be always available for writes, and reduces the time that a failed node will be inconsistent after it does come back online. We discussed consistency levels previously, and you may recall that consistency level ANY, which was added in 0.6, means that a hinted handoff alone will count as sufficient toward the success of a write operation. That is, even if only a hint was able to be recorded, the write still counts as successful.
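Here is a hypothetical sketch of the bookkeeping on the node that receives the write: hints are queued per target node, and onNodeUp, which stands in for the gossip notification, hands them off. Node names, the string mutations, and the delivered map standing in for the network are all illustrative. The write method reports whether the write reached its target directly; under consistency level ANY, either outcome counts as success.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical hinted handoff bookkeeping on the node receiving a write.
class HintedHandoff {
    final Set<String> liveNodes = new HashSet<>();               // as gossip reports it
    final Map<String, List<String>> delivered = new HashMap<>(); // stands in for the network
    final Map<String, List<String>> hints = new HashMap<>();     // target -> pending writes

    // Returns true if the write reached its target, false if only a hint was
    // stored. With consistency level ANY, both outcomes count as success.
    boolean write(String target, String mutation) {
        if (liveNodes.contains(target)) {
            delivered.computeIfAbsent(target, k -> new ArrayList<>()).add(mutation);
            return true;
        }
        hints.computeIfAbsent(target, k -> new ArrayList<>()).add(mutation);
        return false;
    }

    // Called when gossip notices that `target` is back online: hand off its hints.
    void onNodeUp(String target) {
        liveNodes.add(target);
        List<String> pending = hints.remove(target);
        if (pending != null) {
            delivered.computeIfAbsent(target, k -> new ArrayList<>()).addAll(pending);
        }
    }
}
```

Delivering every pending hint in one burst, as onNodeUp does here, is exactly the flooding problem discussed next; a gentler design would replay hints in small, lower-priority batches.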

Some concern about hinted handoffs has been voiced by members of the Cassandra community. At first, it seems like a thoughtful and elegant design to ensure overall durability of the database, and appears unproblematic because it is familiar from many distributed computing paradigms, such as Java Message Service (JMS). In a durable guaranteed-delivery JMS queue, if a message cannot be delivered to a receiver, JMS will wait for a given interval and then resend the request until the message is received. But there is a practical problem with both guaranteed delivery in JMS and Cassandra’s hinted handoffs: if a node is offline for some time, the hints can build up considerably on other nodes. Then, when the other nodes notice that the failed node has come back online, they tend to flood that node with requests, just at the moment it is most vulnerable (when it is struggling to come back into play after a failure).

In response to these concerns, it is now possible to disable hinted handoff entirely, or, as a less extreme measure, reduce the priority of hinted handoff messages against new write requests.

Note

In Cassandra 0.6 and earlier, HintedHandoffManager.sendMessage would read an entire row into memory, and then send the row back to the client in a single message. As of version 0.7, Cassandra will now page within a single hinted row instead. This can improve performance against very wide rows.

Compaction

A compaction operation in Cassandra is performed in order to merge SSTables. During compaction, the data in SSTables is merged: the keys are merged, columns are combined, tombstones are discarded, and a new index is created.

Compaction is the process of freeing up space by merging large accumulated datafiles. This is roughly analogous to rebuilding a table in the relational world. But as Stu Hood points out, the primary difference in Cassandra is that it is intended as a transparent operation that is amortized across the life of the server.

On compaction, the merged data is sorted, a new index is created over the sorted data, and the freshly merged, sorted, and indexed data is written to a single new SSTable (each SSTable consists of three files: Data, Index, and Filter). This process is managed by the class org.apache.cassandra.db.CompactionManager. CompactionManager implements an MBean interface so it can be introspected.
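A merge of this kind can be sketched as follows. It is a hypothetical simplification of the work CompactionManager coordinates: each SSTable is a sorted map from key to Cell, the newest timestamp wins, and a tombstone is dropped only once it is older than the grace period (the GCGraceSeconds setting covered under Tombstones). For brevity it collects everything into a TreeMap; a real implementation would stream a k-way merge over the already sorted inputs.

```java
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical compaction of several SSTables for one column family.
class Compactor {
    static final class Cell {
        final String value;          // null marks a tombstone
        final long timestamp;        // write timestamp used for reconciliation
        final long deletedAtSeconds; // local deletion time, for tombstones only

        Cell(String value, long timestamp, long deletedAtSeconds) {
            this.value = value;
            this.timestamp = timestamp;
            this.deletedAtSeconds = deletedAtSeconds;
        }

        boolean isTombstone() {
            return value == null;
        }
    }

    // Merge the inputs into one new sorted SSTable.
    static SortedMap<String, Cell> compact(List<? extends SortedMap<String, Cell>> inputs,
                                           long nowSeconds, long gcGraceSeconds) {
        TreeMap<String, Cell> merged = new TreeMap<>();
        for (SortedMap<String, Cell> table : inputs) {
            for (Map.Entry<String, Cell> e : table.entrySet()) {
                // Keep whichever cell has the newer timestamp.
                merged.merge(e.getKey(), e.getValue(),
                        (existing, incoming) -> incoming.timestamp > existing.timestamp ? incoming : existing);
            }
        }
        // Purge tombstones that have outlived the grace period.
        merged.values().removeIf(c -> c.isTombstone()
                && nowSeconds - c.deletedAtSeconds > gcGraceSeconds);
        return merged;
    }
}
```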

Another important function of compaction is to improve read performance by reducing the number of required seeks. Compaction keeps bounded the number of SSTables that must be inspected to find the column data for a given key. If a key is frequently mutated, its mutations are likely to be spread across many flushed SSTables; compacting them spares the database from performing a seek into each SSTable to pull the data.

There are different types of compaction in Cassandra. A major compaction is triggered in one of two ways: via a node probe or automatically. A node probe sends a TreeRequest message to the nodes that neighbor the target. When a node receives a TreeRequest, it immediately performs a read-only compaction in order to validate the column family.

A read-only compaction has the following steps:

  1. Get the key distribution from the column family.

  2. Once the rows have been added to the validator, if the column family needs to be validated, it will create the Merkle tree and broadcast it to the neighboring nodes.

  3. The Merkle trees are brought together in a “rendezvous” as a list of Differencers (trees that need validating or comparison).

  4. The comparison is executed by the StageManager class, which is responsible for handling concurrency issues in executing jobs. In this case, the StageManager uses an Anti-Entropy Stage. This uses the org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor class, which executes the compaction within a single thread and makes the operation available as an MBean for inspection.

You can increase overall performance by reducing the priority of compaction threads. To do so, use the following flag:

-Dcassandra.compaction.priority=1

This will affect CPU usage, not IO.
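To illustrate what a priority setting of this kind means, here is a hypothetical sketch of an executor whose thread priority is read from that system property with Integer.getInteger. It is not Cassandra’s code; it only shows that such a flag ends up in Thread.setPriority, a CPU-scheduling hint (which some platforms ignore) that does nothing to throttle disk I/O.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical: build a compaction executor honoring -Dcassandra.compaction.priority.
class CompactionThreads {
    static ExecutorService newCompactionExecutor() {
        int requested = Integer.getInteger("cassandra.compaction.priority", Thread.NORM_PRIORITY);
        // Clamp to the legal range so Thread.setPriority does not throw.
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, requested));
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "compaction");
            t.setDaemon(true);
            t.setPriority(priority); // CPU scheduling hint only; disk I/O is unaffected
            return t;
        };
        return Executors.newSingleThreadExecutor(factory);
    }
}
```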

Bloom Filters

Bloom filters are used as a performance booster. They are named for their inventor, Burton Bloom. Bloom filters are very fast, probabilistic data structures for testing whether an element is a member of a set. They are probabilistic because it is possible to get a false-positive result from a Bloom filter, but not a false-negative. Bloom filters work by mapping the values in a data set into a bit array, condensing a larger data set into a compact digest. The digest, by definition, uses a much smaller amount of memory than the original data would. The filters are stored in memory and are used to improve performance by reducing disk access on key lookups. Disk access is typically much slower than memory access, so, in a way, a Bloom filter is a special kind of cache. When a query is performed, the Bloom filter is checked first before accessing disk. Because false-negatives are not possible, if the filter indicates that the element does not exist in the set, it certainly doesn’t; but if the filter thinks that the element is in the set, the disk is accessed to make sure.
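A minimal Bloom filter shows why a negative answer is reliable and a positive one is not. This sketch is not Cassandra’s BloomFilter class; the sizes are arbitrary, and the hash functions are an assumption: two differently seeded FNV-1a-style hashes combined by double hashing (h1 + i·h2) to simulate k independent hash functions.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Minimal Bloom filter sketch (not Cassandra's BloomFilter class).
class BloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    BloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // FNV-1a-style hash with a caller-chosen starting value (an assumed choice).
    private static int hash(byte[] data, int seed) {
        int h = seed;
        for (byte b : data) {
            h ^= (b & 0xff);
            h *= 0x01000193; // FNV 32-bit prime
        }
        return h;
    }

    // Double hashing: probe i lands at h1 + i * h2 (mod numBits).
    private int[] positions(String key) {
        byte[] data = key.getBytes(StandardCharsets.UTF_8);
        int h1 = hash(data, 0x811C9DC5);     // FNV offset basis
        int h2 = hash(data, 0x5BD1E995) | 1; // arbitrary second seed, forced odd
        int[] pos = new int[numHashes];
        for (int i = 0; i < numHashes; i++) {
            pos[i] = Math.floorMod(h1 + i * h2, numBits);
        }
        return pos;
    }

    void add(String key) {
        for (int p : positions(key)) bits.set(p);
    }

    // false: definitely not present, so the disk read can be skipped.
    // true: possibly present, so the SSTable on disk must be checked.
    boolean mightContain(String key) {
        for (int p : positions(key)) {
            if (!bits.get(p)) return false;
        }
        return true;
    }
}
```

Every key that was added sets all of its bits, so mightContain can never return false for it; a key that was never added can still find all of its bits set by other keys, which is the false positive.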

A new JMX MBean feature will be added to Nodetool that allows you to check the number of false-positives that your Bloom filters are returning; this operation is called getBloomFilterFalsePositives.

Note

Apache Hadoop, Google Bigtable, and Squid Proxy Cache also employ Bloom filters.

Tombstones

In the relational world, you might be used to the idea of a “soft delete.” Instead of actually executing a delete SQL statement, the application will issue an update statement that changes a value in a column called something like “deleted”. Programmers sometimes do this to support audit trails, for example.

There’s a similar concept in Cassandra called a tombstone. This is how all deletes work and is therefore automatically handled for you. When you execute a delete operation, the data is not immediately deleted. Instead, it’s treated as an update operation that places a tombstone on the value. A tombstone is a deletion marker that is required to suppress older data in SSTables until compaction can run.

There’s a related setting called Garbage Collection Grace Seconds. This is the amount of time that the server will wait to garbage-collect a tombstone. By default, it’s set to 864,000 seconds, the equivalent of 10 days. Cassandra keeps track of tombstone age, and once a tombstone is older than GCGraceSeconds, it will be garbage-collected. The purpose of this delay is to give a node that is unavailable time to recover; if a node is down longer than this value, then it is treated as failed and replaced.

As of 0.7, this setting is configurable per column family (it used to be for the whole keyspace).

Staged Event-Driven Architecture (SEDA)

Cassandra implements a Staged Event-Driven Architecture (SEDA). SEDA is a general architecture for highly concurrent Internet services, originally proposed in a 2001 paper called “SEDA: An Architecture for Well-Conditioned, Scalable Internet Services” by Matt Welsh, David Culler, and Eric Brewer (who you might recall from our discussion of the CAP theorem).

Note

You can read the original SEDA paper at http://www.eecs.harvard.edu/~mdw/proj/seda.

In a typical application, a single unit of work is often performed within the confines of a single thread. A write operation, for example, will start and end within the same thread. Cassandra, however, is different: its concurrency model is based on SEDA, so a single operation may start with one thread, which then hands off the work to another thread, which may hand it off to other threads. But it’s not up to the current thread to hand off the work to another thread. Instead, work is subdivided into what are called stages, and the thread pool (really, a java.util.concurrent.ExecutorService) associated with the stage determines execution. A stage is a basic unit of work, and a single operation may internally state-transition from one stage to the next. Because each stage can be handled by a different thread pool, each pool can be sized and tuned independently, which can yield a substantial performance improvement. This SEDA design also means that Cassandra is better able to manage its own resources internally, because different operations might require disk IO, be CPU-bound, or be network operations, and the pools can manage their work according to the availability of these resources.

A stage consists of an incoming event queue, an event handler, and an associated thread pool. Stages are managed by a controller that determines scheduling and thread allocation; Cassandra implements this kind of concurrency model using the thread pool java.util.concurrent.ExecutorService. To see specifically how this works, check out the org.apache.cassandra.concurrent.StageManager class.
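The stage idea can be sketched with a few lines of java.util.concurrent. This is a hypothetical, minimal model, not StageManager: a Stage pairs an ExecutorService (which owns the incoming queue and the thread pool) with an event handler, and an operation moves to the next stage by being submitted to that stage’s executor rather than by the current thread doing the work itself. The stage names and pool sizes are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical SEDA-style stage: an event handler plus its own queue and thread pool.
class Stage<I, O> {
    final String name;
    private final ExecutorService pool; // owns the incoming queue and the threads
    private final Function<I, O> handler;

    Stage(String name, int threads, Function<I, O> handler) {
        this.name = name;
        this.pool = Executors.newFixedThreadPool(threads);
        this.handler = handler;
    }

    // Enqueue an event; one of this stage's threads will run the handler.
    CompletableFuture<O> submit(I event) {
        return CompletableFuture.supplyAsync(() -> handler.apply(event), pool);
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

An operation can then be chained across stages, for example mutation.submit(row).thenCompose(response::submit), so that each step runs on the pool sized for that kind of work.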

The following operations are represented as stages in Cassandra:

  • Read

  • Mutation

  • Gossip

  • Response

  • Anti-Entropy

  • Load Balance

  • Migration

  • Streaming

A few additional operations are also implemented as stages. There are stages for units of work performed on memtables (in the ColumnFamilyStore class), and the Consistency Manager is a stage in the StorageService.

The stages implement the IVerbHandler interface to support the functionality for a given verb. Because the idea of mutation is represented as a stage, it can play a role in both insert and delete operations.

SEDA is a powerful architecture. Because it is event-driven, as its name states, work can be performed with exceptional concurrency and minimal coupling.

Managers and Services

There is a set of classes that form Cassandra’s basic internal control mechanisms. I’ll present a brief overview of these here so that you can become familiar with some of the more important ones. The first one to consider is probably the org.apache.cassandra.thrift.CassandraServer class. This class implements the calls to the Thrift interface, and delegates most of the efforts of performing queries to org.apache.cassandra.service.StorageProxy.

Cassandra Daemon

The org.apache.cassandra.service.CassandraDaemon interface represents the life cycle of the Cassandra service running on a single node. It includes the typical life cycle operations that you might expect: start, stop, activate, deactivate, and destroy.

Storage Service

The Cassandra database service is represented by org.apache.cassandra.service.StorageService. The storage service contains the node’s token, which is a marker indicating the range of data that the node is responsible for.
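How a token translates into a range of responsibility can be sketched with a sorted map. This is a hypothetical model, not StorageService code: tokens are plain longs, and each node owns the range from the previous node’s token (exclusive) up to its own token (inclusive), wrapping around at the end of the ring.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical token ring: token -> node that holds that token.
class TokenRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    void addNode(long token, String node) {
        ring.put(token, node);
    }

    // The owner is the first node whose token is >= the key's token; keys
    // past the highest token wrap around to the node with the lowest token.
    String ownerOf(long keyToken) {
        Map.Entry<Long, String> e = ring.ceilingEntry(keyToken);
        return (e != null ? e : ring.firstEntry()).getValue();
    }
}
```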

The server starts up with a call to the initServer method of this class, upon which the server registers the SEDA verb handlers, makes some determinations about its state (such as whether it was bootstrapped or not, and what its partitioner is), and registers itself as an MBean with the JMX server.

Messaging Service

The purpose of org.apache.cassandra.net.MessagingService is to create socket listeners for message exchange; inbound and outbound messages from this node come through this service. The MessagingService.listen method creates a thread. Each incoming connection then dips into the ExecutorService thread pool using org.apache.cassandra.net.IncomingTcpConnection (a class that extends Thread) to deserialize the message. The message is validated, and then it’s determined whether this is a streaming message or not. Message streaming is Cassandra’s optimized way of sending sections of SSTable files from one node to another; all other communication between nodes occurs via serialized messages. If it’s streaming, the message is passed to an IncomingStreamReader; if it’s not streaming, the message is handled by MessagingService’s deserialization executor, which is handed the message in the form of a task that implements Runnable. Because this service also makes heavy use of stages and the pool it maintains is wrapped with an MBean, you can find out a lot about how this service is working (whether reads are getting backed up and so forth) through JMX.

Hinted Handoff Manager

As its name suggests, org.apache.cassandra.db.HintedHandoffManager is the class that manages hinted handoffs internally. To do so it maintains a thread pool, which is available for JMX monitoring as HINTED-HANDOFF-POOL.

Summary

In this chapter, we examined the main pillars of Cassandra’s construction, including gossip, anti-entropy, accrual failure detection, and how the use of a Staged Event-Driven Architecture maximizes performance. We also looked at how Cassandra internally executes various operations, such as tombstones and read repair. Finally, we surveyed some of the major classes and interfaces, pointing out key points of interest in case you want to dive deeper into the code base.
