Chapter 13

Hadoop’s Future

WHAT’S IN THIS CHAPTER?

  • Learning about current and emerging DSLs for MapReduce
  • Learning about faster and more scalable processing changes
  • Reviewing security enhancements
  • Understanding emerging trends

Hadoop is rapidly evolving. It seems that every week, news items appear announcing new Hadoop distributions that offer enhanced functionality, and new open source projects that utilize Hadoop. If you look at the JIRA enhancement requests for Hadoop at Apache (some of which were discussed in Chapter 10), you will see that the Hadoop of tomorrow will have much more functionality.

Over the past few years, new Domain Specific Languages (DSLs) have been developed to simplify Hadoop MapReduce programming, and this is a growth area for Hadoop — especially in graph processing. Real-time Hadoop (as discussed at length in Chapter 9) is a growing trend that is here today, and something that will continue to grow in the future. As mentioned in Chapters 10 and 12, security is something that will continue to change and evolve. Although this book has touched on many of the things that will change and grow into the future, you should know about some additional areas covered in this chapter.

This chapter begins by highlighting the current trends of simplifying MapReduce programming with the use of DSLs. This approach typically shortens code development by operating on higher-level concepts suitable to particular problem domains, and by utilizing an easy-to-use API. You learn about the newer implementation of the MapReduce run time introduced in Hadoop 2, which provides better scalability and performance for MapReduce execution.

In this chapter, you also learn about Tez — a new Hadoop run time that combines the power of MapReduce and Oozie into a single framework aimed at supporting general real-time implementation. This chapter also briefly highlights upcoming security changes. Finally, you learn about emerging trends in the use of Hadoop. As has been shown throughout this book, Hadoop can be used to solve many different problems. In this chapter, the focus is on growing trends of how organizations are using Hadoop now, and how organizations will be using it in the future.

Let’s start by discussing DSLs and the role they play in Hadoop programming.

SIMPLIFYING MAPREDUCE PROGRAMMING WITH DSLs

So far, this book has concentrated on MapReduce — the main Hadoop programming model allowing you to split a task’s execution across a cluster of machines. MapReduce enables the developer to fully utilize Hadoop’s power, and even customize execution (as shown in Chapter 4) to better utilize Hadoop’s capabilities. With all the power that MapReduce provides, it is a fairly low-level model, and it can often be challenging to implement for new Hadoop developers. One of the ways to simplify Hadoop development is to use DSLs developed for Hadoop.

Although entire books could be written about every DSL for Hadoop, this section gives you a quick “taste” of some of them to show how this growing area of Hadoop can lower the barrier to entry and flatten the learning curve for Hadoop users. This section highlights some DSLs that have been around for a long time (HiveQL and PigLatin, for example), and also features others that are new and emerging for Hadoop.

What Are DSLs?

A DSL is a programming language designed specifically to express solutions to problems in a specific domain. DSLs mimic the terminology and concepts used by experts in a domain. For example, the Structured Query Language (SQL) can be considered a DSL for the relational model. DSLs try to minimize the gap between the domain concepts that must be implemented and the underlying runtime system.

Some well-designed DSLs enable non-programmers to write their own “programs.” Many casual SQL users know very little about the details of the underlying structure of a relational database, yet they are capable of using SQL queries to get the information they need. Another great example of a widely used DSL is the scripting language in Microsoft Excel, called Visual Basic for Applications (VBA). Although DSLs are geared toward non-programmers, they can still be an asset for developers, because DSLs enable developers to think in the same language as the domain experts. In practice, DSLs should make programmers far more productive compared to working with lower-level programming constructs.

DSLs are often not Turing complete, which effectively means that they can’t be used to write arbitrarily complex algorithms in the same way as general-purpose programming languages. Instead, they are usually declarative, where the user expresses the desired outcome, and the implementation decides how to achieve that outcome. In SQL, for example, you declare the schemas of your data in tables and what operations to perform on that data through queries. In most relational databases, the runtime system decides how to store the data and how to satisfy your queries.

DSLs are also categorized by whether they are external or internal:

  • An external DSL is implemented using the same tools used for other programming languages. A unique grammar is defined for an external DSL, and a custom compiler is written to parse programs in the language.
  • An internal DSL (sometimes called an embedded DSL) is “hosted” in another, more general-purpose programming language (or DSL), meaning that it uses a stylized form of the host language’s syntax, rather than having a unique grammar of its own.

Early adopters of Hadoop started inventing DSLs rather quickly. You may have heard of some of them — Hive, Pig, Clojure-Hadoop, Grumpy (Groovy Hadoop), Sawzall, Scoobi, Lingual, Pattern, Crunch, Scrunch — and the list keeps growing every day.

DSLs for Hadoop

Hadoop DSLs typically fall into several main categories:

  • SQL-based DSLs — DSLs based on SQL (some of which are only loosely “SQL-like”) are most usable by non-programmers who have a database background. Using these DSLs, people who “think” in database language can accomplish data analytics tasks without having to think about MapReduce.
  • Data flow DSLs — These DSLs expose the metaphor of data pipelines to filter, transform, and aggregate data as it flows through the pipeline.
  • Problem-specific programming languages — These DSLs focus on a certain problem domain, and sometimes use different models for processing data. Graph processing is an example that models data as a graph (for example, friend connections in social networks), and performs computations over the graph.

Hive and SQL-based DSLs

You may already be familiar with Hive, which utilizes HiveQL, one of the first SQL-like DSLs on Hadoop. It is just one example of a SQL-oriented DSL that makes MapReduce easy to use for non-programmers. As a SQL-like query tool for data on HDFS, Hive enables users to define tables with schemas for the data, and to write queries that are implemented internally using MapReduce. Hive is not a relational database management system (RDBMS), because it has no concept of transactions or record-level CRUD (Create, Read, Update, and Delete). But it does provide a language (called HiveQL) that is easy for database users to understand. It puts the emphasis on querying — asking questions about data, and performing aggregations.

Although users new to Hadoop might be tempted to use Hive as a relational database, it is important to know that HiveQL commands are translated into MapReduce batch jobs. This makes Hive unsuitable for queries that need to be fast (although there is work underway to make Hive much faster by decoupling it from MapReduce, as discussed later in this chapter). Hive was never meant to be a replacement for an enterprise data warehouse, but rather a way to simplify working with data sets so that a person doesn’t have to be a Java developer to process them and gain value from the data.

Facebook invented Hive and open-sourced it as an Apache project in 2008. Facebook’s data analysts needed user-friendly, productive tools to work with the data Facebook was accumulating in Hadoop clusters. Because SQL knowledge was so widespread, a SQL-based tool was a logical choice. Hive is perhaps the most important tool driving Hadoop adoption, because it typically provides the lowest entry barrier for Hadoop newcomers.

Hive uses an external DSL (as classified previously). HiveQL has its own grammar, compiler, and run time. Most Hive queries are implemented as MapReduce jobs, but data definition language (DDL) statements that are used to create and modify databases, tables, and views don’t require MapReduce. Hive stores this metadata information in a separate relational database (such as MySQL). Most queries trigger one or more MapReduce jobs to read and process the data in HDFS or another data store, via Hive’s plug-in support for different data formats.

Let’s explore the concept of Hive as a DSL with an example of server log data ingested into HDFS and partitioned by year, month, and day. This example doesn’t address all the specific details of what’s happening, just the highlights that illustrate the value of a powerful DSL. Listing 13-1 provides an example DDL table definition for server log data.

LISTING 13-1: Logs table definition

CREATE TABLE logs (
  severity   STRING,    -- e.g., FATAL, ERROR, ...
  server     STRING,    -- DNS name for the server
  processid  SMALLINT,  -- 2-byte integer
  message    STRING,    -- text of the actual message
  hour       TINYINT,   -- hour from the timestamp
  min        TINYINT,   -- minute from ...
  sec        TINYINT,   -- second ...
  msec       INT)       -- microseconds ...
PARTITIONED BY (        -- Also considered columns:
  year     SMALLINT,    -- year from the timestamp
  month    TINYINT,     -- month ...
  day      TINYINT)     -- day ...
STORED AS SEQUENCEFILE; -- file format

The table definition shown in Listing 13-1 comprises three main parts. The first part contains field definitions and their types (similar to an ordinary database table). The second part is specific to Hive, and specifies data partitioning. The data partitioning statement in Listing 13-1 says that the table will consist of several files — one for every day of logs. Finally, the third part of the table definition in Listing 13-1 specifies that every partition is stored as a separate sequence file.

Hive organizes the partitioned data into separate directories. If Hive’s warehouse directory is configured to be /warehouse in HDFS, then the directory structure for this partitioned table looks like what is shown in Listing 13-2.

LISTING 13-2: Partitions directory

...
/warehouse/logs/year=2013/month=4/day=13
/warehouse/logs/year=2013/month=4/day=14
/warehouse/logs/year=2013/month=4/day=15
...

As shown here, all the data for April 13 will be in the first directory shown. Now, consider an example query (Listing 13-3), which looks at what happened between noon and 1 p.m. on April 13.

LISTING 13-3: Example query

SELECT hour, min, sec, msec, severity, message
FROM logs
WHERE year = 2013 AND month = 4 AND day = 13 AND
      hour >= 12 AND hour <= 13
ORDER BY severity DESC, hour, min, sec, msec;

The HiveQL statement in Listing 13-3 should be fairly intuitive to anyone familiar with SQL. In contrast, writing this query against the Java MapReduce API is challenging, because implementing the ORDER BY clause might require knowledge of specialized programming idioms.

From this example, note that queries of logs will almost always be range-bound, as shown here with the WHERE clause that bounds the range of timestamps of interest. Because the data is already partitioned by year, month, and day, Hive knows it only needs to scan the files in the corresponding subset of directories (the one for April 13, in this example), thus providing relatively fast results, even over a logs data set that could contain terabytes of data covering many years.

HiveQL does several essential things that a good DSL must do:

  • It provides a concise, declarative way to state the structure of the information and how to work with it.
  • The language is intuitive to domain experts (that is, data analysts). Hive hides the complexity of implementing storage and queries.
  • It makes it easy to specify data organization hints (partitioning by timestamp, in this case) that improve query speeds.
  • HiveQL imposes relatively little overhead compared to handwritten Java MapReduce code.
  • It provides extension hooks, enabling you to plug in different formats and capabilities.

Hive enables you to extend its capabilities in many different ways:

  • It allows you to specify different input formats and output formats.
  • It allows you to use a custom serializer and deserializer (known as SerDe) for different file formats.
  • It allows you to create user-defined functions (UDFs) that can be written in Java and called from a HiveQL statement.
  • It also allows you to create custom mappers and reducers that you can put into your query.

Let’s look at some examples of how Hive can be extended. If you look back at the DDL statement shown in Listing 13-1, it instructs Hive to store the data as sequence files, which, by default, uses the SequenceFile input format. For flexibility and extensibility, Hive enables a developer to easily plug in his or her own input format, which allows the reading of records that are stored in various formats and proprietary files. It also enables you to plug in your own output format, which formats query results. This hook has been used to integrate Hive with data stored in HBase, Cassandra, and other data stores.

As mentioned, Hive also enables you to support unique record formats by specifying a SerDe (serializer/deserializer) that knows how to parse an input record into columns, and optionally write output columns in the same format. Note that Hive makes a clean distinction between the input and output formats, which understand how records are stored in files (or byte streams in general), and the SerDe, which understands how each record is parsed into columns.

A popular example of a record format is JavaScript Object Notation (JSON). Listing 13-4 provides a modification to the previous logs DDL statement for the data stored as JSON in plaintext files, where each JSON document is stored on a single line in the file.

LISTING 13-4: Table definition with JSON records

CREATE TABLE logs (
  severity   STRING,
  ...,
  hour       TINYINT,
  ...)
PARTITIONED BY (...)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.contrib.serde2.JsonSerde'
WITH SERDEPROPERTIES (
  "severity"="$.severity",
  ...,
  "hour"="$.timestamp.hour",
  ...)
STORED AS TEXTFILE;   -- plaintext files, one JSON document per line

In this table definition example, a JsonSerde class implements the Hive SerDe API, which is an important Hive feature that enables you to instruct Hive exactly how to process the record. In this example, Hive will call the JSON SerDe to parse each JSON record into columns using the mapping defined in the SERDEPROPERTIES specified in the table definition. SERDEPROPERTIES is a Hive feature that passes the specified key-value definitions to the specified SerDe interface. In this case, the $ references the JSON document, so $.timestamp.hour means “use the hour field inside the timestamp object inside the record,” which will be used as the hour “column.”

Finally, Hive also supports UDFs to extend the available operations on individual records and columns, or aggregations over them. With UDFs, you can write a Java function that can be invoked from a HiveQL statement. This can be helpful when Hive itself doesn’t provide certain functionality that you need. For example, Hive does not provide “windowing” functions for doing aggregations over a moving “window” of the records — something that most RDBMS SQLs provide. A classic calculation used by stock traders is the moving average of a stock’s closing price over a number of days, which reveals current and emerging trends more clearly. This is something that can be provided by a custom UDF.
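To make the UDF mechanics concrete, here is a minimal sketch of a scalar UDF built on Hive’s org.apache.hadoop.hive.ql.exec.UDF base class. The class name and the normalization it performs are hypothetical, and a true moving-average calculation would instead be a user-defined aggregate function (UDAF), which involves considerably more machinery.

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

// Hypothetical scalar UDF: trims and upper-cases a severity value so that
// "error", " Error ", and "ERROR" all compare equal in queries.
public final class NormalizeSeverity extends UDF {
  // Hive locates evaluate() by reflection; it is called once per input row.
  public Text evaluate(Text severity) {
    if (severity == null) {
      return null;                       // propagate SQL NULLs unchanged
    }
    return new Text(severity.toString().trim().toUpperCase());
  }
}

Assuming the class is packaged into a JAR, it would typically be registered in a Hive session with ADD JAR and CREATE TEMPORARY FUNCTION before being called like any built-in function in a HiveQL statement.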

Hive represents a great example of an Hadoop DSL that hides the complexity of MapReduce, and provides a widely understood language for working with data in Hadoop. For most scenarios, it imposes minimal overhead over handwritten Java MapReduce code, and it provides extension points for custom code to address most requirements not already covered by Hive. This approach abstracts the concepts of MapReduce from database engineers, enabling them to focus on their data problems and not programming.

Other SQL-based DSLs exist as well. The specialized Hadoop real-time query engines discussed in Chapter 9 (Cloudera Impala and Apache Drill) use SQL-based DSLs as their programming language.

Data Flow and Related DSLs

Data flow DSLs enable developers to represent data processing of large data sets in the form of pipelines comprised of filtering, transforming, and aggregating components. The best known example of such DSLs is used by Pig — a tool that is bundled with most Hadoop distributions. Pig uses a procedural data flow language that abstracts the MapReduce framework, but uses MapReduce underneath the covers.

Pig

Whereas Facebook invented Hive to enable analysts to work with Hadoop data using a familiar SQL DSL, Yahoo! invented Pig to make it easier to transform, filter, and aggregate data. Pig has a custom language called PigLatin that is not SQL-based, which means the learning curve is steeper for experienced SQL users; it is best suited for developers.

Although Hive was mainly introduced for querying data, Pig was initially introduced for Extract, Transform, and Load (ETL) processing. With PigLatin, developers can specify how the data is loaded and where to checkpoint data in the pipeline, and it is highly customizable. However, the features of Pig and Hive overlap so much that you can actually use both languages for querying and ETL.

To demonstrate the common functionality between Pig and Hive, let’s consider an example query that groups daily stock records for AAPL (Apple) by year, and then averages the closing price, suggesting how AAPL’s stock trends year-over-year. Listing 13-5 shows a Hive query for this example. It’s fairly straightforward SQL.

LISTING 13-5: Hive implementation of Apple query

SELECT year(s.ymd), avg(s.close)   -- ymd holds the date as YYYY-MM-DD
FROM stocks s
WHERE s.symbol = 'AAPL'
GROUP BY year(s.ymd);

Unlike Hive, Pig does not have a DDL. As a result, the equivalent Pig version shown in Listing 13-6 starts by reading the data from a tab-delimited file.

LISTING 13-6: Pig implementation of Apple query

aapl = load '/path/to/AAPL.tsv' as (
  ymd:       chararray,   -- date as YYYY-MM-DD
  ...,
  close:     float,
  ...);
by_year = group aapl by SUBSTRING(ymd, 0, 4);
year_avg = foreach by_year generate 
             group, AVG(aapl.close);
-- Dump to console:
dump year_avg;

In Listing 13-6, you can see the familiar GROUP BY operation from SQL, while foreach a generate b is the projection operator, equivalent to SELECT b FROM a in SQL. Note that when you grouped over aapl, generating a new relation called by_year, Pig named the first field group, which contains the year values over which you grouped. Pig named the second field aapl (the name of the relation you grouped over), which holds the grouped records. So, the expression foreach by_year generate group, AVG(aapl.close) really means, “iterate over the records in by_year, and project the year (the group field) and the average closing price for each year.”

Pig is informally described as a data flow language, because you define a sequence of statements that describe each step in the processing of the data, from its source and original schema to the final output.

Listing 13-7 shows an example of the Word Count program implementation, written in Pig.

LISTING 13-7: Pig implementation of Word Count

inpt = LOAD '/path/to/input' using TextLoader  AS (line:chararray);
words = FOREACH inpt GENERATE flatten(TOKENIZE(line)) AS word;
grpd = GROUP words BY word;
cntd = FOREACH grpd GENERATE group, COUNT(words); 
STORE cntd INTO '/path/to/output';

It looks like this implementation contains a sequence of "variable = value" statements, but in reality, each line defines a relation (in the relational model sense of the word) on the right-hand side, and an alias (or name) for the relation on the left-hand side.

This implementation first loads the data, treating each line of text as a record with a single field named line of type chararray (that is, a string) in the schema for input. Next, it iterates over each record, tokenizing the text into words and “flattening” the words into individual records. Then, it groups the words, so that each group contains all the occurrences for a given word. Finally, it projects out each word (the field holding the word is now named group) and the size of each group (the field holding the group’s contents is given the name of the grouped-over relation, namely words). You store the results to an output path.

Pig is a natural fit for data flows like this, but it also has all the conventional relational operations that Hive contains (with some exceptions), including GROUP BY, JOIN, PROJECTION, FILTERING (that is, WHERE clauses), limiting, and so on.

Like Hive, Pig is an external DSL with its own grammar and compiler. It is also not a Turing-complete language — so, for example, general looping is not supported (apart from iteration through the records). Also, like Hive, Pig supports plug-in file formats and UDFs, but Pig supports writing UDFs in several languages, including Python, Ruby, and JavaScript, in addition to Java.
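For comparison with the Hive UDF sketch shown earlier, here is a minimal sketch of a Java-based Pig UDF; the class name and its behavior are made up for illustration. A scalar Pig function extends org.apache.pig.EvalFunc and implements exec().

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

// Hypothetical Pig UDF that upper-cases its single chararray argument.
public class ToUpper extends EvalFunc<String> {
  @Override
  public String exec(Tuple input) throws IOException {
    if (input == null || input.size() == 0 || input.get(0) == null) {
      return null;                    // pass nulls through untouched
    }
    return ((String) input.get(0)).toUpperCase();
  }
}

After packaging the class into a JAR and loading it with Pig’s REGISTER statement, the function can be called from a PigLatin script just like a built-in function.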

Pig’s main benefit over Hive is the greater flexibility of the step-by-step specification of the data flow. Whereas database users prefer Hive, programmers often prefer Pig because it looks and feels more like a conventional programming language.

Now let’s turn to DSLs based on a Java virtual machine (JVM) that expose higher-level abstractions than Hadoop’s Java API for MapReduce. Note that a JVM is being used here rather than Java, because, as you’ll see, some DSLs use other languages running on the JVM besides Java.

Cascading and Scalding

Cascading is the most popular Java DSL on top of the low-level MapReduce API. It was introduced in late 2007 as a DSL to implement functional programming for large-scale data workflows. Cascading provides a Turing-complete, internal or embedded DSL for MapReduce programming, with explicit metaphors for sequencing pipes together into data flows. Cascading hides many of the details of the lower-level API, enabling the developer to focus on the problem at hand.

Cascading is based on a “plumbing” metaphor to assemble pipelines to split, merge, and join streams of data, performing operations on them. With Cascading, a data record is called a tuple, a pipeline is called a pipe assembly, and the stream of records going through the pipeline is called a tuple stream. Using the plumbing analogy, Cascading defines workflows out of familiar plumbing elements, such as pipes, taps, and traps.

  • A pipe connects the main elements of the workflow (or pipeline), and defines what work should be done against the tuple stream passing through it. Pipe types include Each (which applies a function or filter), GroupBy (which groups the stream on tuple fields), CoGroup (which joins streams on a common set of values), Every (which applies an aggregator or sliding-window operation), and SubAssembly (which combines other pipe assemblies).
  • A tap represents a resource, or a connection to a physical data source in a data flow. A source tap is typically the input tap (where you are reading data from), and a sink tap is the output tap (where you are writing data).
  • A trap is like a sink tap — it is a place to write data that results in a failed operation, enabling you to continue processing your data without losing track of the data that caused a fault.

Figure 13-1 shows an example of a Cascading pipeline, representing the familiar Word Count example.

FIGURE 13-1: Word Count workflow in Cascading


The workflow in Figure 13-1 has two taps — an input tap (receiving a collection of documents) and an output tap (which produces word counts). The pipeline also has two functions (a tokenize function and a count function, which is an aggregator), and the workflow has a GroupBy pipe assembly. Listing 13-8 shows a version of a Word Count implementation written using Cascading.

LISTING 13-8: Cascading implementation of Word Count

import java.util.Properties;
// Cascading 1.x package layout:
import cascading.flow.*;                     // Flow, FlowConnector
import cascading.operation.*;                // Function, Aggregator
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexGenerator;
import cascading.pipe.*;                     // Pipe, Each, Every, GroupBy
import cascading.scheme.TextLine;
import cascading.tap.*;                      // Tap, Hfs, SinkMode
import cascading.tuple.Fields;
public class WordCount {
  public static void main(String[] args) {
    // Set up app properties.
    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass(properties, WordCount.class);
 
    // Define the input and output "taps".
    Scheme sourceScheme = new TextLine(new Fields("line"));
    Scheme sinkScheme = new TextLine(new Fields("word", "count"));
    String inputPath  = args[0];
    String outputPath = args[1];
    Tap source = new Hfs(sourceScheme, inputPath);
    Tap sink   = new Hfs(sinkScheme, outputPath, SinkMode.REPLACE);
 
    // Connect the pipes for the data flow.
    Pipe assembly = new Pipe("wordcount");
    // Regular expression to tokenize into words.
    String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
    Function function = new RegexGenerator(new Fields("word"), regex);
    assembly = new Each(assembly, new Fields("line"), function);
    assembly = new GroupBy( assembly, new Fields("word"));
    Aggregator count = new Count(new Fields("count"));
    assembly = new Every(assembly, count);
 
    FlowConnector flowConnector = new FlowConnector( properties );
    Flow flow = flowConnector.connect( "word-count", source, sink, assembly);
    flow.complete();
  }
} 

The code begins by setting up the application properties, and the source and sink “taps.” The pipes are then joined together to create the data flow. Note that where SQL has keywords for operations like group by and projection, and functions for counting, Cascading encapsulates these operations as Java classes. Like the previous Pig script (Listing 13-7), you loop over “each” line, tokenizing into words (using a regular expression this time), then grouping over the words, counting the group sizes, and finally writing the results to the output tap.

This example shows how Cascading focuses on the relational operations required for the algorithm. There is much less framework boilerplate here than you have seen in the typical MapReduce “word count” examples that are used to demonstrate how MapReduce works.


NOTE For an example of a much more complex data flow, see the “CMU Workshop on CoPA (Cascading + City of Palo Alto Open Data),” available at https://github.com/Cascading/CoPA/wiki, which shows how to use Cascading and Hadoop to clean up raw, unstructured open data about parks, streets, and trees. The workshop suggests several applications that could be built with this data, and provides an example application.

Although Cascading is a Java API, DSLs that build on Cascading are now available for other languages. The list includes Scalding for Scala, Cascalog for Clojure, PyCascading for Python, and others. Some of these APIs add enhancements not found in the Java API. For example, Cascalog adds a logic-based query capability inspired by Datalog, while Scalding adds math libraries that are useful for graph-traversal problems and many machine-learning algorithms.

Listing 13-9 shows a version of Word Count written in Scalding.

LISTING 13-9: Scalding implementation of Word Count

import com.twitter.scalding._
class WordCountJob(args: Args) extends Job(args) {
  TextLine(args("input"))
    .read
    .flatMap('line -> 'word) {
      line: String =>
        line.trim.toLowerCase.split("\\W+")
    }
    .groupBy('word) { group => group.size('count) }
    .write(Tsv(args("output"))) // tab-delim. output
}

As the import suggests in Listing 13-9, Scalding was developed at Twitter. Without explaining all the details of Scala syntax, let’s note a few important things.

First, the code is terse — and dramatically smaller than the Java counterparts. With some exceptions, this code looks like typical Scala code for working with the built-in API for smaller, in-memory data collections.

Next, the relational operations (like grouping) are now function calls, not classes. The line .groupBy('word) { group => group.size('count) } means call the groupBy function on the output of the previous function (pipeline step). Grouping, in this case, is done over the field named word. Additionally, an anonymous function is passed to groupBy that takes each group as an argument and returns the size of the group, labeling that value as a field named count. The schema of the data output from this step (and written in tab-delimited format to the output) contains each word and its count.

What does flatMap do in Listing 13-9? It represents the map phase of a MapReduce job. In mathematics, a mapping produces exactly one output element for each input element. MapReduce relaxes this constraint, allowing zero-to-many output elements per input element. This is exactly what flatMap means, too. The anonymous function passed to flatMap outputs a collection of zero-to-many elements for each input element from the original collection, and then flatMap “flattens” the nested collections into one “flat” collection. Hence, you have really been working with FlatmapReduce all along.
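For readers more comfortable with plain Java, the following stand-alone sketch (not tied to Hadoop or to any DSL, and using made-up input data) illustrates the same flatMap semantics: each input line yields zero or more words, and the nested results are flattened into a single output collection.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlatMapSketch {
  public static void main(String[] args) {
    List<String> lines = Arrays.asList("to be or", "", "not to be");
    List<String> words = new ArrayList<String>();
    for (String line : lines) {
      // "map" step: one input line produces zero-to-many tokens
      String[] tokens = line.trim().toLowerCase().split("\\W+");
      for (String token : tokens) {
        if (!token.isEmpty()) {
          words.add(token);      // "flatten" step: append into one list
        }
      }
    }
    System.out.println(words);   // [to, be, or, not, to, be]
  }
}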

Crunch and Scrunch

Another DSL for MapReduce is called Crunch. It was modeled after Google’s FlumeJava, and uses a number of small primitive operations for working on pipelines of enormous amounts of data. Crunch is based on three data abstractions: PCollection<T> (for parallelized collections of data having a certain type T), PTable<K,V> (for parallelized tables of key-value data of types K and V, respectively), and PGroupedTable<K,V> (for the output of group-by operations). Parallel implementations of operations for working with records in these structures across a cluster also exist.

Listing 13-10 shows a Word Count implementation written in Crunch.

LISTING 13-10: Crunch implementation of Word Count

// import statements...
public class WordCount {
  public static void main(String[] args) throws Exception {
    // Setup the pipeline and the input.
    Pipeline pipeline = new MRPipeline(WordCount.class);
    PCollection<String> lines = 
      pipeline.readTextFile(args[0]);
 
     // Tokenize the lines into words in parallel.
    PCollection<String> words = lines.parallelDo(
      "my splitter", new DoFn<String, String>() {
      public void process(
        String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings());
 
    // Count the words.
    PTable<String, Long> counts = Aggregate.count(words);
 
    pipeline.writeTextFile(counts, args[1]);
    pipeline.run();
  }
}

As you can see from the example in Listing 13-10, Crunch enables the developer to explicitly use more of the low-level Java Hadoop APIs (see, for example, the use of Writables). It also provides the capability to write data using the Avro type. It is simple to use, and for developers who know Java, it is easy to learn.

The Scala version of Crunch is called Scrunch. Listing 13-11 shows a Word Count implementation in Scrunch.

LISTING 13-11: Scrunch implementation of Word Count

// imports...
class WordCountExample {
  val pipeline = new Pipeline[WordCountExample]
 
  def wordCount(fileName: String) = {
    pipeline.read(from.textFile(fileName))
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(!_.isEmpty())
      .count
  }
}

Listing 13-11 shows the same sort of elegant, concise expression of the data flow as the Scalding DSL provided. The group by example shown for Cascading and Scalding would be similar in size and complexity.

As mentioned, Crunch and Scrunch place greater emphasis on statically typing the fields in the schemas, compared to Cascading. It’s not clear that either static or dynamic typing offers demonstrable advantages over the other, but the differences might influence your tool selection, based on your own preferences.

Graph Processing

The final DSL category to discuss is less widely used today, but you will see more of these over time — DSLs that model data as a graph and run parallel algorithms over the graph. The number of use cases is huge.

Online social graphs are rapidly growing in size, and there is a growing demand to analyze them. Online social networking sites such as Facebook, LinkedIn, Google+, and Twitter, and even e-mail sites like Yahoo! and Google, have hundreds of millions of users, and analyzing people and their connections plays a big role in advertising and personalization. Google was one of the first to capitalize on the importance of analyzing graphs, using an algorithm called PageRank for its search engine. Invented by Larry Page and Sergey Brin (the founders of Google), the algorithm orders search results by “link popularity” — the more sites that link to a web page, the more relevant the result. Although it is only one of many factors in determining relevance rankings, the algorithm is used in all of Google’s web search tools.

A lot of other problems require link, graph, and network analysis, and today, Facebook seems to be leading the way for Hadoop in this area. Using graph-based DSLs will be a growing area for Hadoop in the future.

So far, graph processing systems for Hadoop are fairly new because scalable graph processing across a cluster of computers is closer to the bleeding edge of research, where a number of problems such as the following are the subject of active investigation:

  • How do you partition a potentially dense graph across a cluster?
  • How can you cut the graph for distribution across the cluster so that the number of arcs bridging machines is minimized?
  • How do you communicate updates efficiently across those links that span machines?

There is currently much active work and a growing amount of interest in the practical applications of graph processing in Hadoop. This section just briefly mentions the approaches and projects in this sector of the “DSL space,” and you should investigate further on your own.

As usual, Google research papers are leading the way, and Apache is following. Pregel (http://googleresearch.blogspot.com/2009/06/large-scale-graph-computing-at-google.html) is a large-scale graph processing system used at Google for graph data analysis. One of Apache’s projects, Giraph (which is discussed later in this section), is the open source counterpart to Pregel that uses Hadoop. It is used by Facebook to analyze social graphs formed by users, and their various connections with friends and groups.

Graph processing systems such as Pregel and Giraph are based on a parallel processing model called Bulk Synchronous Parallel (BSP), in which communications between graph nodes are processed concurrently, in lock-step. To demonstrate how BSP works: at time t0, all nodes send messages to the nodes they are connected to at the same time. All nodes then update their states as needed, followed by another round of messages at time t1, and so on. Barrier synchronization occurs after each round of messages.
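To make these supersteps concrete, the following is a toy, single-process sketch of the loop just described. It does not use any real framework’s API; the graph, the values, and the “propagate the maximum value” problem are made up for illustration. Each vertex reads the messages delivered at the barrier, updates its state, and sends new messages that become visible only in the next superstep.

import java.util.*;

public class BspSketch {
  public static void main(String[] args) {
    // A tiny graph as an adjacency list: vertex -> neighbors.
    Map<Integer, List<Integer>> edges = new HashMap<Integer, List<Integer>>();
    edges.put(1, Arrays.asList(2));
    edges.put(2, Arrays.asList(1, 3));
    edges.put(3, Arrays.asList(2));

    // Per-vertex state: each vertex starts with its own value.
    Map<Integer, Integer> value = new HashMap<Integer, Integer>();
    value.put(1, 5); value.put(2, 1); value.put(3, 9);

    // Superstep 0 bootstrap: every vertex sends its value to its neighbors.
    Map<Integer, List<Integer>> inbox = new HashMap<Integer, List<Integer>>();
    for (Integer v : edges.keySet()) inbox.put(v, new ArrayList<Integer>());
    for (Integer v : edges.keySet())
      for (Integer neighbor : edges.get(v)) inbox.get(neighbor).add(value.get(v));

    boolean anyChange = true;
    while (anyChange) {                     // one iteration == one superstep
      anyChange = false;
      Map<Integer, List<Integer>> outbox = new HashMap<Integer, List<Integer>>();
      for (Integer v : edges.keySet()) outbox.put(v, new ArrayList<Integer>());

      // All vertices "compute" within the superstep.
      for (Integer v : edges.keySet()) {
        int max = value.get(v);
        for (int msg : inbox.get(v)) max = Math.max(max, msg);
        if (max > value.get(v)) {
          value.put(v, max);
          anyChange = true;
          // Messages sent now are delivered only after the barrier.
          for (Integer neighbor : edges.get(v)) outbox.get(neighbor).add(max);
        }
      }
      inbox = outbox;                       // the barrier between supersteps
    }
    System.out.println(value);              // every vertex converges to 9
  }
}

In a real system such as Giraph, the vertices and their messages are partitioned across the cluster, and the barrier is coordinated across machines rather than by a simple loop.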

Based on this concurrent communications model, you might think that MapReduce would be a poor fit for this highly iterative model — and you would be correct. Natively implementing BSP in MapReduce alone would require a separate MapReduce job per iteration of BSP with terrible overhead!

However, graph processing systems are beginning to utilize Hadoop’s data storage and some MapReduce operations in parallel with their own BSP calculations. A number of graph processing systems exist, but let’s focus on two open source systems that work with Hadoop in one way or another:

  • Giraph is an Apache project that is an implementation of Google’s Pregel, and implements BSP on top of HDFS. In Giraph, you launch a conventional MapReduce job, but Giraph internally handles the iteration steps using the context of a Vertex, keeping the graph state in memory without chaining together inefficient MapReduce jobs. Giraph leverages the resource management infrastructure of Hadoop and HDFS for data storage, but it works around the inefficiencies of using MapReduce for BSP. Giraph also incorporates ZooKeeper to provide a fault-tolerant, centralized coordination service.
  • Hama is also an Apache project. Like Giraph, it is a BSP computing framework that runs over the top of HDFS. However, Hama bypasses MapReduce altogether, spinning up its own set of computing processes across the cluster. Hama avoids the limitations of fitting within the Hadoop resource scheduling approach and MapReduce job model. But organizations may be reluctant to introduce an additional set of cluster processes that compete with MapReduce for resources, rather than attempting to work within that framework. For that reason, clusters doing this type of analysis are typically dedicated to the use of Hama.

There is definitely future growth potential for graph DSLs using Hadoop.

This brief summary of DSLs for Hadoop demonstrates that, in addition to the base MapReduce framework, there is a rich set of DSLs available to write Hadoop jobs more productively using tools that best suit the needs of the user. New Hadoop DSLs are created on a regular basis. Cascading, for example, just introduced two new DSLs:

  • Lingual — A new SQL DSL
  • Pattern — A new machine-learning DSL

Now that you know how DSLs enable you to simplify MapReduce usage, the next section takes a look at advances of MapReduce itself, allowing it to utilize Hadoop resources more efficiently.

FASTER, MORE SCALABLE PROCESSING

As discussed throughout this book, the MapReduce infrastructure implementation is effectively a monolithic one, and a major component of this implementation is the JobTracker (see Chapter 3 for more details), which is responsible for both resource management and job scheduling and monitoring. Among other things, implementers have found that the JobTracker implementation is very complex and can suffer from some drawbacks, including increased memory consumption, a rigid threading model, and problems related to scalability, reliability, and performance.

As a result of this analysis, the Hadoop community did a complete overhaul of MapReduce, sometimes called “NextGen MapReduce,” “MapReduce 2.0” (MRv2), or Yet Another Resource Negotiator (YARN), which is covered in this section. After this overhaul was introduced, a new project called “Tez” (which is Hindi for “speed”) entered the Apache Incubator, and it promises to dramatically speed up performance in Hadoop. You learn more about both of these later in this chapter.


NOTE It is important to note that although what is described in the next section is sometimes called “MapReduce 2,” the changes described do not change the actual MapReduce programming model, or the APIs that developers use as described throughout this book!

Apache YARN

The Hadoop developer committee decided that the solution for some deficiencies in the original MapReduce was to split resource management and job scheduling into separate daemons. The new resource management layer, called YARN, splits the functionality of the JobTracker into two separate daemons:

  • A global Resource Manager (RM) that consists of a Scheduler and an Applications Manager.
  • An Application Master (AM) that provides support for a specific application, where an application is either a single job in the classical sense of MapReduce jobs, or a Directed Acyclic Graph (DAG) of jobs. (Compare this to Oozie, described in Chapters 6 through 8.)

Splitting up the functionality of the JobTracker provides more flexibility and better performance.

YARN’s resource management is based on a very general resource model for applications. Resources are organized into containers, each of which provides a specific amount of resources (memory, CPU, and so on). As with everything else in Hadoop, YARN’s resource management and execution framework is implemented by leveraging the master/slave architecture. Slaves, or Node Managers (NMs), run on every node. They manage containers on a specific node, they monitor a node’s execution, and they report resource availability to the Master — called the Resource Manager. The Master is responsible for arbitrating resources among all the applications in the system. (Compare this to the HDFS architecture described in Chapter 2.)

Execution of a specific application is controlled by its Application Master. Unlike the JobTracker, Application Masters are fully distributed — an instance of an Application Master runs for every application, in a container on one of the cluster’s nodes. The Application Master is responsible for splitting an application into multiple tasks and negotiating with the Resource Manager for execution resources (containers). Once resources are allocated, the Application Master interacts with the Node Manager(s) to place, execute, and monitor an individual application’s tasks.

The overall application flow is shown in Figure 13-2.

FIGURE 13-2: YARN architecture


It contains the following steps (a simplified client-side code sketch of the submission in step 1 follows the list):

1. A client program submits the application, including the necessary specifications to launch the application-specific Application Master. As part of the application submission, the client must provide sufficient information to the Resource Manager to launch the application’s first container — the Application Master. The required information (application submission context) includes local files/jars that must be available for an application to run, the actual command that must be executed (with the necessary command-line arguments), any UNIX environment settings (optional), and so on. Effectively, it is necessary to describe the UNIX process(es) that must be launched by an Application Master.
2. The Resource Manager allocates a required container for an Application Master, and then launches the Application Master.
3. During the startup, the Application Master registers with the Resource Manager. This allows the client program to query the Resource Manager for Application Master details, including its address. After getting these details, a client can directly communicate with its own Application Master.
4. Once the Application Master is up and running, it examines an application request and negotiates appropriate resource containers required for application execution.
5. Once the required container(s) is allocated, the Application Master launches it by providing the container launch specification to the Node Manager.
6. During execution, application code provides necessary information (progress, status, and so on) to its Application Master. This information is also available to the client, which communicates directly with the Application Master.
7. Once the application is complete, the Application Master releases resources, deregisters itself, and shuts down, releasing its own container.
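To ground step 1 in code, here is a simplified sketch of a client-side submission using the YARN client API. The application name, Application Master class, command line, and resource sizes are placeholders, and many details that a real submission needs (local resources such as JARs, environment settings, and security tokens) are omitted.

import java.util.Collections;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class YarnSubmitSketch {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    yarnClient.start();

    // Ask the Resource Manager for a new application and its submission context.
    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    appContext.setApplicationName("my-yarn-app");              // placeholder name

    // Describe the process that becomes the Application Master (step 1).
    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
    amContainer.setCommands(Collections.singletonList(
        "$JAVA_HOME/bin/java com.example.MyAppMaster"          // hypothetical AM class
        + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
        + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
    appContext.setAMContainerSpec(amContainer);

    // Resources requested for the Application Master's container.
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(512);
    capability.setVirtualCores(1);
    appContext.setResource(capability);

    // Steps 2 onward happen inside the cluster once the application is submitted.
    ApplicationId appId = yarnClient.submitApplication(appContext);
    System.out.println("Submitted application " + appId);
  }
}

From this point, the Resource Manager launches the Application Master described by the container launch context, and the client can poll yarnClient.getApplicationReport(appId) to track progress, which corresponds to steps 2 through 7 above.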

The important thing to notice about YARN is that it does not change the actual MapReduce programming model (the name MapReduce 2, used for YARN, is an unfortunate and misleading name) or the APIs that are used by developers. It simply provides a new resource management model and implementation that is used to execute MapReduce jobs. As a result, in the most simplistic case, existing MapReduce applications will work as is, but will require recompiling.

YARN can be used for the creation of new frameworks and execution models (in addition to MapReduce) that can leverage both the compute power of an Hadoop cluster and its rich data storage models to solve specific new classes of problems. Such new frameworks can leverage YARN resource management, but provide their own implementation of an Application Master. As of this writing, the following projects have either ported or are in the process of porting their implementation to YARN:

  • Spark (an open source cluster computing system)
  • Apache Hama (the graph analytic framework described earlier in this chapter)
  • Apache Giraph (the graph analysis framework described earlier in this chapter)
  • Open MPI (an open source project for high-performance computing)
  • Storm (an open source, distributed, real-time computation system described in Chapter 9)
  • Apache S4 (an Apache project similar to Storm that provides real-time events processing)

The YARN architecture allows multiple Application Masters (and, hence, multiple frameworks) to coexist on the same Hadoop cluster and to share the data residing on that cluster. This simplifies data-level integration between multiple frameworks residing on the same Hadoop cluster.

Executing MapReduce applications using YARN improves scalability, but the MapReduce programming model does not fully utilize YARN capabilities, especially the built-in DAG support. The use of MapReduce is typically supplemented by Oozie for the orchestration of individual MapReduce jobs. Although this approach works well for batch applications, for real-time Hadoop applications it suffers from the overhead of passing data through HDFS and from the application start-up time.

Some of these shortcomings are eliminated in the new Hadoop execution framework called Tez.

Tez

Tez provides a general-purpose, highly customizable framework that natively supports the orchestration of individual jobs into a DAG. Tez doesn’t just manage resources and execution for individual MapReduce jobs, but does so for the whole graph of jobs, resulting in much faster performance than Oozie-orchestrated MapReduce jobs. The faster performance using Tez is achieved by eliminating the overhead involved with launching multiple jobs, and it can meet requirements for human-interactive response times and high throughput at petabyte scale. (Compare this to the definition of real-time processing provided in Chapter 9.)

Originally created in support of the Stinger initiative, with the goal of making Apache Hive 100 times faster, Tez provides a single underlying framework to support both latency and throughput-sensitive applications. Consequently, it eliminates the necessity for multiple frameworks and systems to be installed, maintained, and supported, thus providing significant cost savings for the enterprise.

Tez was contributed to Apache in early 2013 by Hortonworks, and entered the Incubation stage. It is a very active project with many involved developers working on issues. Tez has a bright future for real-time applications for Hadoop.

SECURITY ENHANCEMENTS

As described in Chapter 10, the Hadoop community is working on many security enhancements. With the addition of new cryptographic codecs, a new token-based authentication mechanism that supports more authentication protocols, a unified authorization system that supports Attribute Based Access Control (ABAC) and supports policy enforcement using open standards and XACML, and changes to HBase to allow cell-level authorization, Hadoop will be able to move from isolated cluster environments with perimeter-level security to very secure deployments that can meet the requirements of highly secure environments.


NOTE For more information, see the “Security Enhancements with Project Rhino” section in Chapter 10.

EMERGING TRENDS

This book has certainly covered many trends in Hadoop. Though the authors certainly don’t have a crystal ball, they do see the following as key areas that will grow in the future:

  • Real-time Hadoop — This trend is obviously here today, and it will continue. As mindsets shift from viewing Hadoop as a batch-mode processing system, Hadoop’s use in the future (especially with some of the recent and emerging performance and scalability improvements) will be real-time analytics with human-response times. Jobs that you may be accustomed to taking a long time will be performed rapidly. This will play out on several different fronts — fraud detection and analysis for transactions, security vulnerability analysis and anomaly detection for real-time events, and on-demand analytics processing with other tools in the Hadoop ecosystem.
  • Graph analytics and new algorithms beyond MapReduce — If you follow the brief history of Hadoop’s beginnings, you can certainly see Google’s influence. Google’s use of new graph algorithms for highly scalable and distributed graph analysis has triggered more interest in distributed algorithms other than MapReduce. Because Apache Giraph (discussed earlier in this chapter) is an open source implementation of Google’s high-performance graph analytics platform (Pregel), and because Facebook is using Giraph for graph analytics on its social network, there is no question that this will be a major growth area for Hadoop. Apache Hama (also discussed earlier in this chapter) utilizes HDFS storage, but uses other graph algorithms with Hadoop. This trend will continue.
  • Machine learning — Although you didn’t read much about it in this book, this is a growing topic. With projects like Apache Mahout and Pattern (a Cascading DSL for machine learning), predictive modeling and machine learning will be used more and more with Hadoop for common use cases like recommendations, fraud detection, and security vulnerability detection.
  • Higher-level abstractions and DSLs — Earlier in this chapter, you learned about the power of DSLs for Hadoop, and how they simplify programming. The learning curve for using MapReduce and Hadoop is greatly reduced with the use of such languages and tools, and this trend will continue to grow. Although there can certainly be a performance penalty related to the use of some of these tools, as Hadoop’s processing gets faster, more data scientists and domain experts will be able to perform data analytics with tools focused specifically on their domains, removing the barrier to entry. It could be that this will be so common that data scientists might not even know that they are using Hadoop!

Hadoop is evolving quickly and has a bright future. With the ongoing enhancements shown in the proliferation of new projects, performance improvements, security, and DSLs, new directions are quickly approaching, and it is an exciting time to be an Hadoop developer!

SUMMARY

This chapter highlighted the growing trend of using DSLs to simplify MapReduce programming. You learned about YARN and Tez, which will drastically improve the scalability and performance of Hadoop. You also learned about upcoming security changes, and emerging trends seen in Hadoop’s future.
