In this chapter, we examine several aspects of Cassandra’s internal design in order to understand how it does its job. We consider the peer-to-peer design and its corresponding gossip protocol, as well as what Cassandra does on read and write requests, and examine how these choices affect architectural considerations such as scalability, durability, availability, manageability, and more. We also discuss Cassandra’s adoption of a Staged Event-Driven Architecture, which acts as the platform for request delegation.
The Cassandra architecture is very sophisticated and relies on the use of several different theoretical constructs. It is hard to discuss any one new term without referencing other terms we probably also haven’t met yet. This can be frustrating, which is why I’ve included the Glossary in the back of the book for you to refer to.
Cassandra has an internal keyspace called system that it uses to store metadata about the cluster to aid in operations. In Microsoft SQL Server, two meta-databases are maintained: master and tempdb. The master is used to keep information about disk space, usage, system settings, and general server installation notes; the tempdb is used as a workspace to store intermediate results and perform general tasks. The Oracle database always has a tablespace called SYSTEM, used for similar purposes. The Cassandra system keyspace is used much like these.
Specifically, the system keyspace stores metadata for the local node, as well as hinted handoff information. The metadata includes:
The node’s token
The cluster name
Keyspace and schema definitions to support dynamic loading
Migration data
Whether or not the node is bootstrapped
The schema definitions are stored in two column families: the Schema column family holds user keyspace and schema definitions, and the Migrations column family records the changes made to a keyspace.
You cannot modify the system keyspace.
In traditional databases that can be deployed on multiple machines (such as MySQL), and even in newer models such as Google’s Bigtable, some nodes are designated masters and some slaves. They have different roles in the overall cluster: the master acts as the authoritative source for data, and slaves synchronize their data to the master. Any changes written to the master are passed on to slaves. This model is optimized for reading data, as it allows data to be read from any slave. But the replication is one-way, from master to slave. This has an important ramification: all writes must be sent to the master, which means that it is a potential single point of failure. In a master/slave setup, the master node can have far-reaching effects if it goes offline.
By contrast, Cassandra has a peer-to-peer distribution model, such that any given node is structurally identical to any other node—that is, there is no “master” node that acts differently than a “slave” node. The aim of Cassandra’s design is overall system availability and ease of scaling. The peer-to-peer design can improve general database availability, because while taking any given Cassandra node offline may have a potential impact on overall throughput, it is a graceful degradation that does not interrupt service. Assuming that you are using a reasonable replication strategy, the data on a failed node will still be available for reads and writes.
This design also makes it easier to scale Cassandra by adding new nodes. Because the behavior of each node is identical, in order to add a new server, you simply need to add it to the cluster. The new node will not immediately accept requests so that it has time to learn the topology of the ring and accept data that it may also be responsible for. After it does this, it can join the ring as a full member and begin accepting requests. This is largely automatic and requires minimal configuration. For this reason, the P2P design makes both scaling up and scaling down an easier task than in master/slave replication.
To support decentralization and partition tolerance, Cassandra uses a gossip protocol for intra-ring communication so that each node can have state information about other nodes. The gossiper runs every second on a timer. Hinted handoff is triggered by gossip, when a node notices that a node it has hints for has just come back online. Anti-entropy, on the other hand, is a manual process; it is not triggered by gossip.
Gossip protocols (sometimes called “epidemic protocols”) generally assume a faulty network, are commonly employed in very large, decentralized network systems, and are often used as an automatic mechanism for replication in distributed databases. They take their name from the concept of human gossip, a form of communication in which peers can choose with whom they want to exchange information.
The term “gossip protocol” was originally coined in 1987 by Alan Demers, a researcher at Xerox’s Palo Alto Research Center, who was studying ways to route information through unreliable networks.
The gossip protocol in Cassandra is primarily implemented by the org.apache.cassandra.gms.Gossiper class, which is responsible for managing gossip for the local node. When a server node is started, it registers itself with the gossiper to receive endpoint state information.
Because Cassandra gossip is used for failure detection, the Gossiper class maintains a list of nodes that are alive and dead.
Here is how the gossiper works:
Periodically (according to the settings in its TimerTask), the gossiper will choose a random node in the ring and initiate a gossip session with it. Each round of gossip requires three messages.
The gossip initiator sends its chosen friend a GossipDigestSynMessage.
When the friend receives this message, it returns a GossipDigestAckMessage.
When the initiator receives the ack message from the friend, it sends the friend a GossipDigestAck2Message to complete the round of gossip.
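The three-message round above can be sketched in Java. This is a hypothetical, simplified model and not Cassandra’s Gossiper: each node’s state is just a map from endpoint name to a heartbeat version number. The Syn carries that map as a digest, the Ack returns whatever the friend holds that is newer, and the Ack2 delivers whatever the friend is missing. The class and method names are illustrative only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical, simplified gossip participant (not Cassandra's Gossiper).
// State is a map from endpoint name to that endpoint's latest heartbeat version.
class GossipNode {
    final String name;
    final Map<String, Integer> versions = new HashMap<>();

    GossipNode(String name) {
        this.name = name;
        versions.put(name, 0);
    }

    // Local heartbeat: bump our own version so peers can tell we are still alive.
    void beat() {
        versions.merge(name, 1, Integer::sum);
    }

    // Entries in 'mine' that are missing from, or newer than, those in 'theirs'.
    private static Map<String, Integer> newer(Map<String, Integer> mine, Map<String, Integer> theirs) {
        Map<String, Integer> out = new HashMap<>();
        for (Map.Entry<String, Integer> e : mine.entrySet()) {
            Integer other = theirs.get(e.getKey());
            if (other == null || other < e.getValue()) out.put(e.getKey(), e.getValue());
        }
        return out;
    }

    private void apply(Map<String, Integer> updates) {
        updates.forEach((ep, v) -> versions.merge(ep, v, Math::max));
    }

    // One round of gossip, initiated by this node.
    void gossipWith(GossipNode friend) {
        Map<String, Integer> syn = new HashMap<>(versions);      // Syn: our digest
        Map<String, Integer> ack = newer(friend.versions, syn);  // Ack: state the friend has that is newer
        Map<String, Integer> ack2 = newer(syn, friend.versions); // Ack2: state we have that the friend lacks
        apply(ack);
        friend.apply(ack2);
    }

    // Every node gossips with one randomly chosen peer. Requires at least two nodes.
    static void round(List<GossipNode> nodes, Random rnd) {
        for (GossipNode n : nodes) {
            n.beat();
            GossipNode friend;
            do {
                friend = nodes.get(rnd.nextInt(nodes.size()));
            } while (friend == n);
            n.gossipWith(friend);
        }
    }
}
```

Because each exchange is bidirectional, information about any node spreads through a small ring within a few rounds, even though every node only ever talks to one random peer at a time.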
When the gossiper determines that another endpoint is dead, it “convicts” that endpoint by marking it as dead in its local list and logging that fact.
Cassandra has robust support for failure detection, as specified by a popular algorithm for distributed computing called Phi Accrual Failure Detection. This manner of failure detection originated at the Advanced Institute of Science and Technology in Japan in 2004.
Accrual failure detection is based on two primary ideas. The first general idea is that failure detection should be flexible, which is achieved by decoupling it from the application being monitored. The second and more novel idea challenges the notion of traditional failure detectors, which are implemented by simple “heartbeats” and decide whether a node is dead or not dead based on whether a heartbeat is received or not. But accrual failure detection decides that this approach is naive, and finds a place in between the extremes of dead and alive—a suspicion level.
Therefore, the failure monitoring system outputs a continuous level of “suspicion” regarding how confident it is that a node has failed. This is desirable because it can take into account fluctuations in the network environment. For example, just because one connection gets caught up doesn’t necessarily mean that the whole node is dead. So suspicion offers a more fluid and proactive indication of the weaker or stronger possibility of failure based on interpretation (the sampling of heartbeats), as opposed to a simple binary assessment.
You can read the original Phi Accrual Failure Detection paper by Naohiro Hayashibara et al. at http://ddg.jaist.ac.jp/pub/HDY+04.pdf.
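A suspicion level like the one described above can be computed from the recent history of heartbeat arrivals. The following is a minimal sketch, not Cassandra’s FailureDetector: it assumes heartbeat inter-arrival times are exponentially distributed (a common simplification; the Hayashibara paper models them with a normal distribution), so the probability that a heartbeat still has not arrived after t milliseconds is exp(-t / mean), and phi is the negative base-10 logarithm of that probability. The window size and threshold are hypothetical parameters.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal phi accrual failure detector for a single remote node (sketch).
// Assumes exponentially distributed inter-arrival times:
// P(no heartbeat yet after t) = exp(-t / mean), so phi = t / (mean * ln 10).
class PhiAccrualDetector {
    private final int windowSize;                 // hypothetical sample-window size
    private final Deque<Long> intervals = new ArrayDeque<>();
    private long intervalSum = 0;
    private long lastHeartbeatMillis = -1;

    PhiAccrualDetector(int windowSize) { this.windowSize = windowSize; }

    // Record a heartbeat arrival and keep a sliding window of recent intervals.
    void heartbeat(long nowMillis) {
        if (lastHeartbeatMillis >= 0) {
            long interval = nowMillis - lastHeartbeatMillis;
            intervals.addLast(interval);
            intervalSum += interval;
            if (intervals.size() > windowSize) intervalSum -= intervals.removeFirst();
        }
        lastHeartbeatMillis = nowMillis;
    }

    // Suspicion level: grows continuously the longer the node stays silent.
    double phi(long nowMillis) {
        if (intervals.isEmpty()) return 0.0;      // not enough samples to judge yet
        double mean = (double) intervalSum / intervals.size();
        double elapsed = nowMillis - lastHeartbeatMillis;
        return elapsed / (mean * Math.log(10.0));
    }

    // The application, not the detector, decides how much suspicion means "dead".
    boolean suspect(long nowMillis, double threshold) {
        return phi(nowMillis) > threshold;
    }
}
```

With heartbeats arriving every second, phi is about 0.43 one second after the last heartbeat and crosses 8 only after roughly 18 seconds of silence. A single slow connection therefore raises suspicion gradually rather than flipping the node straight to "dead."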
Failure detection is implemented in Cassandra by the org.apache.cassandra.gms.FailureDetector class, which implements the org.apache.cassandra.gms.IFailureDetector interface. Together, they allow the following operations:
Where you find gossip protocols, you will often find their counterpart, anti-entropy, which is also based on an epidemic theory of computing. Anti-entropy is the replica synchronization mechanism in Cassandra for ensuring that data on different nodes is updated to the newest version.
Here’s how it works. During a major compaction, the server initiates a TreeRequest/TreeResponse conversation to exchange Merkle trees with neighboring nodes. A Merkle tree is a tree of hashes representing the data in a column family. If the trees from the different nodes don’t match, they have to be reconciled (or “repaired”) in order to determine the latest data values they should all be set to. This tree comparison validation is the responsibility of the org.apache.cassandra.service.AntiEntropyService class. AntiEntropyService implements the Singleton pattern and also defines the static Differencer class, which is used to compare two trees; if it finds any differences, it launches a repair for the ranges that don’t agree.
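The tree comparison can be sketched as follows. This is a simplified illustration and not AntiEntropyService or its Differencer. Rows are bucketed into a fixed, power-of-two number of leaf ranges by key hash, which stands in for token ranges. Each leaf is the SHA-256 hash of its bucket, and each parent is the hash of its two children. The comparison descends only into subtrees whose hashes disagree, so identical ranges cost nothing to verify. The bucketing scheme and the choice of SHA-256 are assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: a Merkle tree over a fixed number of leaf ranges (leaves must be a power of two).
class MerkleTree {
    final byte[][] nodes; // heap layout: node i has children 2i+1 and 2i+2
    final int leaves;

    MerkleTree(Map<String, String> rows, int leaves) {
        this.leaves = leaves;
        this.nodes = new byte[2 * leaves - 1][];
        // Bucket rows into ranges by key hash (a stand-in for token ranges).
        List<TreeMap<String, String>> buckets = new ArrayList<>();
        for (int i = 0; i < leaves; i++) buckets.add(new TreeMap<>());
        rows.forEach((k, v) -> buckets.get(Math.floorMod(k.hashCode(), leaves)).put(k, v));
        // Leaves hash their (sorted) bucket contents; parents hash their children.
        for (int i = 0; i < leaves; i++) {
            nodes[leaves - 1 + i] = sha256(buckets.get(i).toString().getBytes(StandardCharsets.UTF_8));
        }
        for (int i = leaves - 2; i >= 0; i--) nodes[i] = sha256(concat(nodes[2 * i + 1], nodes[2 * i + 2]));
    }

    // Leaf ranges whose hashes disagree: these are the ranges that need repair.
    static List<Integer> differences(MerkleTree a, MerkleTree b) {
        List<Integer> out = new ArrayList<>();
        diff(a, b, 0, out);
        return out;
    }

    private static void diff(MerkleTree a, MerkleTree b, int i, List<Integer> out) {
        if (MessageDigest.isEqual(a.nodes[i], b.nodes[i])) return;     // identical subtree: skip it
        if (i >= a.leaves - 1) { out.add(i - (a.leaves - 1)); return; } // mismatched leaf range
        diff(a, b, 2 * i + 1, out);
        diff(a, b, 2 * i + 2, out);
    }

    private static byte[] concat(byte[] x, byte[] y) {
        byte[] r = new byte[x.length + y.length];
        System.arraycopy(x, 0, r, 0, x.length);
        System.arraycopy(y, 0, r, x.length, y.length);
        return r;
    }

    private static byte[] sha256(byte[] data) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(data);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // every Java platform must support SHA-256
        }
    }
}
```

Only the mismatched ranges would then be streamed between replicas, which is the point of exchanging trees instead of whole data sets.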
Anti-entropy is used in Amazon’s Dynamo, and Cassandra’s implementation is modeled on that (see Section 4.7 of the Dynamo paper).
Dynamo uses a Merkle tree for anti-entropy (see the definition of Merkle tree in the Glossary). Cassandra uses them too, but the implementation is a little different. In Cassandra, each column family has its own Merkle tree; the tree is created as a snapshot during a major compaction operation (see “Compaction” in the Glossary), and is kept only as long as is required to send it to the neighboring nodes on the ring. The advantage of this implementation is that it reduces disk I/O.
After each update, the anti-entropy algorithm kicks in. This performs a checksum against the database and compares checksums of peers; if the checksums differ, then the data is exchanged. This requires using a time window to ensure that peers have had a chance to receive the most recent update so that the system is not constantly and unnecessarily executing anti-entropy. To keep the operation fast, nodes internally maintain an inverted index keyed by timestamp and only exchange the most recent updates.
In Cassandra, you have multiple nodes that make up your cluster, and one or more of the nodes act as replicas for a given piece of data. To read data, a client connects to any node in the cluster and, based on the consistency level specified by the client, a number of nodes are read. The read operation blocks until the client-specified consistency level is met. If it is detected that some of the nodes responded with an out-of-date value, Cassandra will return the most recent value to the client. After returning, Cassandra will perform what’s called a read repair in the background. This operation brings the replicas with stale values up to date.
This design is observed by Cassandra as well as by straight key/value stores such as Project Voldemort and Riak. It acts as a performance improvement because the client does not block until all nodes are read, but the read repair stage manages the task of keeping the data fresh in the background. If you have lots of clients, it’s important to read from a quorum of nodes in order to ensure that at least one will have the most recent value.
If the client specifies a weak consistency level (such as ONE), then the read repair is performed in the background after returning to the client. If you are using one of the two stronger consistency levels (QUORUM or ALL), then the read repair happens before data is returned to the client.
If a read operation shows different values stored for the same timestamp, Cassandra will compare the values directly as a tie-breaking mechanism to ensure that read repairing doesn’t enter an infinite loop. This case should be exceedingly rare.
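The reconciliation step of a read repair can be sketched as follows. This is a hypothetical illustration, not Cassandra’s read-repair code. Each replica’s answer carries a timestamp, the newest timestamp wins, and on a timestamp tie the values themselves are compared so that every node resolves the conflict the same way. The rule that the lexicographically larger value wins the tie is an assumption made for the example. Any replica whose answer differs from the winner is reported as stale.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// A value as returned by one replica (sketch; not Cassandra's classes).
class Versioned {
    final String value;
    final long timestamp;

    Versioned(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    // Newer timestamp wins; on a tie, compare the values directly so every
    // node picks the same winner (here, lexicographically larger wins: an assumption).
    static Versioned reconcile(Versioned a, Versioned b) {
        if (a.timestamp != b.timestamp) return a.timestamp > b.timestamp ? a : b;
        return a.value.compareTo(b.value) >= 0 ? a : b;
    }
}

class ReadRepair {
    // Pick the value to return to the client from the replicas that answered.
    static Versioned resolve(Map<String, Versioned> responses) {
        Versioned winner = null;
        for (Versioned v : responses.values()) {
            winner = (winner == null) ? v : Versioned.reconcile(winner, v);
        }
        return winner;
    }

    // Replicas whose answer differs from the winner need a repair write.
    static List<String> staleReplicas(Map<String, Versioned> responses, Versioned winner) {
        List<String> stale = new ArrayList<>();
        responses.forEach((replica, v) -> {
            if (v.timestamp != winner.timestamp || !v.value.equals(winner.value)) stale.add(replica);
        });
        return stale;
    }
}
```

At level ONE, the winner would go back to the client first and the stale replicas would be fixed afterward. At QUORUM or ALL, the repair writes would be issued before replying.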
When you perform a write operation, it’s immediately written to the commit log. The commit log is a crash-recovery mechanism that supports Cassandra’s durability goals. A write will not count as successful until it’s written to the commit log, to ensure that if a write operation does not make it to the in-memory store (the memtable, discussed in a moment), it will still be possible to recover the data.
After it’s written to the commit log, the value is written to a memory-resident data structure called the memtable. When the number of objects stored in the memtable reaches a threshold, the contents of the memtable are flushed to disk in a file called an SSTable. A new memtable is then created. This flushing is a nonblocking operation; multiple memtables may exist for a single column family, one current and the rest waiting to be flushed. They typically should not have to wait very long, as the node should flush them very quickly unless it is overloaded.
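The write path just described can be reduced to a few lines. This is a minimal sketch for a single column family, not Cassandra’s implementation. An in-memory list stands in for the on-disk commit log, a TreeMap serves as the sorted memtable, and an unmodifiable sorted map plays the role of a flushed, immutable SSTable. The entry-count flush threshold is a hypothetical parameter. Unlike Cassandra, which flushes in the background, the sketch flushes synchronously.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of the write path for one column family: commit log first, then memtable,
// then a flush to an immutable, sorted "SSTable" once the memtable is full.
class WritePath {
    final List<String> commitLog = new ArrayList<>();           // stands in for the append-only log file
    final List<NavigableMap<String, String>> sstables = new ArrayList<>();
    private TreeMap<String, String> memtable = new TreeMap<>(); // sorted, memory-resident
    private final int flushThreshold;                           // hypothetical entry-count threshold

    WritePath(int flushThreshold) { this.flushThreshold = flushThreshold; }

    void write(String key, String value) {
        commitLog.add(key + "=" + value); // 1. durable append; the write succeeds only after this
        memtable.put(key, value);         // 2. in-memory update
        if (memtable.size() >= flushThreshold) flush();
    }

    // 3. Freeze the current memtable as an immutable SSTable and start a new memtable.
    void flush() {
        if (memtable.isEmpty()) return;
        sstables.add(Collections.unmodifiableNavigableMap(memtable));
        memtable = new TreeMap<>();
    }

    int memtableSize() { return memtable.size(); }
}
```

Because the memtable is already sorted, the flushed SSTable comes out in key order without any extra sorting step.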
Each commit log maintains an internal bit flag to indicate whether it needs flushing. When a write operation is first received, it is written to the commit log and its bit flag is set to 1. There is only one bit flag per column family, because only one commit log is ever being written to across the entire server. All writes to all column families will go into the same commit log, so the bit flag indicates whether a particular commit log contains anything that hasn’t been flushed for a particular column family. Once the memtable has been properly flushed to disk, the corresponding commit log’s bit flag is set to 0, indicating that the commit log no longer has to maintain that data for durability purposes. Like regular logfiles, commit logs have a configurable rollover threshold, and once this file size threshold is reached, the log will roll over, carrying with it any extant dirty bit flags.
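The dirty-flag bookkeeping can be illustrated with a small sketch. This is a hypothetical model, not Cassandra’s commit log classes. Column families are identified by small integer ids (an assumption), each log segment keeps one bit per column family in a BitSet, and a segment rolls over after a fixed number of entries, which stands in for the file-size threshold. Clearing a column family’s flag in every segment after a flush assumes that the flush covered every write logged so far, a simplification.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Sketch: per-column-family dirty flags on commit log segments.
// Column families are identified by small integer ids (an assumption for this example).
class CommitLog {
    static class Segment {
        final BitSet dirty = new BitSet(); // bit i set => unflushed data for column family i
        int entries = 0;
    }

    final List<Segment> segments = new ArrayList<>();
    private final int rolloverEntries;     // stand-in for the configurable size threshold

    CommitLog(int rolloverEntries) {
        this.rolloverEntries = rolloverEntries;
        segments.add(new Segment());
    }

    void append(int columnFamilyId) {
        Segment active = segments.get(segments.size() - 1);
        if (active.entries >= rolloverEntries) { // roll over; the old segment keeps its flags
            active = new Segment();
            segments.add(active);
        }
        active.entries++;
        active.dirty.set(columnFamilyId);        // flag goes to 1
    }

    // After a column family's memtable is flushed, set its flag to 0 everywhere and
    // discard old segments that no longer protect any unflushed data.
    void onMemtableFlushed(int columnFamilyId) {
        for (Segment s : segments) s.dirty.clear(columnFamilyId);
        Segment active = segments.get(segments.size() - 1);
        segments.removeIf(s -> s != active && s.dirty.isEmpty());
    }
}
```

A segment can be deleted only when every column family that wrote to it has been flushed, which is exactly what the per-column-family flags make cheap to check.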
The SSTable is a concept borrowed from Google’s Bigtable. Once a memtable is flushed to disk as an SSTable, it is immutable and cannot be changed by the application. Despite the fact that SSTables are compacted, this compaction changes only their on-disk representation; it essentially performs the “merge” step of a mergesort into new files and removes the old files on success.
Reading “SSTable” as an abbreviation of “Sorted String Table” is somewhat misleading for Cassandra, because the data is not stored as strings on disk.
Each SSTable also has an associated Bloom filter, which is used as an additional performance enhancer (see Bloom Filters).
All writes are sequential, which is the primary reason that writes perform so well in Cassandra. No reads or seeks of any kind are required for writing a value to Cassandra because all writes are append operations. This makes one key limitation on performance the speed of your disk. Compaction is intended to amortize the reorganization of data, but it uses sequential IO to do so. So the performance benefit is gained by splitting; the write operation is just an immediate append, and then compaction helps to organize for better future read performance. If Cassandra naively inserted values where they ultimately belonged, writing clients would pay for seeks up front.
On reads, Cassandra will check the memtable first to find the value. Memtables are implemented by the org.apache.cassandra.db.Memtable class.
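A single node’s lookup order can be sketched as follows. This is a simplified illustration, not the Memtable or SSTable reader code. The memtable is checked first, and then SSTables are checked from newest to oldest. Each SSTable’s Bloom filter is represented by a plain predicate, so a key the filter rules out never costs a disk lookup. The sketch returns the first hit, which assumes that newer SSTables always hold newer data. Cassandra actually reconciles columns by timestamp across all sources.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Predicate;

// Sketch of a single-node read: memtable first, then SSTables from newest to oldest.
// Assumes newer SSTables always hold newer data, so the first hit wins.
class ReadPath {
    static class SSTable {
        final Map<String, String> data;
        final Predicate<String> mightContain; // stand-in for the SSTable's Bloom filter

        SSTable(Map<String, String> data, Predicate<String> mightContain) {
            this.data = data;
            this.mightContain = mightContain;
        }
    }

    final TreeMap<String, String> memtable = new TreeMap<>();
    final List<SSTable> sstables = new ArrayList<>(); // oldest first
    int diskLookups = 0;                              // counts simulated disk reads

    Optional<String> read(String key) {
        String v = memtable.get(key);
        if (v != null) return Optional.of(v);         // answered from memory
        for (int i = sstables.size() - 1; i >= 0; i--) {
            SSTable t = sstables.get(i);
            if (!t.mightContain.test(key)) continue;  // filter says "definitely absent"
            diskLookups++;
            v = t.data.get(key);
            if (v != null) return Optional.of(v);
        }
        return Optional.empty();
    }
}
```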
Consider the following scenario. A write request is sent to Cassandra, but the node where the write properly belongs is not available due to network partition, hardware failure, or some other reason. In order to ensure general availability of the ring in such a situation, Cassandra implements a feature called hinted handoff. You might think of a hint as a little post-it note that contains the information from the write request. If the node where the write belongs has failed, the Cassandra node that receives the write will create a hint, which is a small reminder that says, “I have the write information that is intended for node B. I’m going to hang onto this write, and I’ll notice when node B comes back online; when it does, I’ll send it the write request.” That is, node A will “hand off” to node B the “hint” regarding the write.
This allows Cassandra to be always available for writes, and reduces the time that a failed node will be inconsistent after it does come back online. We discussed consistency levels previously, and you may recall that consistency level ANY, which was added in 0.6, means that a hinted handoff alone will count as sufficient toward the success of a write operation. That is, even if only a hint was able to be recorded, the write still counts as successful.
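Hinted handoff can be sketched from the point of view of the node that receives a write. This is a hypothetical model, and all names in it are illustrative. Writes for live replicas are applied immediately, and writes for down replicas are kept as hints. A gossip-driven "node is up" callback replays the hints to the recovered node. Only the ANY and ONE consistency levels are modeled: ANY accepts a write that produced nothing but a hint, while ONE rejects the write up front if no replica is alive.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of hinted handoff on the node receiving a write (names are hypothetical).
class HintedHandoff {
    enum Level { ANY, ONE }

    final Set<String> liveNodes = new HashSet<>();
    final Map<String, Map<String, String>> stores = new HashMap<>(); // node -> its data
    final Map<String, List<String[]>> hints = new HashMap<>();       // target node -> pending writes

    boolean write(String key, String value, List<String> replicas, Level level) {
        boolean anyLive = replicas.stream().anyMatch(liveNodes::contains);
        if (level == Level.ONE && !anyLive) return false; // no replica can acknowledge
        int acked = 0;
        boolean hinted = false;
        for (String node : replicas) {
            if (liveNodes.contains(node)) {
                stores.computeIfAbsent(node, n -> new HashMap<>()).put(key, value);
                acked++;
            } else {
                // "I have a write intended for this node; I'll hand it off when it returns."
                hints.computeIfAbsent(node, n -> new ArrayList<>()).add(new String[] {key, value});
                hinted = true;
            }
        }
        // Under ANY, a recorded hint alone is enough for the write to succeed.
        return acked > 0 || (level == Level.ANY && hinted);
    }

    // Called when gossip reports that a node is back online: replay its hints.
    void onNodeUp(String node) {
        liveNodes.add(node);
        List<String[]> pending = hints.remove(node);
        if (pending == null) return;
        Map<String, String> store = stores.computeIfAbsent(node, n -> new HashMap<>());
        for (String[] kv : pending) store.put(kv[0], kv[1]);
    }
}
```

The replay in onNodeUp sends every pending hint at once. That burst is exactly the flooding concern raised in the next paragraph.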
Some concern about hinted handoffs has been voiced by members of the Cassandra community. At first, it seems like a thoughtful and elegant design to ensure overall durability of the database, and appears unproblematic because it is familiar from many distributed computing paradigms, such as Java Message Service (JMS). In a durable guaranteed-delivery JMS queue, if a message cannot be delivered to a receiver, JMS will wait for a given interval and then resend the request until the message is received. But there is a practical problem with both guaranteed delivery in JMS and Cassandra’s hinted handoffs: if a node is offline for some time, the hints can build up considerably on other nodes. Then, when the other nodes notice that the failed node has come back online, they tend to flood that node with requests, just at the moment it is most vulnerable (when it is struggling to come back into play after a failure).
In response to these concerns, it is now possible to disable hinted handoff entirely, or, as a less extreme measure, reduce the priority of hinted handoff messages against new write requests.
In Cassandra 0.6 and earlier, HintedHandoffManager.sendMessage would read an entire row into memory and then send the row to the recovered node in a single message. As of version 0.7, Cassandra pages within a single hinted row instead, which can improve performance against very wide rows.
A compaction operation in Cassandra is performed in order to merge SSTables. During compaction, the data in SSTables is merged: the keys are merged, columns are combined, tombstones are discarded, and a new index is created.
Compaction is the process of freeing up space by merging large accumulated datafiles. This is roughly analogous to rebuilding a table in the relational world. But as Stu Hood points out, the primary difference in Cassandra is that it is intended as a transparent operation that is amortized across the life of the server.
On compaction, the merged data is sorted, a new index is created over the sorted data, and the freshly merged, sorted, and indexed data is written to a single new SSTable (each SSTable consists of three files: Data, Index, and Filter). This process is managed by the class org.apache.cassandra.db.CompactionManager. CompactionManager implements an MBean interface so it can be introspected.
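The merge at the heart of compaction can be sketched as follows. This is a simplified illustration, not CompactionManager. Each SSTable is a sorted map from key to a timestamped cell, and when a key appears in several tables the newest timestamp wins. Tombstones are dropped once they are older than a gcBefore cutoff. For brevity, the sketch accumulates everything into a TreeMap rather than streaming a true k-way merge, and it treats a tombstone’s timestamp as its deletion time. Both are simplifications.

```java
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of merging SSTables during compaction. Each SSTable maps keys to timestamped
// cells; a cell with tombstone == true marks a deletion.
class Compaction {
    static class Cell {
        final String value;
        final long timestamp;
        final boolean tombstone;

        Cell(String value, long timestamp, boolean tombstone) {
            this.value = value;
            this.timestamp = timestamp;
            this.tombstone = tombstone;
        }
    }

    // gcBefore: tombstones with timestamps older than this may be purged for good.
    static NavigableMap<String, Cell> compact(List<NavigableMap<String, Cell>> sstables, long gcBefore) {
        TreeMap<String, Cell> merged = new TreeMap<>(); // output stays sorted by key
        for (NavigableMap<String, Cell> t : sstables) {
            for (Map.Entry<String, Cell> e : t.entrySet()) {
                // Newest timestamp wins when the same key appears in several SSTables.
                merged.merge(e.getKey(), e.getValue(), (a, b) -> a.timestamp >= b.timestamp ? a : b);
            }
        }
        // Drop tombstones that are old enough that every replica should have seen them.
        merged.values().removeIf(c -> c.tombstone && c.timestamp < gcBefore);
        return merged;
    }
}
```

The result is one sorted table in which each key appears once, so a later read for that key needs to consult only this file rather than every SSTable the key was ever written to.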
Another important function of compaction is to improve performance by reducing the number of required seeks. There are a bounded number of SSTables to inspect to find the column data for a given key. If a key is frequently mutated, it’s very likely that the mutations will end up spread across several flushed SSTables. Compacting them prevents the database from having to perform a seek to pull the data from each SSTable.
There are different types of compaction in Cassandra. A major compaction is triggered in one of two ways: via a node probe or automatically. A node probe sends a TreeRequest message to the nodes that neighbor the target. When a node receives a TreeRequest, it immediately performs a read-only compaction in order to validate the column family.
A read-only compaction has the following steps:
Get the key distribution from the column family.
Once the rows have been added to the validator, if the column family needs to be validated, it will create the Merkle tree and broadcast it to the neighboring nodes.
The Merkle trees are brought together in a “rendezvous” as a list of Differencers (trees that need validating or comparison).
The comparison is executed by the StageManager class, which is responsible for handling concurrency issues in executing jobs. In this case, the StageManager uses an Anti-Entropy Stage. This uses the org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor class, which executes the compaction within a single thread and makes the operation available as an MBean for inspection.
You can increase overall performance by reducing the priority of compaction threads. To do so, use the following flag:
-Dcassandra.compaction.priority=1
This will affect CPU usage, not IO.
Bloom filters are used as a performance booster. They are named for their inventor, Burton Bloom. Bloom filters are very fast, probabilistic data structures for testing whether an element is a member of a set. They are probabilistic because it is possible to get a false positive from a Bloom filter, but not a false negative. Bloom filters work by mapping the values in a data set into a bit array, condensing a larger data set into a digest. The digest, by definition, uses a much smaller amount of memory than the original data would. The filters are stored in memory and are used to improve performance by reducing disk access on key lookups. Disk access is typically much slower than memory access. So, in a way, a Bloom filter is a special kind of cache. When a query is performed, the Bloom filter is checked first before accessing disk. Because false negatives are not possible, if the filter indicates that the element does not exist in the set, it certainly doesn’t; but if the filter thinks that the element is in the set, the disk is accessed to make sure.
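Here is a minimal Bloom filter in Java that shows the behavior just described. It is a sketch and not Cassandra’s implementation. The filter derives its k bit positions per key by double hashing (h1 + i * h2), a standard technique. The two base hashes, Arrays.hashCode and a 32-bit FNV-1a, are arbitrary choices made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.BitSet;

// Minimal Bloom filter sketch: k bit positions per key via double hashing.
// The base hash functions are illustrative choices, not Cassandra's.
class BloomFilter {
    private final BitSet bits;
    private final int size;      // number of bits (m)
    private final int hashCount; // number of hash functions (k)

    BloomFilter(int size, int hashCount) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
    }

    private int position(String key, int i) {
        byte[] b = key.getBytes(StandardCharsets.UTF_8);
        int h1 = Arrays.hashCode(b);
        int h2 = fnv1a(b);
        return Math.floorMod(h1 + i * h2, size); // i-th derived hash, mapped into [0, size)
    }

    // 32-bit FNV-1a hash.
    private static int fnv1a(byte[] data) {
        int h = 0x811c9dc5;
        for (byte x : data) {
            h ^= (x & 0xff);
            h *= 0x01000193;
        }
        return h;
    }

    void add(String key) {
        for (int i = 0; i < hashCount; i++) bits.set(position(key, i));
    }

    // false => definitely absent (skip the disk); true => possibly present (check the disk).
    boolean mightContain(String key) {
        for (int i = 0; i < hashCount; i++) {
            if (!bits.get(position(key, i))) return false;
        }
        return true;
    }
}
```

With m bits, k hash functions, and n inserted keys, the false-positive rate is approximately (1 - e^(-kn/m))^k. Trading a little memory for a lower rate is therefore a matter of choosing m and k.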
A new JMX MBean feature will be added to Nodetool that allows you to check the number of false positives that your Bloom filters are returning; this operation is called getBloomFilterFalsePositives.
Apache Hadoop, Google Bigtable, and Squid Proxy Cache also employ Bloom filters.
In the relational world, you might be used to the idea of a “soft delete.” Instead of actually executing a delete SQL statement, the application will issue an update statement that changes a value in a column called something like “deleted”. Programmers sometimes do this to support audit trails, for example.
There’s a similar concept in Cassandra called a tombstone. This is how all deletes work and is therefore automatically handled for you. When you execute a delete operation, the data is not immediately deleted. Instead, it’s treated as an update operation that places a tombstone on the value. A tombstone is a deletion marker that is required to suppress older data in SSTables until compaction can run.
There’s a related setting called Garbage Collection Grace Seconds. This is the amount of time that the server will wait to garbage-collect a tombstone. By default, it’s set to 864,000 seconds, the equivalent of 10 days. Cassandra keeps track of tombstone age, and once a tombstone is older than GCGraceSeconds, it will be garbage-collected. The purpose of this delay is to give a node that is unavailable time to recover; if a node is down longer than this value, then it is treated as failed and replaced.
As of 0.7, this setting is configurable per column family (it used to be for the whole keyspace).
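The interplay between tombstones and the grace period can be sketched as follows. This is a hypothetical model, not Cassandra’s storage code. A delete is written as a tombstone cell that wins by timestamp like any other update. This is why a late-arriving older write cannot resurrect deleted data. The tombstone becomes eligible for garbage collection only once it is older than the grace period, which here defaults to the 864,000 seconds mentioned above. The class and method names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch: deletes are written as tombstones, which become collectable only after
// gcGraceSeconds have passed (names are hypothetical).
class TombstoneStore {
    static final int DEFAULT_GC_GRACE_SECONDS = 10 * 24 * 60 * 60; // 864,000 s = 10 days

    static class Cell {
        final String value;          // null for a tombstone
        final long timestamp;        // write timestamp used for last-write-wins
        final long deletedAtSeconds; // local deletion time, or -1 for a live value

        Cell(String value, long timestamp, long deletedAtSeconds) {
            this.value = value;
            this.timestamp = timestamp;
            this.deletedAtSeconds = deletedAtSeconds;
        }

        boolean isTombstone() { return deletedAtSeconds >= 0; }
    }

    final Map<String, Cell> cells = new HashMap<>();
    final int gcGraceSeconds;

    TombstoneStore(int gcGraceSeconds) { this.gcGraceSeconds = gcGraceSeconds; }

    // Keep whichever cell has the newer timestamp (last write wins).
    private void apply(String key, Cell incoming) {
        cells.merge(key, incoming, (old, neu) -> neu.timestamp >= old.timestamp ? neu : old);
    }

    void insert(String key, String value, long timestamp) {
        apply(key, new Cell(value, timestamp, -1));
    }

    // A delete is just an update that places a tombstone on the value.
    void delete(String key, long timestamp, long nowSeconds) {
        apply(key, new Cell(null, timestamp, nowSeconds));
    }

    Optional<String> get(String key) {
        Cell c = cells.get(key);
        return (c == null || c.isTombstone()) ? Optional.empty() : Optional.of(c.value);
    }

    // Compaction may drop the tombstone only once it is older than the grace period.
    boolean isGcable(String key, long nowSeconds) {
        Cell c = cells.get(key);
        return c != null && c.isTombstone() && nowSeconds - c.deletedAtSeconds > gcGraceSeconds;
    }
}
```

If the tombstone were collected before a downed replica had seen it, that replica’s older copy of the value could later be read back as live. The grace period exists to prevent exactly that.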
Cassandra implements a Staged Event-Driven Architecture (SEDA). SEDA is a general architecture for highly concurrent Internet services, originally proposed in a 2001 paper called “SEDA: An Architecture for Well-Conditioned, Scalable Internet Services” by Matt Welsh, David Culler, and Eric Brewer (whom you might recall from our discussion of the CAP theorem).
You can read the original SEDA paper at http://www.eecs.harvard.edu/~mdw/proj/seda.
In a typical application, a single unit of work is often performed within the confines of a single thread. A write operation, for example, will start and end within the same thread. Cassandra, however, is different: its concurrency model is based on SEDA, so a single operation may start with one thread, which then hands off the work to another thread, which may hand it off to other threads. But it’s not up to the current thread to hand off the work to another thread. Instead, work is subdivided into what are called stages, and the thread pool (really, a java.util.concurrent.ExecutorService) associated with the stage determines execution. A stage is a basic unit of work, and a single operation may internally state-transition from one stage to the next. Because each stage can be handled by a different thread pool, Cassandra can gain a substantial performance improvement. This SEDA design also means that Cassandra is better able to manage its own resources internally, because different operations might require disk IO, or they might be CPU-bound, or they might be network operations, and so on, so the pools can manage their work according to the availability of these resources.
A stage consists of an incoming event queue, an event handler, and an associated thread pool. Stages are managed by a controller that determines scheduling and thread allocation; Cassandra implements this kind of concurrency model using the thread pool java.util.concurrent.ExecutorService.
To see specifically how this works, check out the org.apache.cassandra.concurrent.StageManager class.
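The stage idea can be sketched with plain java.util.concurrent types. This is an illustration and not StageManager. Each stage owns its own ExecutorService, and a read hops from the READ pool to the RESPONSE pool through CompletableFuture’s async variants, so no single thread carries the operation from start to finish. The stage names echo the list that follows, but the pool sizes and thread names are arbitrary assumptions.

```java
import java.util.EnumMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// SEDA sketch: each stage owns a thread pool, and an operation moves between stages.
// Pool sizes and thread names are arbitrary assumptions.
class Stages implements AutoCloseable {
    enum Stage { READ, MUTATION, GOSSIP, RESPONSE }

    private final Map<Stage, ExecutorService> pools = new EnumMap<>(Stage.class);

    Stages() {
        for (Stage s : Stage.values()) {
            // Disk-bound stages get more threads than the others.
            int threads = (s == Stage.READ || s == Stage.MUTATION) ? 4 : 1;
            String threadName = s.name().toLowerCase(Locale.ROOT) + "-stage";
            pools.put(s, Executors.newFixedThreadPool(threads, r -> {
                Thread t = new Thread(r, threadName);
                t.setDaemon(true);
                return t;
            }));
        }
    }

    ExecutorService pool(Stage s) { return pools.get(s); }

    // A read hops stages: the lookup runs on the READ pool, the reply is built on the RESPONSE pool.
    CompletableFuture<String> read(String key) {
        return CompletableFuture
            .supplyAsync(() -> "value-of-" + key + "@" + Thread.currentThread().getName(), pool(Stage.READ))
            .thenApplyAsync(v -> v + " -> sent by " + Thread.currentThread().getName(), pool(Stage.RESPONSE));
    }

    @Override
    public void close() {
        pools.values().forEach(ExecutorService::shutdown);
    }
}
```

Because each pool is separate, a backlog of slow disk reads fills only the READ pool’s queue, and gossip or responses keep flowing on their own threads.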
The following operations are represented as stages in Cassandra:
Read
Mutation
Gossip
Response
Anti-Entropy
Load Balance
Migration
Streaming
A few additional operations are also implemented as stages. There are stages for units of work performed on memtables (in the ColumnFamilyStore class), and the Consistency Manager is a stage in the StorageService.
The stages implement the IVerbHandler interface to support the functionality for a given verb. Because the idea of mutation is represented as a stage, it can play a role in both insert and delete operations.
SEDA is a powerful architecture. Because it is event-driven, as its name states, work can be performed with exceptional concurrency and minimal coupling.
There is a set of classes that form Cassandra’s basic internal control mechanisms. I’ll present a brief overview of these here so that you can become familiar with some of the more important ones. The first one to consider is probably the org.apache.cassandra.thrift.CassandraServer class. This class implements the calls to the Thrift interface, and delegates most of the work of performing queries to org.apache.cassandra.service.StorageProxy.
The org.apache.cassandra.service.CassandraDaemon interface represents the life cycle of the Cassandra service running on a single node. It includes the typical life cycle operations that you might expect: start, stop, activate, deactivate, and destroy.
The Cassandra database service is represented by org.apache.cassandra.service.StorageService. The storage service contains the node’s token, which is a marker indicating the range of data that the node is responsible for.
The server starts up with a call to the initServer method of this class, upon which the server registers the SEDA verb handlers, makes some determinations about its state (such as whether it was bootstrapped or not, and what its partitioner is), and registers itself as an MBean with the JMX server.
The purpose of org.apache.cassandra.net.MessagingService is to create socket listeners for message exchange; inbound and outbound messages from this node come through this service. The MessagingService.listen method creates a thread. Each incoming connection then dips into the ExecutorService thread pool using org.apache.cassandra.net.IncomingTcpConnection (a class that extends Thread) to deserialize the message. The message is validated, and then it’s determined whether this is a streaming message or not. Message streaming is Cassandra’s optimized way of sending sections of SSTable files from one node to another; all other communication between nodes occurs via serialized messages. If it’s streaming, the message is passed to an IncomingStreamReader; if it’s not streaming, the message is handled by MessagingService’s deserialization executor, which is handed the message in the form of a task that implements Runnable. Because this service also makes heavy use of stages and the pool it maintains is wrapped with an MBean, you can find out a lot about how this service is working (whether reads are getting backed up and so forth) through JMX.
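The inbound dispatch decision can be sketched in a few lines. This is a hypothetical model: Message, Verb, VerbHandler, and Dispatcher are stand-ins loosely inspired by the verb-handler idea, not Cassandra’s classes. Streaming messages go down a separate path, represented here by a list, while every other message is wrapped in a Runnable and handed to an executor that invokes the handler registered for its verb.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of inbound message dispatch (all names are hypothetical stand-ins).
class Dispatcher {
    enum Verb { MUTATION, READ }

    // One handler per kind of message, in the spirit of a verb handler.
    interface VerbHandler { void doVerb(String payload); }

    static class Message {
        final Verb verb;
        final boolean streaming;
        final String payload;

        Message(Verb verb, boolean streaming, String payload) {
            this.verb = verb;
            this.streaming = streaming;
            this.payload = payload;
        }
    }

    private final Map<Verb, VerbHandler> handlers = new EnumMap<>(Verb.class);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    final List<String> streamed = new CopyOnWriteArrayList<>(); // stand-in for a stream reader

    void register(Verb verb, VerbHandler handler) { handlers.put(verb, handler); }

    void receive(Message m) {
        if (m.streaming) {
            streamed.add(m.payload);                   // SSTable sections bypass the verb handlers
            return;
        }
        VerbHandler h = handlers.get(m.verb);
        if (h == null) throw new IllegalArgumentException("no handler for " + m.verb);
        executor.execute(() -> h.doVerb(m.payload));   // handed off as a Runnable task
    }

    // Stop accepting work and wait for queued tasks; returns false on timeout or interrupt.
    boolean shutdown() {
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```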
In this chapter, we examined the main pillars of Cassandra’s construction, including gossip, anti-entropy, accrual failure detection, and how the use of a Staged Event-Driven Architecture maximizes performance. We also looked at how Cassandra internally executes various operations, such as tombstones and read repair. Finally, we surveyed some of the major classes and interfaces, pointing out key points of interest in case you want to dive deeper into the code base.