Now that we understand the data model, we’ll look at the different kinds of queries you can perform in Cassandra to read and write data. In this chapter, we use Cassandra 0.7.0-beta1, which is the most recent release version at the time of this writing.
There are several differences between Cassandra’s model and query methods and what’s available in RDBMS, and these are important to keep in mind.
There is no first-order concept of an update in Cassandra, meaning that there is no client query called an “update.” You can readily achieve the same effect, however, by simply performing an insert using an existing row key. If you issue an insert statement for a key that already exists, Cassandra will overwrite the values for any matching columns; if your query contains additional columns that don’t already exist for that row key, then the additional columns will be inserted. This is all seamless.
Cassandra automatically gives you record-level atomicity on every write operation. In RDBMS, you would have to specify row-level locking. Although Cassandra offers atomicity at the column family level, it does not guarantee isolation.
Because you need to denormalize data to create secondary indexes, you might need to insert data into two or more column families (one for your primary data and one for your inverted or secondary index). This means issuing two insert statements. So, if it is applicable for your use case, you’ll need to manually “roll back” writes by issuing a delete if one of the insert operations fails.
More to the point, perhaps, is that Cassandra doesn’t have transactions, because it just wasn’t built with that goal in mind.
It is possible in SQL databases to insert more than one row with identical values if you have not defined a unique primary key constraint on one of the columns. This is not possible in Cassandra. If you write a new record with a key that already exists in a column family, the values for any existing columns will be overwritten, and any columns that previously were not present for that row will be added to the row.
There are a few basic properties of Cassandra’s write ability that are worth noting. First, writing data is very fast in Cassandra, because its design does not require performing disk reads or seeks. The memtables and SSTables save Cassandra from having to perform these operations on writes, which slow down many databases. All writes in Cassandra are append-only.
Because of the database commit log and hinted handoff design, the database is always writeable, and within a column family, writes are always atomic.
Cassandra’s tuneable consistency levels mean that you can specify in your queries how much consistency you require. A higher consistency level means that more nodes need to respond to the query, giving you more assurance that the values present on each replica are the same. If two nodes respond with different timestamps, the newest value wins, and that’s what will be returned to the client. In the background, Cassandra will then perform what’s called a read repair: it takes notice of the fact that one or more replicas responded to a query with an outdated value, and updates those replicas with the most current value so that they are all consistent.
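The newest-timestamp-wins rule can be sketched in a few lines of Java. This is only an illustration; `ReplicaResponse` and `resolve` are names I’ve made up, not part of Cassandra’s API:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Sketch of how a coordinator reconciles replica responses: the value
// carrying the newest timestamp wins, and any replica that answered
// with an older value would then be repaired in the background.
public class ReadRepairSketch {

    // A replica's answer: the column value it holds plus the
    // timestamp recorded when that value was written.
    static final class ReplicaResponse {
        final String value;
        final long timestamp;
        ReplicaResponse(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Return the value carried by the newest timestamp.
    static String resolve(List<ReplicaResponse> responses) {
        return responses.stream()
                .max(Comparator.comparingLong(r -> r.timestamp))
                .map(r -> r.value)
                .orElseThrow(IllegalArgumentException::new);
    }

    public static void main(String[] args) {
        List<ReplicaResponse> responses = Arrays.asList(
                new ReplicaResponse("old value", 100L),
                new ReplicaResponse("new value", 200L));
        System.out.println(resolve(responses)); // new value
    }
}
```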
There are several consistency levels that you can specify, and they mean something different for read operations than for write operations. The possible consistency levels, and the implications of specifying each one for read queries, are shown in Table 7-1.
The consistency levels are based on the replication factor specified in the configuration file, not on the total number of nodes in the system.
Table 7-1. Read consistency levels

Consistency level | Implication
ZERO | Unsupported. You cannot specify CL.ZERO for read operations because it doesn’t make sense; it would amount to saying “give me the data from no nodes.”
ANY | Unsupported. Use CL.ONE instead.
ONE | Immediately return the record held by the first node that responds to the query. A background thread is created to check that record against the same record on other replicas. If any are out of date, a read repair is then performed to sync them all to the most recent value.
QUORUM | Query all nodes. Once a majority of replicas ((replication factor / 2) + 1) respond, return to the client the value with the most recent timestamp. Then, if necessary, perform a read repair in the background on all remaining replicas.
ALL | Query all nodes. Wait for all nodes to respond, and return to the client the record with the most recent timestamp. Then, if necessary, perform a read repair in the background. If any nodes fail to respond, fail the read operation.
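The quorum arithmetic used in the table, (replication factor / 2) + 1 with integer division, is easy to sanity-check; `QuorumMath` is an illustrative name of mine, not anything in Cassandra:

```java
// Quorum size from the replication factor: a majority of replicas,
// computed as (replication factor / 2) + 1 using integer division.
public class QuorumMath {
    static int quorum(int replicationFactor) {
        return (replicationFactor / 2) + 1;
    }

    public static void main(String[] args) {
        System.out.println(quorum(3)); // 2
        System.out.println(quorum(4)); // 3
        System.out.println(quorum(5)); // 3
    }
}
```

Note that at replication factor 3, a quorum is 2 nodes, so one replica can be down and quorum reads and writes still succeed.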
As you can see from the table, certain consistency levels are not supported for read operations: ZERO and ANY. Notice that the implication of consistency level ONE is that the client gets whatever value the first responding node holds, even if that value is out of date. The read repair operation is performed after the record is returned, so any subsequent reads will all have a consistent value, regardless of which node responds.
Another item worth noting is consistency level ALL. If you specify CL.ALL, then you’re saying that you require all replicas to respond, so if any node with that record is down or otherwise fails to respond before the timeout, the read operation fails.
A node is considered unresponsive if it does not respond to a query before the value specified by rpc_timeout_in_ms in the configuration file. The default is 10 seconds.
You can specify these consistency levels for write operations as well, though their meanings are very different. The implications of using the different consistency levels on writes are shown in Table 7-2.
Table 7-2. Write consistency levels

Consistency level | Implication
ZERO | The write operation will return immediately to the client before the write is recorded; the write will happen asynchronously in a background thread, and there are no guarantees of success.
ANY | Ensure that the value is written to a minimum of one node, allowing hints to count as a write.
ONE | Ensure that the value is written to the commit log and memtable of at least one node before returning to the client.
QUORUM | Ensure that the write was received by at least a majority of replicas ((replication factor / 2) + 1).
ALL | Ensure that the number of nodes specified by replication factor received the write before returning to the client. If even one replica is unresponsive to the write operation, fail the operation.
The most notable consistency level for writes is the ANY level. This level means that the write is guaranteed to reach at least one node, but it allows a hint to count as a successful write. That is, if you perform a write operation and the node that the operation targets for that value is down, the server will make a note to itself, called a hint, which it will store until that node comes back up. Once the node is up, the server will detect this, look to see whether it has any writes that it saved for later in the form of a hint, and then write the value to the revived node. In many cases, the node that makes the hint actually isn’t the node that stores it; instead, it sends it off to one of the nonreplica neighbors of the node that is down.
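The store-and-replay behavior described above can be modeled with a toy in-memory sketch. Every name here (`HintedHandoffSketch`, `markUp`, `markDown`, and so on) is invented for illustration; this is not Cassandra’s implementation:

```java
import java.util.*;

// A toy model of hinted handoff: when the target node is down, a
// hint (the pending write) is stored on its behalf, then replayed
// when the node comes back up.
public class HintedHandoffSketch {
    // hints held on behalf of down nodes: node name -> pending writes
    private final Map<String, List<String>> hints = new HashMap<>();
    private final Set<String> upNodes = new HashSet<>();
    private final Map<String, List<String>> nodeData = new HashMap<>();

    void markUp(String node)   { upNodes.add(node); replay(node); }
    void markDown(String node) { upNodes.remove(node); }

    // Write to the node if it is up; otherwise record a hint.
    void write(String node, String value) {
        if (upNodes.contains(node)) {
            nodeData.computeIfAbsent(node, k -> new ArrayList<>()).add(value);
        } else {
            hints.computeIfAbsent(node, k -> new ArrayList<>()).add(value);
        }
    }

    // When the node comes back, deliver any hints saved for it.
    private void replay(String node) {
        List<String> pending = hints.remove(node);
        if (pending != null) {
            for (String value : pending) write(node, value);
        }
    }

    List<String> dataOn(String node) {
        return nodeData.getOrDefault(node, Collections.emptyList());
    }
}
```

A write aimed at a down node lands in the hint store rather than on the node; once the node is marked up again, the hint is replayed and the value appears in the node’s data.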
Using the consistency level of ONE on writes means that the write operation will be written to both the commit log and the memtable. That means that writes at CL.ONE are durable, so this level is the minimum level to use to achieve fast performance and durability. If this node goes down immediately after the write operation, the value will have been written to the commit log, which can be replayed when the server is brought back up to ensure that it still has the value.
For both reads and writes, the consistency levels of ZERO, ANY, and ONE are considered weak, whereas QUORUM and ALL are considered strong. Consistency is tuneable in Cassandra because clients can specify the desired consistency level on both reads and writes. There is an equation that is popularly used to represent the way to achieve strong consistency in Cassandra: R + W > N = strong consistency. In this equation, R, W, and N are the read replica count, the write replica count, and the replication factor, respectively; all client reads will see the most recent write in this scenario, and you will have strong consistency.
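The R + W > N check is one line of code. A minimal sketch (`isStrong` is my name for it): when read and write replica counts sum to more than the replication factor, the read set and write set must overlap in at least one replica, so a read always touches a node that saw the latest write.

```java
// R + W > N: if the read replica count plus the write replica count
// exceeds the replication factor, the read and write sets overlap,
// so every read sees the most recent write.
public class ConsistencyCheck {
    static boolean isStrong(int r, int w, int n) {
        return r + w > n;
    }

    public static void main(String[] args) {
        System.out.println(isStrong(2, 2, 3)); // true: QUORUM reads and writes at RF 3
        System.out.println(isStrong(1, 3, 3)); // true: ONE reads with ALL writes
        System.out.println(isStrong(1, 1, 3)); // false: ONE/ONE at RF 3 is weak
    }
}
```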
There are a few basic properties of reading data from Cassandra that are worth noting. First, it’s easy to read data because clients can connect to any node in the cluster to perform reads, without having to know whether a particular node acts as a replica for that data. If a client connects to a node that doesn’t have the data it’s trying to read, the node it’s connected to will act as coordinator node to read the data from a node that does have it, identified by token ranges.
To fulfill read operations, Cassandra does have to perform seeks, but you can speed these up by adding RAM. Adding RAM will help you if you find the OS doing a lot of paging on reads (in general, it is better to enable the various caches Cassandra has). Cassandra has to wait for a number of responses synchronously (based on consistency level and replication factor), and then perform read repairs as necessary.
So reads are clearly slower than writes, for these various reasons. The partitioner doesn’t influence the speed of single-row reads; in the case of range queries, however, using an order-preserving partitioner (OPP) is significantly faster because it allows you to easily determine which nodes don’t have your data. The partitioner’s responsibility is to perform the consistent hash operation that maps keys to nodes. In addition, you can choose row caching and key caching strategies to give you a performance boost (see Chapter 11).
This section presents an overview of the basic Cassandra API so that once we start reading and writing data, some of these exotic terms won’t seem quite so difficult. We already know what a column, super column, and column family are: a column family contains columns or super columns; a super column contains only columns (you can’t nest one super column inside another); and columns contain a name/value pair and a timestamp.
In a relational database, the terms SELECT, INSERT, UPDATE, and DELETE mean just what they mean colloquially, in regular life. But working with Cassandra’s API structures is not exactly straightforward, and can be somewhat daunting to newcomers; after all, there’s no such thing as a “slice range” in regular life, so these terms may take some getting used to.
There are two basic concepts that you’ll want to learn quickly: ranges and slices. Many queries are defined using these terms, and they can be a bit confusing at first.
Columns are sorted by their type (as specified by CompareWith), and rows are sorted by their partitioner.
A range basically refers to a mathematical range, where you have a set of ordered elements and you want to specify some subset of those elements by defining a start element and a finish element. The range is the representation of all the elements between start and finish, inclusive.
Ranges typically refer to ranges of keys (rows). The term slice is used to refer to a range of columns within a row.
The range works according to the column family’s comparator. That is, given columns a, b, c, d, and e, the range of (a,c) includes columns a, b, and c. So, if you have 1,000 columns with names that are long integers, you can see how you could easily specify a range of columns between 35 and 45. By using ranges, you can retrieve all of the columns in a range that you define (called a range slice), or you can perform the same update to the items in the range using a batch.
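A sorted map makes a reasonable stand-in for a row’s sorted columns, and shows the inclusive-on-both-ends behavior just described. This is a sketch with invented names, not the Cassandra API:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeMap;

// Columns in a row are kept in the order defined by the comparator,
// so a range such as (a,c) means "a through c inclusive".
public class ColumnRangeSketch {

    // inclusive on both ends, like a Cassandra column range
    static Set<String> range(TreeMap<String, String> row,
                             String start, String finish) {
        return row.subMap(start, true, finish, true).keySet();
    }

    public static void main(String[] args) {
        TreeMap<String, String> row = new TreeMap<>();
        for (String name : Arrays.asList("a", "b", "c", "d", "e")) {
            row.put(name, "value-" + name);
        }
        System.out.println(range(row, "a", "c")); // [a, b, c]
    }
}
```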
You may have many hundreds of columns defined on a row, but you might not want to retrieve all of them in a given query. Columns are stored in sorted order, so the range query is provided so that you can fetch columns within a range of column names.
Range queries require using an OrderPreservingPartitioner, so that keys are returned in the order defined by the collation used by the partitioner.
When specifying a range query and using Random Partitioner, there’s really no way to specify a range narrower than “all”. This is obviously an expensive proposition, because you might incur additional network operations. It can also potentially result in missed keys: an update that occurs while your row scan is in progress may land earlier in the index than the point you are currently processing, so the scan misses it.
There is another thing that can be confusing at first. When you are using Random Partitioner, you must recall that range queries first hash the keys. So if you are using a range of “Alice” to “Alison”, the query will first run a hash on each of those keys and return not simply the natural values between Alice and Alison, but rather the values between the hashes of those values.
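A rough illustration of why this happens: Random Partitioner derives a token from an MD5-style hash of the key, and nearby keys produce unrelated tokens. The class and method names below are mine, and this is only a sketch of the idea, not Cassandra’s token code:

```java
import java.math.BigInteger;
import java.security.MessageDigest;

// Under Random Partitioner a row key is hashed into a large integer
// token; "Alice" and "Alison" land wherever their tokens fall, not
// next to each other in natural key order.
public class TokenSketch {
    static BigInteger token(String key) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            // interpret the 128-bit digest as a nonnegative integer token
            return new BigInteger(1, md5.digest(key.getBytes("UTF-8")));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Alice  -> " + token("Alice"));
        System.out.println("Alison -> " + token("Alison"));
    }
}
```

Running this shows two tokens with no relationship to the alphabetical closeness of the keys, which is why a “range” of hashed keys is not the natural range of key names.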
Here is the basic flow of a read operation that looks for a specific key when using Random Partitioner. First, the key is hashed, and then the client connects to any node in the cluster. That node will route the request to the node with that key. The memtable is consulted to see whether your key is present; if it’s not found, then a scan is performed on the Bloom filter for each file, starting with the newest one. Once the key is found in the Bloom filter, it is used to consult the corresponding datafile and find the column values.
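The memtable-then-Bloom-filter-then-datafile order can be sketched with ordinary collections. Plain sets stand in for real Bloom filters here (a real Bloom filter can return false positives but never false negatives), and all names are illustrative:

```java
import java.util.*;

// A simplified read path: check the memtable first, then scan the
// SSTables newest-first, consulting each file's "Bloom filter"
// before touching its datafile.
public class ReadPathSketch {
    final Map<String, String> memtable = new HashMap<>();
    // newest SSTable first; each pairs a bloom-filter stand-in with a datafile
    final List<Map.Entry<Set<String>, Map<String, String>>> sstables =
            new ArrayList<>();

    String read(String key) {
        String hit = memtable.get(key);
        if (hit != null) return hit;                 // found in memory
        for (Map.Entry<Set<String>, Map<String, String>> sstable : sstables) {
            if (sstable.getKey().contains(key)) {    // filter says "maybe here"
                String value = sstable.getValue().get(key); // consult datafile
                if (value != null) return value;
            }
        }
        return null;                                 // key not present anywhere
    }
}
```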
We’ll look at inserts first, because you need to have something in the database to query. In this section we set up a Java project and walk through a complete example that does an insert and then reads the data back.
First, download Cassandra from http://cassandra.apache.org. It’s easiest to get started with the binary version. See Chapter 2 if you’re having any trouble.
Now let’s create a new project to test some of our work. For now, we’ll use a Java project in Eclipse. First, create a new project, and then add a few JARs to your classpath: a Log4J JAR to output log statements (don’t forget to set the properties file for your own classes); the Thrift library called libthrift-r917130.jar, which contains the org.apache.thrift classes; and the Cassandra JAR apache-cassandra-x.x.x.jar, which contains the org.apache.cassandra classes. We also need to add the SLF4J logger (both the API and the implementation JAR), which Cassandra requires. Finally, add an argument to your JVM to add the log4j.properties file to your classpath:
-Dlog4j.configuration=file:///home/eben/books/cassandra/log4j.properties
In Eclipse, you can add the log4j.properties file by creating a new Run Configuration. Click the Arguments tab, and then in the VM Arguments text field, enter the parameter specified in the previous code sample—of course using the actual path to your properties file.
My log4j.properties file looks like this:
# output messages into a rolling log file as well as stdout
log4j.rootLogger=DEBUG,stdout,R

# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n

# rolling log file
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.maxFileSize=5MB
log4j.appender.R.maxBackupIndex=5
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %C %F (line %L) %m%n

# This points to your logs directory
log4j.appender.R.File=cass-client.log
To keep this simple, we’ll use the default keyspace and configuration. Now start the server, and create a class that looks like the one shown in Example 7-1. This example will open a connection to Cassandra and write a new row with two columns: name and age. We then read back a single column value for that row, and then read the entire row.
package com.cassandraguide.rw;

import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Clock;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class SimpleWriteRead {

    private static final Logger LOG = Logger.getLogger(SimpleWriteRead.class);

    //set up some constants
    private static final String UTF8 = "UTF8";
    private static final String HOST = "localhost";
    private static final int PORT = 9160;
    private static final ConsistencyLevel CL = ConsistencyLevel.ONE;

    //not paying attention to exceptions here
    public static void main(String[] args) throws UnsupportedEncodingException,
            InvalidRequestException, UnavailableException, TimedOutException,
            TException, NotFoundException {

        TTransport tr = new TSocket(HOST, PORT);
        //new default in 0.7 is framed transport
        TFramedTransport tf = new TFramedTransport(tr);
        TProtocol proto = new TBinaryProtocol(tf);
        Cassandra.Client client = new Cassandra.Client(proto);
        tf.open();
        client.set_keyspace("Keyspace1");

        String cfName = "Standard1";
        byte[] userIDKey = "1".getBytes(); //this is a row key
        Clock clock = new Clock(System.currentTimeMillis());

        //create a representation of the Name column
        ColumnPath colPathName = new ColumnPath(cfName);
        colPathName.setColumn("name".getBytes(UTF8));

        ColumnParent cp = new ColumnParent(cfName);

        //insert the name column
        LOG.debug("Inserting row for key " + new String(userIDKey));
        client.insert(userIDKey, cp,
                new Column("name".getBytes(UTF8),
                        "George Clinton".getBytes(), clock), CL);

        //insert the Age column
        client.insert(userIDKey, cp,
                new Column("age".getBytes(UTF8),
                        "69".getBytes(), clock), CL);

        LOG.debug("Row insert done.");

        // read just the Name column
        LOG.debug("Reading Name Column:");
        Column col = client.get(userIDKey, colPathName, CL).getColumn();

        LOG.debug("Column name: " + new String(col.name, UTF8));
        LOG.debug("Column value: " + new String(col.value, UTF8));
        LOG.debug("Column timestamp: " + col.clock.timestamp);

        //create a slice predicate representing the columns to read
        //start and finish are the range of columns--here, all
        SlicePredicate predicate = new SlicePredicate();
        SliceRange sliceRange = new SliceRange();
        sliceRange.setStart(new byte[0]);
        sliceRange.setFinish(new byte[0]);
        predicate.setSlice_range(sliceRange);

        LOG.debug("Complete Row:");
        // read all columns in the row
        ColumnParent parent = new ColumnParent(cfName);
        List<ColumnOrSuperColumn> results =
                client.get_slice(userIDKey, parent, predicate, CL);

        //loop over columns, outputting values
        for (ColumnOrSuperColumn result : results) {
            Column column = result.column;
            LOG.debug(new String(column.name, UTF8) + " : "
                    + new String(column.value, UTF8));
        }

        tf.close();
        LOG.debug("All done.");
    }
}
Running this example will output the following:
DEBUG 14:02:09,572 Inserting row for key 1
DEBUG 14:02:09,580 Row insert done.
DEBUG 14:02:09,580 Reading Name Column:
DEBUG 14:02:09,585 Column name: name
DEBUG 14:02:09,586 Column value: George Clinton
DEBUG 14:02:09,586 Column timestamp: 1284325329569
DEBUG 14:02:09,589 Complete Row:
DEBUG 14:02:09,594 age : 69
DEBUG 14:02:09,594 name : George Clinton
DEBUG 14:02:09,594 All done.
This isn’t Cassandra-specific, but in Java you can easily get a more user-friendly representation of a date by wrapping the long timestamp output in a new Date object, like this: new Date(col.clock.timestamp).
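As a small, self-contained illustration of that tip, the literal below is the timestamp from the sample output above:

```java
import java.util.Date;

// Wrapping the long column timestamp in a java.util.Date gives a
// human-readable representation.
public class TimestampDemo {
    public static void main(String[] args) {
        long timestamp = 1284325329569L; // from the sample log output
        Date date = new Date(timestamp);
        System.out.println(date);
    }
}
```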
Let’s unpack what we’ve done here. First, we create a connection to the Cassandra server:
TTransport tr = new TSocket(HOST, PORT);
//new default in 0.7 is framed transport
TFramedTransport tf = new TFramedTransport(tr);
TProtocol proto = new TBinaryProtocol(tf);
Cassandra.Client client = new Cassandra.Client(proto);
tf.open();
client.set_keyspace("Keyspace1");
Here we’re using the framed transport, which is the new default in Cassandra 0.7. This code connects to Cassandra at the specified keyspace.
Then, we create representations for the column family we’ll use, and convenience values for the row key and clock that indicate when this insert was performed:
String cfName = "Standard1";
byte[] userIDKey = "1".getBytes(); //this is a row key
Clock clock = new Clock(System.currentTimeMillis());
Next, we use the client object with the column path to insert a new value:
ColumnParent cp = new ColumnParent(cfName);

//insert the name column
LOG.debug("Inserting row for key " + new String(userIDKey));
client.insert(userIDKey, cp,
        new Column("name".getBytes(UTF8),
                "George Clinton".getBytes(), clock), CL);
The insert operation requires a row key, as well as the column object that includes the column name and the value we want to assign to it for this row key. We also specify the clock representing when this insert was performed, and the consistency level to apply.
We then basically repeat this operation to write to the same row, but now to the age column, giving it a value of 69:
client.insert(userIDKey, cp,
        new Column("age".getBytes(UTF8), "69".getBytes(), clock), CL);
So at this point we have inserted two columns into a single row and are ready to read it back to verify.
To ensure our insert went well by reading it back, we use the client get method, passing the row key and the path to the column we want to read (the name column below), and then specify the consistency level we require for this operation:
ColumnPath colPathName = new ColumnPath(cfName);
colPathName.setColumn("name".getBytes(UTF8));

Column col = client.get(userIDKey, colPathName, CL).getColumn();

LOG.debug("Column name: " + new String(col.name, UTF8));
LOG.debug("Column value: " + new String(col.value, UTF8));
LOG.debug("Column timestamp: " + col.clock.timestamp);
So each column value has its own timestamp (wrapped in a clock) because it’s the column, not the row, that is the atomic unit. This can be confusing when you first come to Cassandra if you’re used to adding a timestamp column to a relational table to indicate when it was last updated. But there’s no such thing in Cassandra as the last time a row was updated; it’s granular at the column level.
Because Cassandra returns a byte array for column names and values, we create a String object around the byte array so we can do application stuff with it (like write it to a log here). The clock is stored as a long (representing the milliseconds since the Unix epoch), so we could wrap this in a new java.util.Date object if we wanted to.
So using the get method and specifying the column path and other parameters, we read a single column’s value. But now we want to get a “range” of columns for a single row (called a slice). So we use a slice predicate:
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[0]);
sliceRange.setFinish(new byte[0]);
predicate.setSlice_range(sliceRange);
The slice predicate is a container object that allows us to specify the range of columns that we want to read between a start and a finish. By specifying new byte[0] as the start and finish positions of the range, we’re saying that we want all of the columns.
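The empty start/finish convention can be mimicked with a sorted map: an empty start means “from the first column” and an empty finish means “through the last.” This stand-in uses strings in place of the byte arrays Thrift expects, and every name in it is invented for illustration:

```java
import java.util.SortedMap;
import java.util.TreeMap;

// How an open-ended slice behaves: empty bounds expand to the first
// and last columns of the row.
public class OpenEndedSlice {
    static SortedMap<String, String> slice(TreeMap<String, String> row,
                                           String start, String finish) {
        String from = start.isEmpty() ? row.firstKey() : start;
        String to = finish.isEmpty() ? row.lastKey() : finish;
        return row.subMap(from, true, to, true);
    }

    public static void main(String[] args) {
        TreeMap<String, String> row = new TreeMap<>();
        row.put("age", "69");
        row.put("name", "George Clinton");
        System.out.println(slice(row, "", "").keySet()); // [age, name]
    }
}
```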
Now that our predicate is set up, we can execute the range slice query that will get all of the columns for our row, so we can loop over them one by one:
ColumnParent parent = new ColumnParent(cfName);
List<ColumnOrSuperColumn> results =
        client.get_slice(userIDKey, parent, predicate, CL);
This get_slice query uses the predicate we created as well as two new things: the ColumnOrSuperColumn class and a column parent. The ColumnOrSuperColumn class is just what it says: it represents either a column or a super column returned by Thrift. Thrift does not have support for inheritance, so this class is used to pack up both columns and super columns in this one object (depending on what you’re querying). The client just reads the values if a column is returned; if a super column is returned, the client gets a column out of the super column and reads that.
The column parent is the path to the parent of a set of columns. Because we’re retrieving a set of columns in a get_slice by definition, we need to specify the column family that is the parent of the columns we’re looking up. Now we can loop over the columns for this row, printing out each column’s three attributes, and then close the connection:
for (ColumnOrSuperColumn result : results) {
    Column column = result.column;
    LOG.debug(new String(column.name, UTF8) + " : "
            + new String(column.value, UTF8));
}
tf.close();
You use the insert operation to add values or to overwrite existing values. So to update a value, use the insert operation with the new column values you want for the same key.
You can also insert many values at once, which we see how to do later in this chapter in Batch Mutates.
Use the get operation to retrieve columns or super columns, using a column path to access them:
ColumnOrSuperColumn get(byte[] key, ColumnPath column_path, ConsistencyLevel consistency_level)
Example 7-2 shows how to do this.
package com.cassandraguide.rw;

//imports left out

public class GetExample {

    private static final Logger LOG = Logger.getLogger(GetExample.class);

    private static final String UTF8 = "UTF8";
    private static final String HOST = "localhost";
    private static final int PORT = 9160;
    private static final ConsistencyLevel CL = ConsistencyLevel.ONE;

    public static void main(String[] args) throws UnsupportedEncodingException,
            InvalidRequestException, UnavailableException, TimedOutException,
            TException, NotFoundException {

        TTransport tr = new TSocket(HOST, PORT);
        //new default in 0.7 is framed transport
        TFramedTransport tf = new TFramedTransport(tr);
        TProtocol proto = new TBinaryProtocol(tf);
        Cassandra.Client client = new Cassandra.Client(proto);
        tf.open();
        client.set_keyspace("Keyspace1");

        String cfName = "Standard1";
        byte[] userIDKey = "1".getBytes(); //this is the row key
        Clock clock = new Clock(System.currentTimeMillis());

        //create a representation of the Name column
        ColumnParent cp = new ColumnParent(cfName);

        //insert the name column
        LOG.debug("Inserting row for key " + new String(userIDKey));
        client.insert(userIDKey, cp,
                new Column("name".getBytes(UTF8),
                        "George Clinton".getBytes(), clock), CL);
        LOG.debug("Row insert done.");

        /** Do the GET */
        LOG.debug("Get result:");

        // read the name column in the row
        ColumnPath path = new ColumnPath();
        path.column_family = cfName;
        path.column = "name".getBytes();

        ColumnOrSuperColumn cosc = client.get(userIDKey, path, CL);
        Column column = cosc.column;
        LOG.debug(new String(column.name, UTF8) + " : "
                + new String(column.value, UTF8));
        //END GET

        tr.close();
        LOG.debug("All done.");
    }
}
Here, we perform an insert so that we have something to get. We create a client object and then call its get method, which takes the row key, a column path, and a consistency level as arguments. The column path sets the name of the column that we’re looking for. Remember that the column names and values are binary (in a Java client they’re byte arrays), so we have to convert the string column name to a byte array for the query. Then when we get the column’s value, it’s a byte array too, so we convert it to a string to work with the result.
In this example, we add a value only for the name column, and because our column path represents the single column we’re interested in querying, we get just the name. The output is shown here:
DEBUG 14:36:42,265 Inserting row for key 1
DEBUG 14:36:42,273 Row insert done.
DEBUG 14:36:42,273 Get result:
DEBUG 14:36:42,282 name : George Clinton
DEBUG 14:36:42,282 All done.
Here we’ll just use the command-line interface to quickly create a couple of keys with some different columns to serve as data for the following queries:
[default@Keyspace1] set Standard1['k1']['a']='1'
Value inserted.
[default@Keyspace1] set Standard1['k1']['b']='2'
Value inserted.
[default@Keyspace1] set Standard1['k1']['c']='3'
Value inserted.
[default@Keyspace1] set Standard1['k2']['a']='2.1'
Value inserted.
[default@Keyspace1] set Standard1['k2']['b']='2.2'
So we have two rows; the first has three columns and the second has two columns.
A slice predicate is used in both read and write operations, acting as a limiting factor for specifying a set of columns. You can specify the predicate one of two ways: with a list of column names or with a slice range. If you know the names of a few columns that you want to retrieve, then you can specify them explicitly by name. If you don’t know their names, or for another reason want to retrieve a range of columns, you use a slice range object to specify the range.
I’m using small, self-contained examples to be concise. But it is not uncommon to see many, many columns defined per row in Cassandra, so don’t let these examples mislead you. It’s a big data store and can hold up to two billion columns per row in version 0.7.
To use the slice predicate, you create a predicate object populated with the column names you want, and pass it to your read operation.
If you want to get just the columns called “a” and “b” in a single row, you can use the predicate with the column names specified.
You can get a set of columns contained in a column parent using a get_slice operation. It will retrieve values by column name or a range of names, and will return either columns or super columns. The get_slice operation has this signature:
List<ColumnOrSuperColumn> get_slice(byte[] key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel cl)
This is shown in Example 7-3.
package com.cassandraguide.rw;

// imports omitted

public class SlicePredicateExample {

    public static void main(String[] args) throws Exception {
        Connector conn = new Connector();
        Cassandra.Client client = conn.connect();

        SlicePredicate predicate = new SlicePredicate();
        List<byte[]> colNames = new ArrayList<byte[]>();
        colNames.add("a".getBytes());
        colNames.add("b".getBytes());
        predicate.column_names = colNames;

        ColumnParent parent = new ColumnParent("Standard1");
        byte[] key = "k1".getBytes();

        List<ColumnOrSuperColumn> results =
                client.get_slice(key, parent, predicate, ConsistencyLevel.ONE);

        for (ColumnOrSuperColumn cosc : results) {
            Column c = cosc.column;
            System.out.println(new String(c.name, "UTF-8") + " : "
                    + new String(c.value, "UTF-8"));
        }

        conn.close();
        System.out.println("All done.");
    }
}
In this example, only the specified columns will be retrieved, and the other columns will be ignored. The query returns a list of ColumnOrSuperColumn objects. Because we know that we’re querying a regular column family, we get the column out of the ColumnOrSuperColumn data structure returned by the underlying RPC mechanism (Thrift), and finally retrieve the names and values from it in a loop.
The output is as follows:
a : 1
b : 2
All done.
Sometimes you don’t want to specify each and every column you want to retrieve, perhaps because you have a lot of columns to retrieve or because you don’t know all of their names.
To read a range of the columns in a row, you can specify the start and finish columns, and Cassandra will give you the start and finish columns as well as any columns in between, according to the sorting order based on the comparator for that column family. Create your slice predicate, then create your range, and then set the range into the predicate before passing the predicate to your read operation. Here’s an example:
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart("age".getBytes());
sliceRange.setFinish("name".getBytes());
predicate.setSlice_range(sliceRange);
When executed with a get_slice
operation,
this query will return the two columns specified, as well as any columns
that a comparison operation would sort within that range (between them
lexicographically, numerically, or whatever). For example, if this row
also had an “email” column, it would be returned in the results as
well.
You must consider the column names according to their comparator,
and specify start and finish in the proper order. For example, trying to
set a start of name
and a finish of age
will
throw an exception like this:
InvalidRequestException(why:range finish must come after start in the order of traversal)
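To make the traversal-order rule concrete, here is a small self-contained sketch (not part of the Thrift API; the class and method names are illustrative) of how a bytes-style comparator decides whether a column name falls inside a slice range, and why a start that sorts after the finish is rejected:

```java
import java.nio.charset.StandardCharsets;

// Illustrative sketch of BytesType-style column ordering; these names
// are not part of Cassandra's Thrift API.
public class SliceOrderSketch {

    // Unsigned lexicographic comparison, like Cassandra's byte comparators.
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) return d;
        }
        return a.length - b.length;
    }

    // True if the column name sorts between start and finish (inclusive).
    static boolean inRange(String start, String finish, String name) {
        byte[] s = start.getBytes(StandardCharsets.UTF_8);
        byte[] f = finish.getBytes(StandardCharsets.UTF_8);
        byte[] c = name.getBytes(StandardCharsets.UTF_8);
        if (compare(s, f) > 0)
            throw new IllegalArgumentException(
                "range finish must come after start in the order of traversal");
        return compare(s, c) <= 0 && compare(c, f) <= 0;
    }

    public static void main(String[] args) {
        System.out.println(inRange("age", "name", "email")); // "email" sorts between them
        System.out.println(inRange("age", "name", "zip"));   // "zip" sorts after the finish
    }
}
```

Running this shows that "email" falls inside the "age".."name" range while "zip" does not, and swapping start and finish raises the same kind of error the server reports.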
Recall that “returning a column” doesn’t mean that you get the value, as in SQL; it means that you get the complete column data structure, which is the name, the value, and the timestamp.
You can limit the number of columns returned by your slice range
by using the count attribute of the SliceRange
structure. Let’s say we have a row with hundreds of columns. We could specify a range that
might include many of these columns, but limit our result set to only
the first 10 columns like this:
SliceRange sliceRange = new SliceRange();
sliceRange.setStart("a".getBytes());
sliceRange.setFinish("d".getBytes());
sliceRange.count = 10;
Again, the “first” columns are those according to the order dictated by the column family’s comparator.
To read all of the columns in a row, you still need the predicate using a slice range, but you pass it empty byte arrays for the start and finish parameters, like this:
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[0]);
sliceRange.setFinish(new byte[0]);
predicate.setSlice_range(sliceRange);
Then, you can pass this populated predicate object to your
get_slice
operation along with the other
necessary parameters (such as consistency level and so on).
In the same way that you access a set of columns by using a range,
you can also access a range of keys or tokens. Using the
get_range_slices
operation, you can pass it a
KeyRange
object that defines the range of keys you
want.
One major distinction here is that you can get either keys or tokens
in a range using the same KeyRange
data structure as the
parameter to get_range_slices
. Key ranges are
start-inclusive, whereas token ranges are start-exclusive; because of ring
distribution, tokens can wrap around so that the end token is less than
the start token.
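The wrap-around behavior of token ranges can be sketched in a few lines of plain Java. This is an illustration of the membership rule described above (start-exclusive, end-inclusive, with wrap when the end token is less than or equal to the start), not Cassandra source code:

```java
// Illustrative sketch of token-range membership on the ring; names and
// the use of long tokens are simplifications, not Cassandra's API.
public class TokenRangeSketch {

    static boolean contains(long start, long end, long token) {
        if (start < end) {
            // ordinary range: start-exclusive, end-inclusive
            return token > start && token <= end;
        }
        // wrapping range: covers (start, max] plus [min, end]
        return token > start || token <= end;
    }

    public static void main(String[] args) {
        System.out.println(contains(10, 50, 30)); // true: inside an ordinary range
        System.out.println(contains(10, 50, 10)); // false: ranges are start-exclusive
        System.out.println(contains(90, 20, 95)); // true: past the start, before the wrap
        System.out.println(contains(90, 20, 5));  // true: in the wrapped-around portion
    }
}
```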
The API defines the operation get_range_slices
for you
to use. The operation is structured like this:
List<KeySlice> results = client.get_range_slices(
    parent, predicate, keyRange, ConsistencyLevel.ONE);
For a complete listing that shows using a range slice, see Example 7-4.
package com.cassandraguide.rw;

//imports omitted

public class GetRangeSliceExample {

    public static void main(String[] args) throws Exception {
        Connector conn = new Connector();
        Cassandra.Client client = conn.connect();

        System.out.println("Getting Range Slices.");

        SlicePredicate predicate = new SlicePredicate();
        List<byte[]> colNames = new ArrayList<byte[]>();
        colNames.add("a".getBytes());
        colNames.add("b".getBytes());
        predicate.column_names = colNames;

        ColumnParent parent = new ColumnParent("Standard1");

        KeyRange keyRange = new KeyRange();
        keyRange.start_key = "k1".getBytes();
        keyRange.end_key = "k2".getBytes();

        //a key slice is returned
        List<KeySlice> results =
            client.get_range_slices(parent, predicate, keyRange, ConsistencyLevel.ONE);

        for (KeySlice keySlice : results) {
            List<ColumnOrSuperColumn> cosc = keySlice.getColumns();
            System.out.println("Current row: " + new String(keySlice.getKey()));
            for (int i = 0; i < cosc.size(); i++) {
                Column c = cosc.get(i).getColumn();
                System.out.println(new String(c.name, "UTF-8") + " : "
                    + new String(c.value, "UTF-8"));
            }
        }

        conn.close();
        System.out.println("All done.");
    }
}
This program assumes that you’ve added a few values for a few different row keys to work with, as shown in Seeding Some Values.
The program outputs the following:
Getting Range Slices.
Current row: k1
a : 1
b : 2
Current row: k2
a : 2.1
b : 2.2
All done.
Though the names involving “slice” and “range” may seem unusual at first, they turn out to be innocuous. Just remember that slices mean sets of columns and ranges mean sets of keys. So in this example, we’re getting multiple columns for multiple row keys.
With get_slice
, we saw how to get a set of column names
for a single specified row key. multiget_slice
lets you
retrieve a subset of columns for a set of row keys based on a column
parent and a predicate. That is, given more than one
row key, retrieve the value of the named columns for each key. So a
multiget slice is more than one named column for more than one
row.
There used to be a method called multiget
, but it is
now deprecated in favor of multiget_slice
.
The operation looks like this:
Map<byte[], List<ColumnOrSuperColumn>> results =
    client.multiget_slice(rowKeys, parent, predicate, CL);
You specify the parent and the predicate as usual, and also provide the operation with the set of row keys you want to query. The row keys are just specified as a list of byte arrays, which are the key names.
The results come back to us as a
Map<byte[],List<ColumnOrSuperColumn>>
. This may
seem like a complicated data structure, but it’s actually simple. A map is
a key/value pair, and in this case the byte array key is the row key, so
in this example there will be two keys: one for each row key we get. Each
byte[]
key in the results map points to a list containing one
or more ColumnOrSuperColumn
objects. This structure
is used because Thrift does not support inheritance. You have to know
whether your column family is of type Standard
or
Super
, and then you just get the right one from the data
structure. From the
ColumnOrSuperColumn
, you extract the column (in
this case, the super_column will be empty), and then use the
column object to get the name and value. You could also get the
timestamp from it if you wanted to.
An example of using multiget slice is shown in Example 7-5.
package com.cassandraguide.rw;

//imports omitted

public class MultigetSliceExample {

    private static final ConsistencyLevel CL = ConsistencyLevel.ONE;
    private static final String columnFamily = "Standard1";

    public static void main(String[] args) throws UnsupportedEncodingException,
            InvalidRequestException, UnavailableException, TimedOutException,
            TException, NotFoundException {

        Connector conn = new Connector();
        Cassandra.Client client = conn.connect();

        System.out.println("Running Multiget Slice.");

        SlicePredicate predicate = new SlicePredicate();
        List<byte[]> colNames = new ArrayList<byte[]>();
        colNames.add("a".getBytes());
        colNames.add("c".getBytes());
        predicate.column_names = colNames;

        ColumnParent parent = new ColumnParent(columnFamily);

        //instead of one row key, we specify many
        List<byte[]> rowKeys = new ArrayList<byte[]>();
        rowKeys.add("k1".getBytes());
        rowKeys.add("k2".getBytes());

        //instead of a simple list, we get a map, where the keys are row keys
        //and the values are the list of columns returned for each
        Map<byte[], List<ColumnOrSuperColumn>> results =
            client.multiget_slice(rowKeys, parent, predicate, CL);

        for (byte[] key : results.keySet()) {
            List<ColumnOrSuperColumn> row = results.get(key);
            System.out.println("Row " + new String(key) + " --> ");
            for (ColumnOrSuperColumn cosc : row) {
                Column c = cosc.column;
                System.out.println(new String(c.name, "UTF-8") + " : "
                    + new String(c.value, "UTF-8"));
            }
        }

        conn.close();
        System.out.println("All done.");
    }
}
So we have a couple of row keys in the database, with identifiers
I’ve kept purposefully short and clear in order to help you visualize the
structure. We have a variety of column sets between the two rows but we’re
only interested in retrieving the a
and c
columns, so we use a slice predicate and specify the
column_names
to limit the results. We also want to specify
more than one row, so we use a list of byte arrays to indicate which row
keys we’re after.
Running this code elicits the following result:
Running Multiget Slice.
Row k2 -->
a : 2.1
Row k1 -->
a : 1
c : 3
All done.
As you can see, there was no column named “c” defined for the row with key “k2”, so Cassandra didn’t return anything for it. There was a value for column “b” in row “k1”, but we didn’t ask for it in our slice, so we didn’t get it.
Deleting data is not the same in Cassandra as it is in a relational database. In an RDBMS, you simply issue a delete statement that identifies the row or rows you want to delete. In Cassandra, a delete does not actually remove the data immediately. There’s a simple reason for this: Cassandra’s durable, eventually consistent, distributed design. If deletes were handled naively and a node were down when the delete was issued, that node would never receive the delete. Once it came back online, it would mistakenly conclude that all of the nodes that had received the delete had actually missed a write (the data it still holds because it missed the delete), and it would start “repairing” all of the other nodes with the deleted data. So Cassandra needs a more sophisticated mechanism to support deletes. That mechanism is called a tombstone.
A tombstone is a special marker issued in a delete that overwrites
the deleted values, acting as a placeholder. If any replica did not
receive the delete operation, the tombstone can later be propagated to
those replicas when they are available again. The net effect of this
design is that your data store will not immediately shrink in size
following a delete. Each node keeps track of the age of all its
tombstones. Once they reach the age as configured in
gc_grace_seconds
(which is 10 days by default), then a
compaction is run, the tombstones are garbage-collected, and the
corresponding disk space is recovered.
Remember that SSTables are immutable, so the data is not deleted from the SSTable. On compaction, tombstones are accounted for, merged data is sorted, a new index is created over the sorted data, and the freshly merged, sorted, and indexed data is written to a single new file.
The assumption is that 10 days is plenty of time for you to bring a failed node back online before compaction runs. If you feel comfortable doing so, you can reduce that grace period to reclaim disk space more quickly.
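The garbage-collection rule described above can be sketched as a one-line check. This is an illustration of the policy, not Cassandra source; the constant mirrors the 10-day default for gc_grace_seconds, and the method name is made up for the example:

```java
// Illustrative sketch of the tombstone GC rule: a tombstone may be
// purged by compaction only after it is older than gc_grace_seconds.
public class TombstoneGcSketch {

    // 10 days, the default value of gc_grace_seconds
    static final long GC_GRACE_SECONDS = 10L * 24 * 60 * 60;

    static boolean collectible(long tombstoneAgeSeconds) {
        return tombstoneAgeSeconds > GC_GRACE_SECONDS;
    }

    public static void main(String[] args) {
        System.out.println(collectible(5L * 24 * 60 * 60));  // false: 5 days old, keep it
        System.out.println(collectible(11L * 24 * 60 * 60)); // true: 11 days old, purge it
    }
}
```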
Let’s run an example that deletes some data we previously inserted. Note that there is no “delete” operation in Cassandra’s API; the operation is called remove, and even “remove” is really just a write (of a tombstone flag). Because a remove operation is really a tombstone write, you still have to supply a timestamp with the operation: if there are multiple clients writing, the highest timestamp wins, and those writes might include a tombstone or a new value. Cassandra doesn’t discriminate here; whichever operation has the highest timestamp will win.
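Because a tombstone is just another write, reconciliation between a delete and an insert is the same timestamp comparison in both directions. The sketch below illustrates that rule with plain Java types; the Write class and method names are invented for the example and are not part of the Thrift API:

```java
// Illustrative sketch of "highest timestamp wins" reconciliation, where
// a null value stands in for a tombstone. Not Cassandra's actual types.
public class LastWriteWinsSketch {

    static final class Write {
        final long timestamp;
        final String value;       // null marks a tombstone
        Write(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
        boolean isTombstone() { return value == null; }
    }

    // The write with the higher timestamp wins, tombstone or not.
    static Write reconcile(Write a, Write b) {
        return a.timestamp >= b.timestamp ? a : b;
    }

    public static void main(String[] args) {
        Write insert = new Write(100, "v1");
        Write delete = new Write(200, null);  // a later tombstone
        System.out.println(reconcile(insert, delete).isTombstone()); // the delete wins

        Write reinsert = new Write(300, "v2");
        System.out.println(reconcile(delete, reinsert).value);       // the newer value wins
    }
}
```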
A simple delete looks like this:
Connector conn = new Connector();
Cassandra.Client client = conn.connect();

String columnFamily = "Standard1";
byte[] key = "k2".getBytes(); //this is the row key
Clock clock = new Clock(System.currentTimeMillis());

ColumnPath colPath = new ColumnPath();
colPath.column_family = columnFamily;
colPath.column = "b".getBytes();

client.remove(key, colPath, clock, ConsistencyLevel.ALL);
System.out.println("Remove done.");

conn.close();
There were many examples of using batch mutate to perform multiple inserts in Chapter 4, so I won’t rehash that here. I’ll just present an overview.
To perform many insert or update operations at once, use the
batch_mutate
method instead of the insert
method. Like a batch update in the relational world, the
batch_mutate
operation allows grouping calls on many keys
into a single call in order to save on the cost of network round trips. If
batch_mutate
fails in the middle of its list of mutations,
there will be no rollback, so any updates that have already occurred up to
this point will remain intact. In the case of such a failure, the client
can retry the batch_mutate
operation.
There used to be an operation called batch_insert
, but it is
deprecated.
The sample application doesn’t include any delete operations, so let’s look at that in a little more depth.
You use remove
to delete a single column, but you can
use a Deletion
structure with a
batch_mutate
operation to perform a set of
complex delete operations at once.
You can create a list of column names that you want to delete, and
then indirectly pass it to batch_mutate
. I say
“indirectly” because there are several data structures that you need to
create in order to run a deletion.
First, create the list of column names to delete. Pass that list
to a SlicePredicate
, pass the
SlicePredicate
to a
Deletion
object, pass that to a
Mutation
object, and finally, pass that to a
batch_mutate
.
The following code snippet shows how to do this. First, you create
a SlicePredicate
object to hold the names of the
columns you want to delete. Here we just want to delete the “b” column.
Then, you create a Deletion
object that sets this
predicate, and create a Mutation
that sets this
Deletion
.
Once you have the Deletion
object set up,
you can create your mutation map. This map uses a byte array key to
point to the deletions you want to make, so you can use different keys
with the same or different mutation objects to perform the batch delete.
These keys point to another map, which itself uses a string as the key
to name the column family to modify. That map key points to a list of
mutations that should be performed.
String columnFamily = "Standard1";
byte[] key = "k2".getBytes(); //this is the row key
Clock clock = new Clock(System.currentTimeMillis());

SlicePredicate delPred = new SlicePredicate();
List<byte[]> delCols = new ArrayList<byte[]>();
//let's delete the column named 'b', though we could add more
delCols.add("b".getBytes());
delPred.column_names = delCols;

Deletion deletion = new Deletion();
deletion.predicate = delPred;
deletion.clock = clock;

Mutation mutation = new Mutation();
mutation.deletion = deletion;

Map<byte[], Map<String, List<Mutation>>> mutationMap =
    new HashMap<byte[], Map<String, List<Mutation>>>();

List<Mutation> mutationList = new ArrayList<Mutation>();
mutationList.add(mutation);

Map<String, List<Mutation>> m = new HashMap<String, List<Mutation>>();
m.put(columnFamily, mutationList);

//just for this row key, though we could add more
mutationMap.put(key, m);

client.batch_mutate(mutationMap, ConsistencyLevel.ALL);
There is a second way to specify items to delete using the
Deletion
structure: you can use a
SliceRange
instead of a
List
of columns, so you can delete by range
instead of explicitly listing column names.
You may sometimes hear people refer to “range ghosts” in Cassandra. This means that even if you have deleted all of the columns for a given row, you will still see a result returned for that row in a range slice, but the column data will be empty. This is valid, and is just something to keep in mind as you iterate result sets on the client.
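Client code that iterates range-slice results usually just skips these empty rows. The sketch below illustrates that with plain Java collections; the Row class is a stand-in for Thrift's KeySlice, not the real type:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of filtering "range ghosts" on the client: rows
// whose columns were all deleted still appear in a range slice, just
// with an empty column list.
public class RangeGhostSketch {

    static final class Row {
        final String key;
        final List<String> columns;
        Row(String key, List<String> columns) {
            this.key = key;
            this.columns = columns;
        }
    }

    static List<Row> dropGhosts(List<Row> slice) {
        List<Row> live = new ArrayList<>();
        for (Row r : slice) {
            if (!r.columns.isEmpty()) live.add(r); // ghost rows have no columns
        }
        return live;
    }

    public static void main(String[] args) {
        List<Row> slice = new ArrayList<>();
        slice.add(new Row("k1", List.of("a", "b")));
        slice.add(new Row("k2", List.of()));  // all columns deleted: a ghost
        System.out.println(dropGhosts(slice).size()); // only k1 survives
    }
}
```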
You can create keyspaces and column families through the API as well. Example 7-6 shows you how.
package com.cassandraguide.rw;

//imports omitted

/**
 * Shows how to define a keyspace and CF programmatically.
 */
public class DefineKeyspaceExample {

    public static void main(String[] args) throws UnsupportedEncodingException,
            InvalidRequestException, UnavailableException, TimedOutException,
            TException, NotFoundException, InterruptedException {

        Connector conn = new Connector();
        Cassandra.Client client = conn.connect();

        System.out.println("Defining new keyspace.");

        KsDef ksdef = new KsDef();
        ksdef.name = "ProgKS";
        ksdef.replication_factor = 1;
        ksdef.strategy_class =
            "org.apache.cassandra.locator.RackUnawareStrategy";

        List<CfDef> cfdefs = new ArrayList<CfDef>();
        CfDef cfdef1 = new CfDef();
        cfdef1.name = "ProgCF1";
        cfdef1.keyspace = ksdef.name;
        cfdefs.add(cfdef1);
        ksdef.cf_defs = cfdefs;

        client.system_add_keyspace(ksdef);

        System.out.println("Defining new cf.");

        CfDef cfdef2 = new CfDef();
        cfdef2.keyspace = ksdef.name;
        cfdef2.column_type = "Standard";
        cfdef2.name = "ProgCF";

        client.system_add_column_family(cfdef2);

        conn.close();
        System.out.println("All done.");
    }
}