As companies and organizations adopt technologies like Cassandra, they look for tools they can use to perform analytics and queries against their data. Cassandra’s built-in query capabilities, along with custom layers built atop them, can do much. However, there are also distributed tools in the community that can be fitted to work with Cassandra.
Hadoop seems to be the elephant in the room when it comes to open source big data frameworks. There we find tools such as an open source MapReduce implementation and higher-level analytics engines built on top of that, such as Pig and Hive. Thanks to members of both the Cassandra and Hadoop communities, Cassandra has gained some significant integration points with Hadoop and its analytics tools.
In this chapter, we explore how Cassandra and Hadoop fit together. First, we give a brief history of the Apache Hadoop project and go into how one can write MapReduce programs against data in Cassandra. From there, we cover integration with higher-level tools built on top of Hadoop: Pig and Hive. Once we have an understanding of these tools, we cover how a Cassandra cluster can be configured to run these analytics in a distributed way. Finally, we share a couple of use cases where Cassandra is being used alongside Hadoop to solve real-world problems.
If you’re already familiar with Hadoop, you can safely skip this section. If you haven’t had the pleasure, Hadoop (http://hadoop.apache.org) is a set of open source projects that deal with large amounts of data in a distributed way. Its Hadoop distributed filesystem (HDFS) and MapReduce subprojects are open source implementations of Google’s GFS and MapReduce.
Google found that several internal groups had been implementing similar functionality to solve problems in a distributed way. They saw that it was common to have two phases of operations over distributed data: a map phase and a reduce phase. A map function operates over raw data and produces intermediate values. A reduce function distills those intermediate values in some way, producing the final output for that MapReduce computation. By standardizing on a common framework, teams could build more solutions to more problems rather than reimplementing the same machinery again and again.
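The two-phase model is easy to see in miniature, outside of any framework. The following sketch runs a word count entirely in memory; the class and method names are illustrative only and are not part of Hadoop:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MiniMapReduce {
    // Map phase: emit an intermediate (word, 1) pair for every token.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs =
                new ArrayList<Map.Entry<String, Integer>>();
        for (String token : line.split("\\s+")) {
            if (!token.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<String, Integer>(token, 1));
            }
        }
        return pairs;
    }

    // Reduce phase: distill the intermediate values for one key into a sum.
    static int reduce(String word, List<Integer> counts) {
        int sum = 0;
        for (int c : counts) {
            sum += c;
        }
        return sum;
    }

    // Driver: group intermediate pairs by key (the "shuffle"), then reduce.
    static Map<String, Integer> wordCount(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                List<Integer> values = grouped.get(pair.getKey());
                if (values == null) {
                    values = new ArrayList<Integer>();
                    grouped.put(pair.getKey(), values);
                }
                values.add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<String, Integer>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            result.put(entry.getKey(), reduce(entry.getKey(), entry.getValue()));
        }
        return result;
    }
}
```

Hadoop's contribution is to run the map and reduce functions across many machines, handle the shuffle between them, and recover from failures, while the programmer supplies only the two functions.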
Doug Cutting decided to write open source implementations of the Google File System (http://labs.google.com/papers/gfs.html) and MapReduce (http://labs.google.com/papers/mapreduce.html), and thus, Hadoop was born. Since then, it has blossomed into a myriad of tools, all dealing with solutions for big data problems. Today, Hadoop is widely used by Yahoo!, Facebook, LinkedIn, Twitter, IBM, Rackspace, and many other companies. There is a vibrant community and a growing ecosystem.
Cassandra has built-in support for the Hadoop implementation of MapReduce (http://hadoop.apache.org/mapreduce).
This section covers details on how to write a simple MapReduce job over data stored in Cassandra using the Java language. We also briefly cover how to output data into Cassandra and discuss ongoing progress with using Cassandra with Hadoop Streaming for languages beyond Java.
The word count example given in this section is also found in the Cassandra source download in its contrib module. It can be compiled and run using instructions found there. It is best to run with that code, as the current version might have minor modifications. However, the principles remain the same.
For convenience, the word count MapReduce example can be run locally against a single Cassandra node. However, for more information on how to configure Cassandra and Hadoop to run MapReduce in a more distributed fashion, see the section Cluster Configuration.
Cassandra has a Java source package for Hadoop integration code, called org.apache.cassandra.hadoop. There we find:
ColumnFamilyInputFormat
The main class we’ll use to interact with data stored in Cassandra from Hadoop. It’s an extension of Hadoop’s InputFormat abstract class.

ConfigHelper
A helper class to configure Cassandra-specific information such as the server node to point to, the port, and information specific to your MapReduce job.

ColumnFamilySplit
The extension of Hadoop’s InputSplit abstract class that creates splits over our Cassandra data. It also provides Hadoop with the location of the data, so that it may prefer running tasks on nodes where the data is stored.

ColumnFamilyRecordReader
The layer at which individual records from Cassandra are read. It’s an extension of Hadoop’s RecordReader abstract class.
There are similar classes for outputting data to Cassandra in the Hadoop package, but at the time of this writing, those classes are still being finalized.
Word count is one of the examples given in the MapReduce paper and is the starting point for many who are new to the framework. It takes a body of text and counts the occurrences of each distinct word. Here we provide some code to perform a word count over data contained in Cassandra. A working example of word count is also included in the Cassandra source download.
First we need a Mapper class, shown in Example 12-1.
public static class TokenizerMapper
    extends Mapper<byte[], SortedMap<byte[], IColumn>, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  private String columnName;

  public void map(byte[] key, SortedMap<byte[], IColumn> columns, Context context)
      throws IOException, InterruptedException {
    IColumn column = columns.get(columnName.getBytes());
    String value = new String(column.value());
    StringTokenizer itr = new StringTokenizer(value);
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }

  protected void setup(Context context)
      throws IOException, InterruptedException {
    this.columnName = context.getConfiguration().get("column_name");
  }
}
Readers familiar with MapReduce programs will notice how familiar this mapper looks. In this case, the inputs to the mapper are row keys and associated row values from Cassandra. Row values in the world of Cassandra are simply maps containing the column information. In addition to the word count code itself, we override the setup method to set the column name we are looking for. The rest of the mapper code is generic to any word count implementation.
When iterating over super columns in your mapper, each IColumn would need to be cast to a SuperColumn, and it would contain nested column information.
Next, let’s look at a Reducer implementation for our word count, shown in Example 12-2.
public static class IntSumReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}
There should be nothing surprising in this reducer; nothing is Cassandra-specific.
Finally, we get to the class that runs our MapReduce program, shown in Example 12-3.
public class WordCount extends Configured implements Tool {
  public int run(String[] args) throws Exception {
    Job job = new Job(getConf(), "wordcount");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(ColumnFamilyInputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path("/tmp/word_count"));

    // Pass along the column name that our mapper reads in its setup method.
    job.getConfiguration().set("column_name", columnName);

    ConfigHelper.setThriftContact(job.getConfiguration(), "localhost", 9160);
    ConfigHelper.setInputColumnFamily(
        job.getConfiguration(), "Keyspace1", "Standard1");
    SlicePredicate predicate = new SlicePredicate().setColumn_names(
        Arrays.asList(columnName.getBytes()));
    ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
    job.waitForCompletion(true);
    return 0;
  }
}
There are a few things to note about the WordCount class that go beyond a boilerplate word count. We need to set the InputFormat to Cassandra’s ColumnFamilyInputFormat, along with the column name that our mapper is looking for. Cassandra includes something called a ConfigHelper that provides a way to set properties we’ll need, such as the Thrift contact information (server and port), the keyspace, and the column family. It also allows us to set our slice predicate.
Our example job used a FileOutputFormat to output its results. As its name suggests, FileOutputFormat writes the results out to a filesystem. As of 0.7, there will be a Cassandra-based OutputFormat implementation. As of this writing, some of the implementation details are still being finalized. For updates on the built-in output format, see http://wiki.apache.org/cassandra/HadoopSupport.

It is possible, however, to write directly to Cassandra via Thrift (or a higher-level client) in the Reducer step. In the previous example, this means that instead of writing to the context, the reducer could write its key and value to Cassandra directly.
Our word count MapReduce example was written in Java. Hadoop Streaming is the Hadoop way to allow languages besides Java to run MapReduce jobs using Standard In and Standard Out. As of this writing, Hadoop Streaming is not supported in Cassandra’s MapReduce integration. However, work is being done to support it in the near future. For details about the current status, see the wiki.
MapReduce is a great abstraction for developers so that they can worry less about the details of distributed computing and more about the problems they are trying to solve. Over time, an even more abstracted toolset has emerged. Pig and Hive operate at a level above MapReduce and allow developers to perform more complex analytics more easily. Both of these frameworks can operate against data in Cassandra.
Pig (http://hadoop.apache.org/pig) is a platform for data analytics developed at Yahoo!. Included in the platform is a high-level language called Pig Latin and a compiler that translates programs written in Pig Latin into sequences of MapReduce jobs.
Along with the direct Hadoop integration for running MapReduce jobs over data in Cassandra, there has also been work done to provide integration for Pig. With its grunt shell prompt and the Pig Latin scripting language, Pig provides a way to simplify writing analytics code. To write our word count example using Pig Latin:
rows = LOAD 'cassandra://Keyspace1/Standard1' USING CassandraStorage()
       AS (key:chararray, cols:bag{col:tuple(name:bytearray, value:bytearray)});
cols = FOREACH rows GENERATE flatten(cols) AS (name, value);
words = FOREACH cols GENERATE flatten(TOKENIZE((chararray) value)) AS word;
grouped = GROUP words BY word;
counts = FOREACH grouped GENERATE group, COUNT(words) AS count;
ordered = ORDER counts BY count DESC;
topten = LIMIT ordered 10;
dump topten;
This alternative word count is only eight lines long. Line 1 gets all the data in the Standard1 column family, describing that data with aliases and data types. We extract the name/value pairs in each of the rows. In line 3, we have to cast the value to a character array in order to tokenize it with the built-in TOKENIZE function. We next group by and count each word instance. Finally, we order our data by count and output the top 10 words found.
It is trivial to operate over super columns with Pig. It is simply another nested level of data that we can flatten in order to get its values.
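As a sketch of what that looks like (the exact schema that CassandraStorage exposes for super columns is an assumption here, so treat the aliases as illustrative), a second flatten reaches the subcolumn values:

```
rows = LOAD 'cassandra://Keyspace1/Super1' USING CassandraStorage()
       AS (key:chararray,
           cols:bag{col:tuple(name:bytearray,
                              subcols:bag{sub:tuple(sname:bytearray, svalue:bytearray)})});
-- First flatten: one record per super column.
supers = FOREACH rows GENERATE flatten(cols) AS (name, subcols);
-- Second flatten: one record per subcolumn name/value pair.
vals   = FOREACH supers GENERATE flatten(subcols) AS (sname, svalue);
```

From there, the `vals` relation can be grouped, filtered, and counted just like the simple column data in the word count script.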
To some, coding MapReduce programs is tedious and filled with boilerplate code. Pig provides an abstraction that makes our code significantly more concise. Pig also allows programmers to express operations such as joins much more simply than by using MapReduce alone.
The Pig integration code (a LoadFunc implementation) is found in the contrib section of Cassandra’s source download. It can be compiled and run using instructions found there, and it also includes instructions on how to set Cassandra-specific configuration options. In a moment, we’ll see how to configure a Cassandra cluster to run Pig jobs (compiled down to MapReduce) in a distributed way.
Like Pig, Hive (http://hadoop.apache.org/hive) is a platform for data analytics. Instead of a scripting language, queries are written in Hive-QL, a query language similar to the familiar SQL. Hive was developed by Facebook to allow large data sets to be abstracted into a common structure.
As of this writing, work on a Hive storage handler for Cassandra is being finalized. For updates and documentation on its usage with Cassandra, see the wiki.
MapReduce and other tools can run in a nondistributed way for trying things out or troubleshooting a problem. However, in order to run in a production environment, you’ll want to install Hadoop in your Cassandra cluster as well. Although a comprehensive discussion of Hadoop installation and configuration is outside the scope of this chapter, we do go over how to configure Cassandra alongside Hadoop for best performance. Readers can find more about Hadoop configuration at http://hadoop.apache.org or in Tom White’s excellent reference, Hadoop: The Definitive Guide (O’Reilly).
Because Hadoop has some unfamiliar terminology, here are some useful definitions:
Namenode
The master node for HDFS. It has the locations of data blocks stored in several datanodes and often runs on the same server as the jobtracker in smaller clusters.

Datanode
A node for storing data blocks for HDFS. Datanodes run on the same servers as tasktrackers.

Jobtracker
The master process for scheduling MapReduce jobs. The jobtracker accepts new jobs, breaks them into map and reduce tasks, and assigns those tasks to tasktrackers in the cluster. It is responsible for job completion. It often runs on the same server as the namenode in smaller clusters.

Tasktracker
The process responsible for running map or reduce tasks from the jobtracker. Tasktrackers run on the same servers as datanodes.
Like Cassandra, Hadoop is a distributed system. The MapReduce jobtracker spreads tasks across the cluster, preferably near the data it needs. When a jobtracker initiates tasks, it looks to HDFS to provide it with information about where that data is stored. Similarly, Cassandra’s built-in Hadoop integration provides the jobtracker with data locality information so that tasks can be close to the data.
In order to achieve this data locality, Cassandra nodes must also be part of a Hadoop cluster. The namenode and jobtracker can reside on a server outside your Cassandra cluster. Cassandra nodes will need to be part of the cluster by running a tasktracker process on each node. Then, when a MapReduce job is initiated, the jobtracker can query Cassandra for locality of the data when it splits up the map and reduce tasks.
A four-node Cassandra cluster with tasktracker processes running on each Cassandra node is shown in Figure 12-1. At least one node in the cluster needs to be running the datanode process. There is a light dependency on HDFS for small amounts of data (the distributed cache), and a single datanode should suffice. External to the cluster is the server running the Hadoop namenode and jobtracker.
When a job is initiated from a client machine, it goes to the jobtracker. The jobtracker receives information about the data source when the job is submitted, via the configuration options mentioned earlier. At that point, it can use Cassandra’s ColumnFamilyRecordReader and ColumnFamilySplit to get the physical locations of different segments of data in the cluster. Then, it can use those locations to split up the tasks among the nodes in the cluster, preferring to run tasks on nodes where the associated data resides.
Finally, when creating jobs for MapReduce to execute (either directly or via Pig), the Hadoop configuration needs to point to the namenode/jobtracker (in the Hadoop configuration files) and the Cassandra configuration options. The cluster will be able to handle the integration from there.
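As a sketch of the Hadoop side of that configuration, the tasktracker and client machines point at the jobtracker and namenode through the standard Hadoop 0.20-era configuration files. The property names below are the standard ones; the hostnames and ports are placeholders for your own environment:

```xml
<!-- core-site.xml: point at the namenode (used for the distributed
     cache and for FileOutputFormat results) -->
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://namenode.example.com:8020</value>
  </property>
</configuration>

<!-- mapred-site.xml: point each tasktracker (and job-submitting client)
     at the jobtracker -->
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>jobtracker.example.com:8021</value>
  </property>
</configuration>
```

The Cassandra-specific pieces (the Thrift contact, keyspace, and column family) stay in the job itself via ConfigHelper, as shown earlier in the WordCount example.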
To help you understand why Cassandra/Hadoop integration is interesting and useful, here we include a couple of use cases from out in the Cassandra community.
Raptr is a service that allows gamers to communicate and share their gaming statistics and achievements. Keith Thornhill works as a Senior Software Engineer at Raptr and saw a need to take their storage and analytics backend to the next level. Their legacy storage solution and analytics were home grown, and they were outgrowing them. Doing queries across the entire dataset was tedious and could take hours to run.
Keith saw Cassandra as a promising storage solution for the following reasons:
Built-in scaling instead of scaffolded on
Single view of read/write access (no masters or slaves)
A hands-off style of operations that under normal cases (node failures, adding new nodes, etc.) “just works” and requires very little micromanagement
Keith also watched as the Cassandra/Hadoop integration evolved and saw Pig as an analytics solution he could use. Initially he looked for ways to use PHP or Python with MapReduce, but after becoming familiar with Pig, he didn’t see a need. He noted that the turnaround time from idea to execution with Pig was very quick. The query runtime was also a nice surprise: he could traverse all of the data in 10–15 minutes rather than hours. As a result, Raptr is able to explore new possibilities in analyzing their data.
As far as configuration, Keith has a separate namenode/jobtracker and installed the datanode/tasktracker on each of his Cassandra nodes. He notes that a nice side effect of this is that the analytics engine scales with the data.
Imagini provides publishers with tools to profile all their site visitors through “visual quizzes” and an inference engine. Behind the scenes, this involves processing large amounts of behavioral data and then making the results available for real-time access.
After looking at several alternatives, Imagini went with Cassandra because of its fault tolerance, decentralized architecture (no single point of failure), and large write capacity.
Dave Gardner, a senior Imagini developer, writes, “We use Cassandra to store our real-time data, including information on roughly 100 million users, which is expected to grow substantially over the coming year. This is nearly all accessed via simple key lookup.”
Currently Imagini aggregates data from a variety of sources into Hadoop’s distributed filesystem, HDFS. Using Hadoop Streaming, they use PHP to MapReduce over their data and output directly to Cassandra via Thrift in their reducers. The results reside in Cassandra to provide real-time access to the data.
Looking forward, Imagini hopes to simplify their workflow once Hadoop Streaming becomes available with Cassandra. They’re planning to store even their raw data in Cassandra, run MapReduce over that data, and then output the results back into Cassandra.
In this chapter, we examined Hadoop, the open source implementation of Google’s MapReduce framework. We took a brief look at Hadoop basics and how to integrate it with Cassandra by running the word count example that ships with Cassandra, and also saw how to use Pig, a simple and concise language for expressing MapReduce jobs.
Finally, we got a picture of how a few different companies are using Hadoop and Cassandra together.