—Alan Gates
Using Hive for data processing on Hadoop has several nice features beyond the ability to use an SQL-like language. Its ability to store metadata means that users do not need to remember the schema of the data. It also means they do not need to know where the data is stored, or what format it is stored in. This decouples data producers, data consumers, and data administrators. Data producers can add a new column to the data without breaking their consumers’ data-reading applications. Administrators can relocate data or change the format it is stored in without requiring changes on the part of the producers or consumers.
The majority of heavy Hadoop users do not use a single tool for data production and consumption. Often, users begin with a single tool: Hive, Pig, MapReduce, or another tool. As their use of Hadoop deepens, they discover that the tool they chose is not optimal for the new tasks they are taking on. Users who start with analytics queries in Hive discover they would like to use Pig for ETL processing or for constructing their data models. Users who start with Pig discover they would like to use Hive for analytics-type queries.
While tools such as Pig and MapReduce do not require metadata, they can benefit from it when it is present. Sharing a metadata store also enables users across tools to share data more easily. A workflow where data is loaded and normalized using MapReduce or Pig and then analyzed via Hive is very common. When all these tools share one metastore, users of each tool have immediate access to data created with another tool. No loading or transfer steps are required.
HCatalog exists to fulfill these requirements. It makes the Hive metastore available to users of other tools on Hadoop. It provides connectors for MapReduce and Pig so that users of those tools can read data from and write data to Hive’s warehouse. It has a command-line tool for users who do not use Hive to operate on the metastore with Hive DDL statements. It also provides a notification service so that workflow tools, such as Oozie, can be notified when new data becomes available in the warehouse.
HCatalog is a separate Apache project from Hive, and is part of the Apache Incubator. The Incubator is where most Apache projects start. It helps those involved with the project build a community around the project and learn the way Apache software is developed. As of this writing, the most recent version is HCatalog 0.4.0-incubating. This version works with Hive 0.9, Hadoop 1.0, and Pig 0.9.2.
MapReduce uses a Java class, InputFormat, to read input data. Most frequently, these classes read data directly from HDFS. InputFormat implementations also exist to read data from HBase, Cassandra, and other data sources. The task of the InputFormat is twofold. First, it determines how data is split into sections so that it can be processed in parallel by MapReduce’s map tasks. Second, it provides a RecordReader, a class that MapReduce uses to read records from its input source and convert them to keys and values for the map task to operate on.
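To make these two responsibilities concrete, the relevant methods of the Hadoop 0.20 (org.apache.hadoop.mapreduce) InputFormat look roughly like the following abridged sketch:

// Abridged sketch of the org.apache.hadoop.mapreduce.InputFormat contract.
public abstract class InputFormat<K, V> {

    // Decide how the input is divided into splits that map tasks process in parallel.
    public abstract List<InputSplit> getSplits(JobContext context)
        throws IOException, InterruptedException;

    // Supply the RecordReader that turns a split into key/value pairs for the map task.
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException;
}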
HCatalog provides HCatInputFormat to enable MapReduce users to read data stored in Hive’s data warehouse. It allows users to read only the partitions of tables and columns that they need. And it provides the records in a convenient list format so that users do not need to parse them.
HCatInputFormat implements the Hadoop 0.20 API, org.apache.hadoop.mapreduce, not the Hadoop 0.18 org.apache.hadoop.mapred API, because it requires some features added in the MapReduce (0.20) API. This means that a MapReduce user will need to use the mapreduce interface to interact with HCatalog. However, Hive requires that the underlying InputFormat used to read data from disk be a mapred implementation. So if you have data formats you are currently using with a mapred InputFormat, you can use them with HCatalog as well.
(InputFormat is a class in the mapreduce API and an interface in the mapred API, hence it was referred to as a class above.)
When initializing HCatInputFormat, the first thing to do is specify the table to be read. This is done by creating an InputJobInfo instance, specifying the database, table, and partition filter to use.
InputJobInfo.java
/**
* Initializes a new InputJobInfo
* for reading data from a table.
* @param databaseName the db name
* @param tableName the table name
* @param filter the partition filter
*/
public static InputJobInfo create(String databaseName,
                                  String tableName,
                                  String filter) {
    ...
}
The databaseName indicates the Hive database (or schema) the table is in. If this is null, the default database will be used. The tableName is the table that will be read. This must be non-null and refer to a valid table in Hive. The filter indicates which partitions the user wishes to read. If it is left null, the entire table will be read. Care should be taken here, as reading all the partitions of a large table can result in scanning a large volume of data.
Filters are specified as an SQL-like where clause. They should reference only partition columns of the data. For example, if the table to be read is partitioned on a column called datestamp, the filter might look like datestamp = "2012-05-26". Filters can contain =, >, >=, <, <=, and, and or as operators.
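For instance, a filter that combines operators when creating the InputJobInfo might look like the following minimal sketch (the table name here is only illustrative):

// Read only the partitions for two specific days of a table partitioned on datestamp.
InputJobInfo inputInfo = InputJobInfo.create(null, "rawevents",
    "datestamp = '2012-05-26' or datestamp = '2012-05-27'");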
There is a bug in the ORM mapping layer used by Hive v0.9.0 and earlier that causes filter clauses with >, >=, <, or <= to fail. To resolve this bug, you can apply the patch HIVE-2084.D2397.1.patch from https://issues.apache.org/jira/browse/HIVE-2084 and rebuild your version of Hive. This does carry some risks, depending on how you deploy Hive. See the discussion on the JIRA entry.
This InputJobInfo instance is then passed to HCatInputFormat via the method setInput, along with the instance of Job being used to configure the MapReduce job:
Job job = new Job(conf, "Example");
InputJobInfo inputInfo = InputJobInfo.create(dbName, inputTableName, filter);
HCatInputFormat.setInput(job, inputInfo);
The map task will need to specify HCatRecord as its value type. The key type is not important, as HCatalog does not provide keys to the map task. For example, a map task that reads data via HCatalog might look like:
public static class Map extends
        Mapper<WritableComparable, HCatRecord, Text, Text> {

    @Override
    protected void map(WritableComparable key,
            HCatRecord value,
            org.apache.hadoop.mapreduce.Mapper<WritableComparable, HCatRecord,
                Text, Text>.Context context) {
        ...
    }
}
HCatRecord is the class that HCatalog provides for interacting with records. It presents a simple get and set interface. Columns can be requested by position or by name. When requesting columns by name, the schema must be provided, as each individual HCatRecord does not keep a reference to the schema. The schema can be obtained by calling HCatInputFormat.getOutputSchema(). Since Java does not support overloading of functions by return type, different instances of get and set are provided for each data type. These methods use the object versions of types rather than the scalar versions (that is, java.lang.Integer rather than int). This allows them to express null as a value. There are also implementations of get and set that work with Java Objects:
// Get the first column as an Object and cast it to a Long.
Long cnt = (Long) record.get(0);

// Get the column named "cnt" as a Long.
Long cnt = (Long) record.get("cnt", schema);

// Set the column named "user" to the string "fred".
record.setString("user", schema, "fred");
Often a program will not want to read all of the columns in an input. In this case it makes sense to trim out the extra columns as quickly as possible. This is particularly true with columnar formats like RCFile, where trimming columns early means reading less data from disk. This can be achieved by passing a schema that describes the desired columns. This must be done at job configuration time. The following example will configure the user’s job to read only two columns, named user and url:
HCatSchema baseSchema = HCatBaseInputFormat.getOutputSchema(context);
List<HCatFieldSchema> fields = new ArrayList<HCatFieldSchema>(2);
fields.add(baseSchema.get("user"));
fields.add(baseSchema.get("url"));
HCatBaseInputFormat.setOutputSchema(job, new HCatSchema(fields));
As with reading data, when writing data the database and table to be written to need to be specified. If the data is being written to a partitioned table and only one partition is being written, the partition to be written needs to be specified as well:
/**
 * Initializes a new OutputJobInfo instance for writing data to a table.
 * @param databaseName the db name
 * @param tableName the table name
 * @param partitionValues the partition values to publish to; can be null or an empty Map
 */
public static OutputJobInfo create(String databaseName,
                                   String tableName,
                                   Map<String, String> partitionValues) {
    ...
}
The databaseName indicates the Hive database (or schema) the table is in. If this is null, the default database will be used. The tableName is the table that will be written to. This must be non-null and refer to a valid table in Hive. partitionValues indicates which partition the user wishes to create. If only one partition is to be written, the map must uniquely identify a partition. For example, if the table is partitioned by two columns, entries for both columns must be in the map. When working with tables that are not partitioned, this field can be left null. When the partition is explicitly specified in this manner, the partition column need not be present in the data. If it is, it will be removed by HCatalog before writing the data to the Hive warehouse, as Hive does not store partition columns with the data.
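As a sketch (the table and partition column names here are hypothetical), a single-partition write to a table partitioned by two columns would need both values in the map:

// Both partition columns must be present to uniquely identify one partition.
Map<String, String> partitionValues = new HashMap<String, String>();
partitionValues.put("datestamp", "20120531");
partitionValues.put("region", "us");
OutputJobInfo outputInfo = OutputJobInfo.create("default", "events_by_region", partitionValues);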
It is possible to write to more than one partition at a time. This is referred to as dynamic partitioning, because the records are partitioned dynamically at runtime. For dynamic partitioning to be used, the values of the partition column(s) must be present in the data. For example, if a table is partitioned by a column “datestamp,” that column must appear in the data collected in the reducer. This is because HCatalog will read the partition column(s) to determine which partition to write the data to. As part of writing the data, the partition column(s) will be removed.
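A dynamic-partition write, by contrast, might be configured roughly as follows (again a sketch; the table name is illustrative). No partition map is passed, and each record carries its own datestamp value:

// Passing null (or an empty map) for the partition values enables dynamic partitioning;
// the "datestamp" column in each HCatRecord determines the partition it lands in.
HCatOutputFormat.setOutput(job, OutputJobInfo.create(null, "events_by_day", null));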
Once an OutputJobInfo has been created, it is then passed to HCatOutputFormat via the static method setOutput:
OutputJobInfo outputInfo = OutputJobInfo.create(dbName, outputTableName, null);
HCatOutputFormat.setOutput(job, outputInfo);
When writing with HCatOutputFormat, the output key type is not important. The value must be HCatRecord. Records can be written from the reducer, or, in map-only jobs, from the map task.
Putting all this together in an example, the following code will read a partition with a datestamp of 20120531 from the table rawevents, count the number of events for each user, and write the result to a table cntd:
public class MRExample extends Configured implements Tool {

    public static class Map extends
            Mapper<WritableComparable, HCatRecord, Text, LongWritable> {

        @Override
        protected void map(WritableComparable key, HCatRecord value,
                Mapper<WritableComparable, HCatRecord,
                    Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            // Get our schema from the Job object.
            HCatSchema schema = HCatBaseInputFormat.getOutputSchema(context);

            // Read the user field.
            String user = (String) value.get("user", schema);
            context.write(new Text(user), new LongWritable(1));
        }
    }

    public static class Reduce extends Reducer<Text, LongWritable,
            WritableComparable, HCatRecord> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Reducer<Text, LongWritable,
                    WritableComparable, HCatRecord>.Context context)
                throws IOException, InterruptedException {
            // Build the schema of the output record: (user string, cnt bigint).
            List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>(2);
            columns.add(new HCatFieldSchema("user", HCatFieldSchema.Type.STRING, ""));
            columns.add(new HCatFieldSchema("cnt", HCatFieldSchema.Type.BIGINT, ""));
            HCatSchema schema = new HCatSchema(columns);

            // Sum the counts for this user.
            long sum = 0;
            Iterator<LongWritable> iter = values.iterator();
            while (iter.hasNext()) sum += iter.next().get();

            HCatRecord record = new DefaultHCatRecord(2);
            record.set("user", schema, key.toString());
            record.setLong("cnt", schema, sum);
            context.write(null, record);
        }
    }

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "Example");

        // Read the "rawevents" table, partition "20120531", in the default database.
        HCatInputFormat.setInput(job,
            InputJobInfo.create(null, "rawevents", "datestamp='20120531'"));
        job.setInputFormatClass(HCatInputFormat.class);
        job.setJarByClass(MRExample.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(WritableComparable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);

        // Write into the "cntd" table, partition "ds=20120531", in the default database.
        HashMap<String, String> partitionValues = new HashMap<String, String>();
        partitionValues.put("ds", "20120531");
        HCatOutputFormat.setOutput(job,
            OutputJobInfo.create(null, "cntd", partitionValues));
        job.setOutputFormatClass(HCatOutputFormat.class);

        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MRExample(), args);
        System.exit(exitCode);
    }
}
Since HCatalog uses Hive’s metastore, Hive users do not need an additional tool to interact with it. They can use the Hive command-line tool as before. However, for HCatalog users who are not also Hive users, a command-line tool, hcat, is provided. This tool is very similar to Hive’s command line. The biggest difference is that it only accepts commands that do not result in a MapReduce job being spawned. This means that the vast majority of DDL (Data Definition Language, operations that define the data, such as creating tables) is supported:

$ /usr/bin/hcat -e "create table rawevents (user string, url string);"
The command line supports the following options:
Option | Explanation | Example
---|---|---
-e | Execute DDL provided on the command line |
-f | Execute DDL provided in a script file |
-g | See the security section below |
-p | See the security section below |
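DDL can also be placed in a script file and run with -f; for example (the file path here is only illustrative):

$ cat /tmp/setup.hcat
create table cntd (user string, cnt bigint) partitioned by (ds string);
$ /usr/bin/hcat -f /tmp/setup.hcat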
The SQL operations that HCatalog’s command line does not support are:
SELECT
CREATE TABLE AS SELECT
INSERT
LOAD
ALTER INDEX REBUILD
ALTER TABLE ... CONCATENATE
ALTER TABLE ARCHIVE
ANALYZE TABLE
EXPORT TABLE
IMPORT TABLE
HCatalog does not make use of Hive’s authorization model. However, user authentication in HCatalog is identical to Hive’s. Hive attempts to replicate traditional database authorization models. However, this has some limitations in the Hadoop ecosystem. Since it is possible to go directly to the filesystem and access the underlying data, authorization in Hive is limited. This can be resolved by having all files and directories that contain Hive’s data be owned by the user running Hive jobs. This way other users can be prevented from reading or writing data, except through Hive. However, this has the side effect that all UDFs in Hive will then run as a superuser, since they will be running in the Hive process. Consequently, they will have read and write access to all files in the warehouse.
The only way around this in the short term is to declare UDFs to be a privileged operation and only allow those with proper access to create UDFs, though there is no mechanism to enforce this currently. This may be acceptable in the Hive context, but in Pig and MapReduce where user-generated code is the rule rather than the exception, this is clearly not acceptable.
To resolve these issues, HCatalog instead delegates authorization to the storage layer. In the case of data stored in HDFS, this means that HCatalog looks at the directories and files containing data to see if a user has access to the data. If so, he will be given identical access to the metadata. For example, if a user has permission to write to a directory that contains a table’s partitions, she will also have permission to write to that table.
This has the advantage that it is truly secure. It is not possible to subvert the system by changing abstraction levels. The disadvantage is that the security model supported by HDFS is much poorer than is traditional for databases. In particular, features such as column-level permissions are not possible. Also, users can only be given permission to a table by being added to a filesystem group that owns the table’s files.
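In practice, granting access therefore comes down to HDFS permissions. For example, giving a group read access to a table might look like the following sketch, assuming the default warehouse location and an illustrative group name:

$ hadoop fs -chgrp -R analysts /user/hive/warehouse/rawevents
$ hadoop fs -chmod -R 750 /user/hive/warehouse/rawevents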
As explained above, HCatalog presents itself to MapReduce and Pig using their standard input and output mechanisms. HCatLoader and HCatStorer are fairly simple, since they sit atop HCatInputFormat and HCatOutputFormat, respectively. These two MapReduce classes do a fair amount of work to integrate MapReduce with Hive’s metastore.
Figure 22-1 shows the HCatalog architecture.
HCatInputFormat communicates with Hive’s metastore to obtain information about the table and partition(s) to be read. This includes finding the table schema as well as the schema for each partition. For each partition it must also determine the actual InputFormat and SerDe to use to read the partition. When HCatInputFormat.getSplits is called, it instantiates an instance of the InputFormat for each partition and calls getSplits on that InputFormat. These are then collected together, and the splits from all the partitions are returned as the list of InputSplits.
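Conceptually, the delegation looks something like the following simplified sketch (this is not the actual HCatalog source; partitionsToRead, PartitionInfo, and newInstance are hypothetical stand-ins for HCatalog’s per-partition metadata and reflection helpers):

// Gather splits by asking each partition's own InputFormat for its splits.
public List<InputSplit> getSplits(JobContext context)
        throws IOException, InterruptedException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (PartitionInfo partition : partitionsToRead) {
        InputFormat<?, ?> underlying = newInstance(partition.getInputFormatClass());
        splits.addAll(underlying.getSplits(context));
    }
    return splits;
}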
Similarly, the RecordReaders from each underlying InputFormat are used to decode the partitions. The HCatRecordReader then converts the values from the underlying RecordReader to HCatRecords via the SerDe associated with the partition. This includes padding each partition with any missing columns. That is, when the table schema contains columns that the partition schema does not, columns with null values must be added to the HCatRecord. Also, if the user has indicated that only certain columns are needed, then the extra columns are trimmed out at this point.
HCatOutputFormat also communicates with the Hive metastore to determine the proper file format and schema for writing. Since HCatalog only supports writing data in the format currently specified for the table, there is no need to open different OutputFormats per partition. The underlying OutputFormat is wrapped by HCatOutputFormat. A RecordWriter is then created per partition that wraps the underlying RecordWriter, while the indicated SerDe is used to write data into these new records. When all of the partitions have been written, HCatalog uses an OutputCommitter to commit the data to the metastore.