Chapter 12. Integrating Hadoop

Jeremy Hanna

As companies and organizations adopt technologies like Cassandra, they look for tools to perform analytics and queries against their data. Cassandra's built-in query capabilities, along with custom layers built atop them, can do much. However, there are also distributed tools in the community that can be fitted to work with Cassandra.

Hadoop seems to be the elephant in the room when it comes to open source big data frameworks. There we find an open source MapReduce implementation as well as higher-level analytics engines built on top of it, such as Pig and Hive. Thanks to members of both the Cassandra and Hadoop communities, Cassandra has gained some significant integration points with Hadoop and its analytics tools.

In this chapter, we explore how Cassandra and Hadoop fit together. First, we give a brief history of the Apache Hadoop project and go into how one can write MapReduce programs against data in Cassandra. From there, we cover integration with higher-level tools built on top of Hadoop: Pig and Hive. Once we have an understanding of these tools, we cover how a Cassandra cluster can be configured to run these analytics in a distributed way. Finally, we share a couple of use cases where Cassandra is being used alongside Hadoop to solve real-world problems.

What Is Hadoop?

If you’re already familiar with Hadoop, you can safely skip this section. If you haven’t had the pleasure, Hadoop (http://hadoop.apache.org) is a set of open source projects that deal with large amounts of data in a distributed way. Its Hadoop distributed filesystem (HDFS) and MapReduce subprojects are open source implementations of Google’s GFS and MapReduce.

Google found that several internal groups had been implementing similar functionality in order to solve problems in a distributed way. They saw that it was common to have two phases of operations over distributed data: a map phase and a reduce phase. A map function operates over raw data and produces intermediate values. A reduce function distills those intermediate values in some way, producing the final output for that MapReduce computation. By standardizing on a common framework, they could focus on building more solutions to problems rather than reinventing the same wheel each time.

Doug Cutting decided to write open source implementations of the Google File System (http://labs.google.com/papers/gfs.html) and MapReduce (http://labs.google.com/papers/mapreduce.html), and thus, Hadoop was born. Since then, it has blossomed into myriad tools, all dealing with solutions for big data problems. Today, Hadoop is widely used, by Yahoo!, Facebook, LinkedIn, Twitter, IBM, Rackspace, and many other companies. There is a vibrant community and a growing ecosystem.

Cassandra has built-in support for the Hadoop implementation of MapReduce (http://hadoop.apache.org/mapreduce).

Working with MapReduce

This section covers details on how to write a simple MapReduce job over data stored in Cassandra using the Java language. We also briefly cover how to output data into Cassandra and discuss ongoing progress with using Cassandra with Hadoop Streaming for languages beyond Java.

Note

The word count example given in this section is also found in the Cassandra source download in its contrib module. It can be compiled and run using instructions found there. It is best to run with that code, as the current version might have minor modifications. However, the principles remain the same.

For convenience, the word count MapReduce example can be run locally against a single Cassandra node. However, for more information on how to configure Cassandra and Hadoop to run MapReduce in a more distributed fashion, see the section Cluster Configuration.

Cassandra Hadoop Source Package

Cassandra has a Java source package for Hadoop integration code, called org.apache.cassandra.hadoop. There we find:

ColumnFamilyInputFormat

The main class we’ll use to interact with data stored in Cassandra from Hadoop. It’s an extension of Hadoop’s InputFormat abstract class.

ConfigHelper

A helper class to configure Cassandra-specific information such as the server node to point to, the port, and information specific to your MapReduce job.

ColumnFamilySplit

The extension of Hadoop’s InputSplit abstract class that creates splits over our Cassandra data. It also provides Hadoop with the location of the data, so that it may prefer running tasks on nodes where the data is stored.

ColumnFamilyRecordReader

The layer at which individual records from Cassandra are read. It’s an extension of Hadoop’s RecordReader abstract class.

There are similar classes for outputting data to Cassandra in the Hadoop package, but at the time of this writing, those classes are still being finalized.

Running the Word Count Example

Word count is one of the examples given in the MapReduce paper and is the starting point for many who are new to the framework. It takes a body of text and counts the occurrences of each distinct word. Here we provide some code to perform a word count over data contained in Cassandra. A working example of word count is also included in the Cassandra source download.

First we need a Mapper class, shown in Example 12-1.

Example 12-1. The TokenizerMapper.java class
public static class TokenizerMapper extends Mapper<byte[], 
    SortedMap<byte[], IColumn>, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  private String columnName;

  public void map(byte[] key, SortedMap<byte[], IColumn> columns, Context context)
    throws IOException, InterruptedException {

    IColumn column = columns.get(columnName.getBytes());
    String value = new String(column.value());
    StringTokenizer itr = new StringTokenizer(value);

    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }

  protected void setup(Context context)
    throws IOException, InterruptedException {

    this.columnName = context.getConfiguration().get("column_name");
  }
}

Readers who have written MapReduce programs before will recognize the shape of this mapper. In this case, the inputs to the mapper are row keys and associated row values from Cassandra. Row values in the world of Cassandra are simply maps containing the column information. In addition to the word count code itself, we override the setup method to set the column name we are looking for. The rest of the mapper code is generic to any word count implementation.

Note

When iterating over super columns in your mapper, each IColumn would need to be cast to a SuperColumn, and it would contain nested column information.
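
To illustrate, here is a minimal sketch of what that iteration might look like inside the mapper when the input is a super column family. It uses the same 0.6-era byte[]-based interfaces as the examples above; the variable names are purely illustrative, and imports are omitted as in the other examples.

for (IColumn column : columns.values()) {
  // With a super column family, each top-level IColumn is really a SuperColumn
  // whose subcolumns hold the actual name/value pairs.
  SuperColumn superColumn = (SuperColumn) column;
  for (IColumn subColumn : superColumn.getSubColumns()) {
    String name = new String(subColumn.name());
    String value = new String(subColumn.value());
    // process each nested name/value pair here
  }
}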

Next, let’s look at a Reducer implementation for our word count, shown in Example 12-2.

Example 12-2. The Reducer implementation
public static class IntSumReducer extends 
  Reducer<Text, IntWritable, Text, IntWritable> {

  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {

    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}

There should be nothing surprising in this reducer; nothing is Cassandra-specific.

Finally, we get to the class that runs our MapReduce program, shown in Example 12-3.

Example 12-3. The WordCount class runs the MapReduce program
public class WordCount extends Configured implements Tool {

  public int run(String[] args) throws Exception {
    Job job = new Job(getConf(), "wordcount");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    FileOutputFormat.setOutputPath(job, new Path("/tmp/word_count"));
    ConfigHelper.setThriftContact(job.getConfiguration(), "localhost", 9160);
    ConfigHelper.setInputColumnFamily(
        job.getConfiguration(), "Keyspace1", "Standard1");

    // Pass the column name through to the mapper's setup method.
    job.getConfiguration().set("column_name", columnName);

    SlicePredicate predicate = new SlicePredicate().setColumn_names(
        Arrays.asList(columnName.getBytes()));

    ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
    job.waitForCompletion(true);

    return 0;
  }
}

There are a few things to note about the WordCount class that go beyond a boilerplate word count. We need to set the InputFormat to Cassandra’s ColumnFamilyInputFormat, along with the column name that our mapper is looking for. Cassandra includes something called a ConfigHelper that provides a way to set properties we’ll need, such as the Thrift contact information (server and port), the keyspace, and the column family. It also allows us to set our slice predicate.

Outputting Data to Cassandra

Our example reducer used a FileOutputFormat to output its results. As its name suggests, FileOutputFormat writes the results out to a filesystem. Starting with version 0.7, Cassandra will include an OutputFormat implementation of its own. As of this writing, some of the implementation details are still being finalized. For updates on the built-in output format, see http://wiki.apache.org/cassandra/HadoopSupport.

It is possible, however, to write directly to Cassandra via Thrift (or a higher-level client) from the reducer step. In the previous example, this means that instead of writing to the context, the reducer could write each key and value to Cassandra directly.
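
For example, the reducer could open a Thrift connection in its setup method and insert each word count as it is computed. The following is only a sketch against the 0.6-era Thrift API; the keyspace, column family, and column names are illustrative, exact method signatures differ between Cassandra versions, and imports are omitted as in the other examples.

public static class CassandraWriterReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  private TTransport transport;
  private Cassandra.Client client;

  protected void setup(Context context) throws IOException {
    // Open a raw Thrift connection to a Cassandra node.
    transport = new TSocket("localhost", 9160);
    client = new Cassandra.Client(new TBinaryProtocol(transport));
    try {
      transport.open();
    } catch (TTransportException e) {
      throw new IOException(e);
    }
  }

  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    // Write the count for this word straight to Cassandra instead of the context.
    ColumnPath path = new ColumnPath("WordCount");   // illustrative column family
    path.setColumn("count".getBytes());              // illustrative column name
    try {
      client.insert("Keyspace1", key.toString(), path,
          Integer.toString(sum).getBytes(),
          System.currentTimeMillis(), ConsistencyLevel.ONE);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  protected void cleanup(Context context) {
    transport.close();
  }
}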

Hadoop Streaming

Our word count MapReduce example was written in Java. Hadoop Streaming is the Hadoop way to allow languages besides Java to run MapReduce jobs using Standard In and Standard Out. As of this writing, Hadoop Streaming is not supported in Cassandra’s MapReduce integration. However, work is being done to support it in the near future. For details about the current status, see the wiki.

Tools Above MapReduce

MapReduce is a great abstraction for developers so that they can worry less about the details of distributed computing and more about the problems they are trying to solve. Over time, an even more abstracted toolset has emerged. Pig and Hive operate at a level above MapReduce and allow developers to perform more complex analytics more easily. Both of these frameworks can operate against data in Cassandra.

Pig

Pig (http://hadoop.apache.org/pig) is a platform for data analytics developed at Yahoo!. Included in the platform is a high-level language called Pig Latin and a compiler that translates programs written in Pig Latin into sequences of MapReduce jobs.

Along with the direct Hadoop integration for running MapReduce jobs over data in Cassandra, there has also been work done to provide integration for Pig. With its grunt shell prompt and the Pig Latin scripting language, Pig provides a way to simplify writing analytics code. To write our word count example using Pig Latin:

rows = LOAD 'cassandra://Keyspace1/Standard1' USING CassandraStorage() 
as (key:chararray, cols:bag{col:tuple(name:bytearray, value:bytearray)});
cols = FOREACH rows GENERATE flatten(cols) as (name, value);
words = FOREACH cols GENERATE flatten(TOKENIZE((chararray) value)) as word;
grouped = GROUP words BY word;
counts = FOREACH grouped GENERATE group, COUNT(words) as count;
ordered = ORDER counts BY count DESC;
topten = LIMIT ordered 10;
dump topten;

This alternative word count is only eight lines long. Line 1 loads all the data in the Standard1 column family, describing that data with aliases and data types. Line 2 extracts the name/value pairs from each row. In line 3, we cast each value to a character array in order to tokenize it with the built-in TOKENIZE function. We then group by and count each word instance. Finally, we order our data by count and output the top 10 words found.

Note

It is trivial to operate over super columns with Pig. It is simply another nested level of data that we can flatten in order to get its values.

To some, coding MapReduce programs is tedious and filled with boilerplate code. Pig provides an abstraction that makes our code significantly more concise. Pig also allows programmers to express operations such as joins much more simply than by using MapReduce alone.

The Pig integration code (a LoadFunc implementation) is found in the contrib section of Cassandra’s source download. It can be compiled and run using instructions found there, and it also includes instructions on how to configure Cassandra-specific configuration options. In a moment, we’ll see how to configure a Cassandra cluster to run Pig jobs (compiled down to MapReduce) in a distributed way.

Hive

Like Pig, Hive (http://hadoop.apache.org/hive) is a platform for data analytics. Instead of a scripting language, queries are written in Hive-QL, a query language similar to the familiar SQL. Hive was developed by Facebook to allow large data sets to be abstracted into a common structure.

As of this writing, work on a Hive storage handler for Cassandra is being finalized. For updates and documentation on its usage with Cassandra, see the wiki.

Cluster Configuration

MapReduce and other tools can run in a nondistributed way for trying things out or troubleshooting a problem. However, in order to run in a production environment, you’ll want to install Hadoop in your Cassandra cluster as well. Although a comprehensive discussion of Hadoop installation and configuration is outside the scope of this chapter, we do go over how to configure Cassandra alongside Hadoop for best performance. Readers can find more about Hadoop configuration at http://hadoop.apache.org or in Tom White’s excellent reference, Hadoop: The Definitive Guide (O’Reilly).

Because Hadoop has some unfamiliar terminology, here are some useful definitions:

HDFS

Hadoop distributed filesystem.

Namenode

The master node for HDFS. It keeps track of the locations of data blocks stored on the datanodes and often runs on the same server as the jobtracker in smaller clusters.

Datanode

Nodes for storing data blocks for HDFS. Datanodes run on the same servers as tasktrackers.

Jobtracker

The master process for scheduling MapReduce jobs. The jobtracker accepts new jobs, breaks them into map and reduce tasks, and assigns those tasks to tasktrackers in the cluster. It is responsible for job completion. It often runs on the same server as the namenode in smaller clusters.

Tasktracker

The process responsible for running map or reduce tasks from the jobtracker. Tasktrackers run on the same servers as datanodes.

Like Cassandra, Hadoop is a distributed system. The MapReduce jobtracker spreads tasks across the cluster, preferably near the data it needs. When a jobtracker initiates tasks, it looks to HDFS to provide it with information about where that data is stored. Similarly, Cassandra’s built-in Hadoop integration provides the jobtracker with data locality information so that tasks can be close to the data.

In order to achieve this data locality, Cassandra nodes must also be part of a Hadoop cluster. The namenode and jobtracker can reside on a server outside your Cassandra cluster. Cassandra nodes will need to be part of the cluster by running a tasktracker process on each node. Then, when a MapReduce job is initiated, the jobtracker can query Cassandra for locality of the data when it splits up the map and reduce tasks.

A four-node Cassandra cluster with tasktracker processes running on each Cassandra node is shown in Figure 12-1. At least one node in the cluster needs to be running the datanode process. There is a light dependency on HDFS for small amounts of data (the distributed cache), and a single datanode should suffice. External to the cluster is the server running the Hadoop namenode and jobtracker.

Four-node Cassandra cluster with an external namenode/jobtracker
Figure 12-1. Four-node Cassandra cluster with an external namenode/jobtracker

When a job is initiated from a client machine, it goes to the jobtracker. The jobtracker receives information about the data source when the job is submitted, via the configuration options mentioned earlier. At that point, it can use Cassandra's ColumnFamilyRecordReader and ColumnFamilySplit to get the physical locations of different segments of data in the cluster. Then it can use those locations to split up the tasks among the nodes in the cluster, preferring to run tasks on nodes where the associated data resides.

Finally, when creating jobs for MapReduce to execute (either directly or via Pig), the Hadoop configuration needs to point to the namenode/jobtracker (via the Hadoop configuration files) and include the Cassandra-specific configuration options. The cluster will be able to handle the integration from there.
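
As a rough sketch of what that wiring might look like in the job-submission code, assume an external Hadoop master named hadoop-master and the 0.20-era Hadoop property names; the hostnames and ports here are illustrative, and in practice these values normally come from core-site.xml and mapred-site.xml on the client's classpath rather than being set in code.

Configuration conf = new Configuration();

// Point at the external namenode and jobtracker.
conf.set("fs.default.name", "hdfs://hadoop-master:8020");
conf.set("mapred.job.tracker", "hadoop-master:8021");

// Cassandra-specific settings, as in Example 12-3.
ConfigHelper.setThriftContact(conf, "cassandra-node1", 9160);
ConfigHelper.setInputColumnFamily(conf, "Keyspace1", "Standard1");

Job job = new Job(conf, "wordcount");

From there, the job is configured and submitted exactly as in the single-node example.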

Use Cases

To help you understand why Cassandra/Hadoop integration is interesting and useful, here we include a couple of use cases from out in the Cassandra community.

Raptr.com: Keith Thornhill

Raptr is a service that allows gamers to communicate and share their gaming statistics and achievements. Keith Thornhill works as a Senior Software Engineer at Raptr and saw a need to take their storage and analytics backend to the next level. Their legacy storage solution and analytics were home grown, and they were outgrowing them. Doing queries across the entire dataset was tedious and could take hours to run.

Keith saw Cassandra as a promising storage solution for the following reasons:

  • Built-in scaling instead of scaffolded on

  • Single view of read/write access (no masters or slaves)

  • A hands-off style of operations that under normal cases (node failures, adding new nodes, etc.) “just works” and requires very little micromanagement

Keith also watched as the Cassandra/Hadoop integration evolved and saw Pig as an analytics solution he could use. Initially he looked for ways to use PHP or Python with MapReduce. However, after becoming familiar with Pig, he didn't see a need. He noted that the turnaround time from idea to execution with Pig was very quick. The query runtime was also a nice surprise: he could traverse all of the data in 10–15 minutes rather than hours. As a result, Raptr is able to explore new possibilities in analyzing their data.

As far as configuration, Keith has a separate namenode/jobtracker and installed the datanode/tasktracker on each of his Cassandra nodes. He notes that a nice side effect of this is that the analytics engine scales with the data.

Imagini: Dave Gardner

Imagini provides publishers with tools to profile all their site visitors through “visual quizzes” and an inference engine. Behind the scenes, this involves processing large amounts of behavioral data and then making the results available for real-time access.

After looking at several alternatives, Imagini went with Cassandra because of its fault tolerance, decentralized architecture (no single point of failure), and large write capacity.

Dave Gardner, a senior Imagini developer, writes, “We use Cassandra to store our real-time data, including information on roughly 100 million users, which is expected to grow substantially over the coming year. This is nearly all accessed via simple key lookup.”

Currently Imagini aggregates data from a variety of sources into Hadoop’s distributed filesystem, HDFS. Using Hadoop Streaming, they use PHP to MapReduce over their data and output directly to Cassandra via Thrift in their reducers. The results reside in Cassandra to provide real-time access to the data.

Looking forward, Imagini hopes to simplify their workflow once Hadoop Streaming becomes available with Cassandra. They plan to store even their raw data in Cassandra, run MapReduce over that data, and then output the results back into Cassandra.

Summary

In this chapter, we examined Hadoop, the open source implementation of the Google MapReduce algorithm. We took a brief look at Hadoop basics and how to integrate it with Cassandra by running the word count example that ships with Cassandra, and also saw how to use Pig, a simple and concise language to express MapReduce jobs.

Finally, we got a picture of how a few different companies are using Hadoop and Cassandra together.
