Chapter 22. HCatalog

Alan Gates

Introduction

Using Hive for data processing on Hadoop has several nice features beyond the ability to use an SQL-like language. Its ability to store metadata means that users do not need to remember the schema of the data. It also means they do not need to know where the data is stored, or what format it is stored in. This decouples data producers, data consumers, and data administrators. Data producers can add a new column to the data without breaking their consumers’ data-reading applications. Administrators can relocate data or change the format it is stored in without requiring changes on the part of the producers or consumers.

The majority of heavy Hadoop users do not use a single tool for data production and consumption. Often, users begin with a single tool: Hive, Pig, MapReduce, or another. As their use of Hadoop deepens, they discover that the tool they chose is not optimal for the new tasks they are taking on. Users who start with analytic queries in Hive discover they would like to use Pig for ETL processing or for constructing their data models. Users who start with Pig discover they would like to use Hive for analytic queries.

While tools such as Pig and MapReduce do not require metadata, they can benefit from it when it is present. Sharing a metadata store also enables users across tools to share data more easily. A workflow where data is loaded and normalized using MapReduce or Pig and then analyzed via Hive is very common. When all these tools share one metastore, users of each tool have immediate access to data created with another tool. No loading or transfer steps are required.

HCatalog exists to fulfill these requirements. It makes the Hive metastore available to users of other tools on Hadoop. It provides connectors for MapReduce and Pig so that users of those tools can read data from and write data to Hive’s warehouse. It has a command-line tool for users who do not use Hive to operate on the metastore with Hive DDL statements. It also provides a notification service so that workflow tools, such as Oozie, can be notified when new data becomes available in the warehouse.

HCatalog is a separate Apache project from Hive, and is part of the Apache Incubator. The Incubator is where most Apache projects start. It helps those involved with the project build a community around the project and learn the way Apache software is developed. As of this writing, the most recent version is HCatalog 0.4.0-incubating. This version works with Hive 0.9, Hadoop 1.0, and Pig 0.9.2.

MapReduce

Reading Data

MapReduce uses a Java class InputFormat to read input data. Most frequently, these classes read data directly from HDFS. InputFormat implementations also exist to read data from HBase, Cassandra, and other data sources. The task of the InputFormat is twofold. First, it determines how data is split into sections so that it can be processed in parallel by MapReduce’s map tasks. Second, it provides a RecordReader, a class that MapReduce uses to read records from its input source and convert them to keys and values for the map task to operate on.

HCatalog provides HCatInputFormat to enable MapReduce users to read data stored in Hive’s data warehouse. It allows users to read only the partitions of tables and columns that they need. And it provides the records in a convenient list format so that users do not need to parse them.

Note

HCatInputFormat implements the Hadoop 0.20 API, org.apache.hadoop.mapreduce, not the Hadoop 0.18 API, org.apache.hadoop.mapred. This is because it requires some features added in the mapreduce (0.20) API, so MapReduce users will need to use this interface to interact with HCatalog. However, Hive requires that the underlying InputFormat used to read data from disk be a mapred implementation. So if you have data formats you are currently reading with a mapred InputFormat, you can still use them with HCatalog. InputFormat is a class in the mapreduce API and an interface in the mapred API, hence it was referred to as a class above.

When initializing HCatInputFormat, the first thing to do is specify the table to be read. This is done by creating an instance of InputJobInfo that specifies the database, table, and partition filter to use.

InputJobInfo.java

  /**
   * Initializes a new InputJobInfo
   * for reading data from a table.
   * @param databaseName the db name
   * @param tableName the table name
   * @param filter the partition filter
   */
  public static InputJobInfo create(String databaseName,
      String tableName,
      String filter) {
      ...
  }

The databaseName parameter indicates the Hive database (or schema) the table is in. If it is null, the default database is used. tableName is the table that will be read; it must be non-null and refer to a valid table in Hive. filter indicates which partitions the user wishes to read. If it is left null, the entire table will be read. Use care here, as reading all the partitions of a large table can result in scanning a large volume of data.

Filters are specified as an SQL-like where clause. They should reference only partition columns of the data. For example, if the table to be read is partitioned on a column called datestamp, the filter might look like datestamp = "2012-05-26". Filters can contain =, >, >=, <, <=, and, and or as operators.
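For example, a filter that reads two days of data from a table partitioned on datestamp might be built like this (a minimal sketch; the rawevents table and the default database are borrowed from the example later in this chapter):

String filter = "datestamp = \"2012-05-25\" or datestamp = \"2012-05-26\"";
InputJobInfo inputInfo = InputJobInfo.create("default", "rawevents", filter);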

There is a bug in the ORM mapping layer used by Hive v0.9.0 and earlier that causes filter clauses with >, >=, <, or <= to fail.

Note

To resolve this bug, you can apply the patch HIVE-2084.D2397.1.patch from https://issues.apache.org/jira/browse/HIVE-2084 and rebuild your version of Hive. This does carry some risks, depending on how you deploy Hive. See the discussion on the JIRA entry.

This InputJobInfo instance is then passed to HCatInputFormat via the method setInput along with the instance of Job being used to configure the MapReduce job:

Job job = new Job(conf, "Example");
InputJobInfo inputInfo = InputJobInfo.create(dbName, inputTableName, filter);
HCatInputFormat.setInput(job, inputInfo);

The map task will need to specify HCatRecord as a value type. The key type is not important, as HCatalog does not provide keys to the map task. For example, a map task that reads data via HCatalog might look like:

  public static class Map extends
      Mapper<WritableComparable, HCatRecord, Text, Text> {

    @Override
    protected void map(
        WritableComparable key,
        HCatRecord value,
        Mapper<WritableComparable, HCatRecord, Text, Text>.Context context)
        throws IOException, InterruptedException {
      ...
    }
  }

HCatRecord is the class that HCatalog provides for interacting with records. It presents a simple get and set interface. Columns can be requested by position or by name. When requesting columns by name, the schema must be provided, as each individual HCatRecord does not keep a reference to the schema. The schema can be obtained by calling HCatInputFormat.getOutputSchema(). Since Java does not support overloading of methods by return type, different instances of get and set are provided for each data type. These methods use the object versions of types rather than scalar versions (that is, java.lang.Integer rather than int), which allows them to express null as a value. There are also implementations of get and set that work with Java Objects:

// get the first column as an Object and cast it to a Long
Long cnt = (Long) record.get(0);

// get the column named "cnt" as a Long
Long cnt = (Long) record.get("cnt", schema);

// set the column named "user" to the string "fred"
record.setString("user", schema, "fred");

Often a program will not want to read all of the columns in an input. In this case it makes sense to trim out the extra columns as quickly as possible. This is particularly true in columnar formats like RCFile, where trimming columns early means reading less data from disk. This can be achieved by passing a schema that describes the desired columns. This must be done during job configuration time. The following example will configure the user’s job to read only two columns named user and url:

HCatSchema baseSchema = HCatBaseInputFormat.getOutputSchema(context);
List<HCatFieldSchema> fields = new ArrayList<HCatFieldSchema>(2);
fields.add(baseSchema.get("user"));
fields.add(baseSchema.get("url"));
HCatBaseInputFormat.setOutputSchema(job, new HCatSchema(fields));

Writing Data

Similar to reading data, when writing data, the database and table to be written to need to be specified. If the data is being written to a partitioned table and only one partition is being written, then the partition to be written needs to be specified as well:

  /**
   * Initializes a new OutputJobInfo instance for writing data to a table.
   * @param databaseName the db name
   * @param tableName the table name
   * @param partitionValues The partition values to publish to, can be null or empty Map
   */
  public static OutputJobInfo create(String databaseName,
                                     String tableName,
                                     Map<String, String> partitionValues) {
      ...
  }

The databaseName parameter indicates the Hive database (or schema) the table is in. If it is null, the default database is used. tableName is the table that will be written to; it must be non-null and refer to a valid table in Hive. partitionValues indicates which partition the user wishes to create. If only one partition is to be written, the map must uniquely identify a partition. For example, if the table is partitioned by two columns, entries for both columns must be in the map. When working with tables that are not partitioned, this field can be left null. When the partition is explicitly specified in this manner, the partition column need not be present in the data. If it is, it will be removed by HCatalog before writing the data to the Hive warehouse, as Hive does not store partition columns with the data.

It is possible to write to more than one partition at a time. This is referred to as dynamic partitioning, because the records are partitioned dynamically at runtime. For dynamic partitioning to be used, the values of the partition column(s) must be present in the data. For example, if a table is partitioned by a column “datestamp,” that column must appear in the data collected in the reducer. This is because HCatalog will read the partition column(s) to determine which partition to write the data to. As part of writing the data, the partition column(s) will be removed.
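As a sketch of the two styles, assuming a table named events_cleaned (a hypothetical name) partitioned by datestamp:

// Static partitioning: name the single partition to be written.
Map<String, String> partitionValues = new HashMap<String, String>();
partitionValues.put("datestamp", "20120531");
OutputJobInfo staticInfo =
    OutputJobInfo.create("default", "events_cleaned", partitionValues);

// Dynamic partitioning: pass null partition values and include a "datestamp"
// column in each HCatRecord; HCatalog determines the partition for each record
// at runtime and removes the column before writing.
OutputJobInfo dynamicInfo = OutputJobInfo.create("default", "events_cleaned", null);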

Once an OutputJobInfo has been created, it is then passed to HCatOutputFormat via the static method setOutput:

OutputJobInfo outputInfo = OutputJobInfo.create(dbName, outputTableName, null);
HCatOutputFormat.setOutput(job, outputInfo);

When writing with HCatOutputFormat, the output key type is not important. The value must be HCatRecord. Records can be written from the reducer or, in map-only jobs, from the map task.

Putting all this together in an example, the following code will read a partition with a datestamp of 20120531 from the table rawevents, count the number of events for each user, and write the result to a table cntd:

public class MRExample extends Configured implements Tool {

  public static class Map extends
    Mapper<WritableComparable, HCatRecord, Text, LongWritable> {

    protected void map(WritableComparable key,
                       HCatRecord value,
                       Mapper<WritableComparable, HCatRecord,
                         Text, LongWritable>.Context context)
                       throws IOException, InterruptedException {
      // Get our schema from the Job object.
      HCatSchema schema = HCatBaseInputFormat.getOutputSchema(context);

      // Read the user field
      String user = (String) value.get("user", schema);
      context.write(new Text(user), new LongWritable(1));
    }
  }

  public static class Reduce extends Reducer<Text, LongWritable,
      WritableComparable, HCatRecord> {

    protected void reduce(Text key, Iterable<LongWritable> values,
        Reducer<Text, LongWritable,
        WritableComparable, HCatRecord>.Context context)
        throws IOException, InterruptedException {

      List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>(2);
      columns.add(new HCatFieldSchema("user", HCatFieldSchema.Type.STRING, ""));
      columns.add(new HCatFieldSchema("cnt", HCatFieldSchema.Type.BIGINT, ""));
      HCatSchema schema = new HCatSchema(columns);

      long sum = 0;
      Iterator<LongWritable> iter = values.iterator();
      while (iter.hasNext()) sum += iter.next().get();
      HCatRecord record = new DefaultHCatRecord(2);
      record.set("user", schema, key.toString());
      record.setLong("cnt", schema, sum);
      context.write(null, record);
    }
  }

  public int run(String[] args) throws Exception {
    Job job = new Job(getConf(), "Example");
    // Read the "rawevents" table, partition "20120531", in the default
    // database
    HCatInputFormat.setInput(job, InputJobInfo.create(null, "rawevents",
        "datestamp='20120531'"));
    job.setInputFormatClass(HCatInputFormat.class);
    job.setJarByClass(MRExample.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setOutputKeyClass(WritableComparable.class);
    job.setOutputValueClass(DefaultHCatRecord.class);
    // Write into "cntd" table, partition "20120531", in the default database
    HCatOutputFormat.setOutput(job
        OutputJobInfo.create(null, "cntd", "ds=20120531"));
    job.setOutputFormatClass(HCatOutputFormat.class);
    return (job.waitForCompletion(true) ? 0 : 1);
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MRExample(), args);
    System.exit(exitCode);
  }
}

Command Line

Since HCatalog utilizes Hive’s metastore, Hive users do not need an additional tool to interact with it; they can use the Hive command-line tool as before. However, for HCatalog users who are not also Hive users, a command-line tool, hcat, is provided. This tool is very similar to Hive’s command line. The biggest difference is that it only accepts commands that do not result in a MapReduce job being spawned. This means that the vast majority of DDL (Data Definition Language, the operations that define the data, such as creating tables) is supported:

$ /usr/bin/hcat -e "create table rawevents (user string, url string);"

The command line supports the following options:

Option   Explanation                                   Example

-e       Execute DDL provided on the command line      hcat -e "show tables;"

-f       Execute DDL provided in a script file         hcat -f setup.sql

-g       See the security section below

-p       See the security section below

-D       Pass a key=value Java property to HCatalog    hcat -Dlog.level=INFO

-h       Print help and exit                           hcat -h

The SQL operations that HCatalog’s command line does not support are:

  • SELECT

  • CREATE TABLE AS SELECT

  • INSERT

  • LOAD

  • ALTER INDEX REBUILD

  • ALTER TABLE CONCATENATE

  • ALTER TABLE ARCHIVE

  • ANALYZE TABLE

  • EXPORT TABLE

  • IMPORT TABLE

Security Model

HCatalog does not make use of Hive’s authorization model. However, user authentication in HCatalog is identical to Hive’s. Hive attempts to replicate traditional database authorization models, but this approach has limitations in the Hadoop ecosystem. Because it is possible to go directly to the filesystem and access the underlying data, authorization in Hive is limited. This can be addressed by having all files and directories that contain Hive’s data be owned by the user running Hive jobs; other users are then prevented from reading or writing data except through Hive. However, this has the side effect that all UDFs in Hive will then run as a superuser, since they run in the Hive process. Consequently, they will have read and write access to all files in the warehouse.

The only way around this in the short term is to declare UDFs to be a privileged operation and only allow those with proper access to create UDFs, though there is no mechanism to enforce this currently. This may be acceptable in the Hive context, but in Pig and MapReduce where user-generated code is the rule rather than the exception, this is clearly not acceptable.

To resolve these issues, HCatalog instead delegates authorization to the storage layer. In the case of data stored in HDFS, this means that HCatalog looks at the directories and files containing data to see if a user has access to the data. If so, that user is given identical access to the metadata. For example, a user who has permission to write to a directory that contains a table’s partitions also has permission to write to that table.

This has the advantage that it is truly secure. It is not possible to subvert the system by changing abstraction levels. The disadvantage is that the security model supported by HDFS is much poorer than is traditional for databases. In particular, features such as column-level permissions are not possible. Also, users can only be given permission to a table by being added to a filesystem group that owns that file.

Architecture

As explained above, HCatalog presents itself to MapReduce and Pig using their standard input and output mechanisms. HCatLoader and HCatStorer are fairly simple since they sit atop HCatInputFormat and HCatOutputFormat, respectively. These two MapReduce classes do a fair amount of work to integrate MapReduce with Hive’s metastore.

Figure 22-1 shows the HCatalog architecture.


Figure 22-1. HCatalog architecture diagram

HCatInputFormat communicates with Hive’s metastore to obtain information about the table and partition(s) to be read. This includes finding the table schema as well as the schema for each partition. For each partition it must also determine the actual InputFormat and SerDe to use to read the partition. When HCatInputFormat.getSplits is called, it instantiates an instance of the InputFormat for each partition and calls getSplits on that InputFormat. These are then collected together, and the splits from all the partitions are returned as the list of InputSplits.

Similarly, the RecordReaders from each underlying InputFormat are used to decode the partitions. The HCatRecordReader then converts the values from the underlying RecordReader to HCatRecords via the SerDe associated with the partition. This includes padding each partition with any missing columns. That is, when the table schema contains columns that the partition schema does not, columns with null values must be added to the HCatRecord. Also, if the user has indicated that only certain columns are needed, then the extra columns are trimmed out at this point.

HCatOutputFormat also communicates with the Hive metastore to determine the proper file format and schema for writing. Since HCatalog only supports writing data in the format currently specified for the table, there is no need to open different OutputFormats per partition. The underlying OutputFormat is wrapped by HCatOutputFormat. A RecordWriter is then created per partition that wraps the underlying RecordWriter, while the indicated SerDe is used to write data into these new records. When all of the partitions have been written, HCatalog uses an OutputCommitter to commit the data to the metastore.
