Storage handlers are a combination of an InputFormat, an OutputFormat, a SerDe, and specific code that Hive uses to treat an external entity as a standard Hive table. This allows the user to issue queries seamlessly, whether the table represents a text file stored in Hadoop or a column family stored in a NoSQL database such as Apache HBase, Apache Cassandra, or Amazon DynamoDB. Storage handlers are not limited to NoSQL databases; a storage handler could be designed for many different kinds of data stores.
A specific storage handler may implement only some of the capabilities. For example, a given storage handler may allow read-only access or impose some other restriction.
Storage handlers offer a streamlined system for ETL. For example, a Hive query could select from a table backed by sequence files and write its output into a table backed by an entirely different store.
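Such an ETL pass can be expressed as a single HiveQL statement; a sketch, with hypothetical table names:

```sql
-- Hypothetical ETL: read rows from a table stored as sequence files
-- and write them into a table backed by a NoSQL storage handler.
INSERT OVERWRITE TABLE hbase_stocks
SELECT key, name, price
FROM stocks_seqfile;
```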
Hadoop has an abstraction known as InputFormat that allows data from different sources and formats to be used as input for a job. TextInputFormat is a concrete implementation of InputFormat. It works by providing Hadoop with information on how to split a given path into multiple tasks, and it provides a RecordReader that supplies methods for reading data from each split.
Hadoop also has an abstraction known as OutputFormat, which takes the output from a job and writes it to an entity. TextOutputFormat is a concrete implementation of OutputFormat. It works by persisting output to a file, which could be stored on HDFS or locally.
Input and output that represent physical files are common in Hadoop; however, the InputFormat and OutputFormat abstractions can be used to load and persist data from other sources as well, including relational databases, NoSQL stores like Cassandra or HBase, or anything else an InputFormat or OutputFormat can be designed around.
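These abstractions surface directly in Hive DDL: a table may name its input and output formats explicitly. A minimal sketch, using the standard text formats Hive ships with:

```sql
CREATE TABLE raw_logs (line STRING)
STORED AS
  INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
```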
In the HiveQL chapter, we demonstrated the Word Count example written in Java, and then demonstrated an equivalent solution written in Hive. Hive's abstractions, such as tables, types, and row format, along with other metadata, are what Hive uses to understand the source data. Once Hive understands the source data, the query engine can process it using familiar HiveQL operators.
Many NoSQL databases have implemented Hive connectors using custom adapters. HiveStorageHandler is the primary interface Hive uses to connect with NoSQL stores such as HBase, Cassandra, and others. An examination of the interface shows that a custom InputFormat, OutputFormat, and SerDe must be defined. The storage handler enables both reading from and writing to the underlying storage subsystem: SELECT queries can be run against the data system, and data can be written into it, for example when generating reports.
When executing Hive queries over NoSQL databases, performance is lower than that of normal Hive and MapReduce jobs on HDFS, due to the overhead of the NoSQL system. Reasons include the socket connection to the server and the merging of multiple underlying files, whereas typical access from HDFS is completely sequential I/O, which is very fast on modern hard drives.
A common technique for combining NoSQL databases with Hadoop in an overall system architecture is to use the NoSQL database cluster for real-time work and the Hadoop cluster for batch-oriented work. If the NoSQL system is the master data store and that data needs to be queried using batch jobs with Hadoop, bulk exporting is an efficient way to convert the NoSQL data into HDFS files. Once the HDFS files are created via an export, batch Hadoop jobs may be executed with maximum efficiency.
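The export step itself can be written in HiveQL; a sketch, assuming a NoSQL-backed table named hbase_stocks and a hypothetical target directory:

```sql
-- Export the NoSQL-backed table to plain HDFS files so that
-- subsequent batch jobs can read them with sequential I/O.
INSERT OVERWRITE DIRECTORY '/data/stocks_export'
SELECT * FROM hbase_stocks;
```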
The following creates a Hive table and an HBase table using HiveQL:
CREATE TABLE hbase_stocks(key INT, name STRING, price FLOAT)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,stock:val")
TBLPROPERTIES ("hbase.table.name" = "stocks");
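Once defined, the table can be populated from an existing Hive table; a sketch, assuming a hypothetical stocks table with matching columns:

```sql
INSERT OVERWRITE TABLE hbase_stocks
SELECT key, name, price FROM stocks;
```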
To create a Hive table that points to an existing HBase table, the
CREATE EXTERNAL TABLE
HiveQL statement
must be used:
CREATE EXTERNAL TABLE hbase_stocks(key INT, name STRING, price FLOAT)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = "cf1:val")
TBLPROPERTIES ("hbase.table.name" = "stocks");
Instead of scanning the entire HBase table for a given Hive query, filter pushdowns will constrain the row data returned to Hive.
Examples of the types of predicates that are converted into pushdowns are:
key < 20
key = 20
key < 20 and key > 10
Any other, more complex predicates are ignored and do not take advantage of the pushdown feature.
The following is an example of creating a simple table and a query that will use the filter pushdown feature. Note the pushdown is always on the HBase key, and not the column values of a column family:
CREATE TABLE hbase_pushdown(key int, value string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string");

SELECT * FROM hbase_pushdown WHERE key = 90;
The following query will not result in a pushdown because it
contains an OR
on the predicate:
SELECT * FROM hbase_pushdown
WHERE key <= '80' OR key >= '100';
Hive with HBase supports joins on HBase tables to HBase tables, and HBase tables to non-HBase tables.
By default, pushdowns are turned on; however, they may be turned off with the following:
set hive.optimize.ppd.storage=false;
It is important to note when inserting data into HBase from Hive that HBase requires unique keys, whereas Hive has no such constraint.
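One way to satisfy that constraint is to collapse duplicates on the Hive side before inserting; a sketch, assuming a hypothetical staging table:

```sql
-- Collapse duplicate keys so each HBase row key appears only once.
INSERT OVERWRITE TABLE hbase_stocks
SELECT key, max(name), max(price)
FROM staging_stocks
GROUP BY key;
```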
A few notes on Hive column mapping for HBase:
There is no way to access the HBase row timestamp, and only the latest version of a row is returned
The HBase key must be defined explicitly
Cassandra has implemented the HiveStorageHandler interface in a similar way to that of HBase. The implementation was originally done by DataStax on the Brisk project.
The model is fairly straightforward: a Cassandra column family maps to a Hive table, and Cassandra column names map directly to Hive column names.
Static column mapping is useful when the user has specific columns inside Cassandra which they wish to map to Hive columns. The following is an example of creating an external Hive table that maps to an existing Cassandra keyspace and column family:
CREATE EXTERNAL TABLE Weblog(useragent string, ipaddress string, timestamp string)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,user_agent,ip_address,time_stamp")
TBLPROPERTIES ("cassandra.range.size" = "200",
  "cassandra.slice.predicate.size" = "150");
Some Cassandra use cases involve dynamic columns, where a given column family does not have fixed, named columns; instead, the columns of a row represent some piece of data. This is often used in time series data, where the column name represents a time and the column value represents the value at that time. Dynamic mapping is also useful if the column names are not known in advance or you wish to retrieve all of them:
CREATE EXTERNAL TABLE Weblog(useragent string, ipaddress string, timestamp string)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,:column,:value");
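With the :key,:column,:value mapping, each Cassandra column in a row comes back to Hive as its own row, so all columns under one row key can be retrieved with an ordinary query (the row key value here is hypothetical):

```sql
-- useragent maps to :key, ipaddress to :column, timestamp to :value.
SELECT ipaddress, timestamp
FROM Weblog
WHERE useragent = 'some-row-key';
```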
The following properties in Table 17-1 can be declared in a WITH SERDEPROPERTIES
clause:
Table 17-1. Cassandra SerDe storage handler properties
Name | Description |
---|---|
cassandra.columns.mapping | Mapping of Hive to Cassandra columns |
cassandra.cf.name | Column family name in Cassandra |
cassandra.host | IP of a Cassandra node to connect to |
cassandra.port | Cassandra RPC port: default 9160 |
cassandra.partitioner | Partitioner: default org.apache.cassandra.dht.RandomPartitioner |
The following properties in Table 17-2 can be declared in a TBLPROPERTIES
clause:
Table 17-2. Cassandra table properties
Name | Description |
---|---|
cassandra.ks.name | Cassandra keyspace name |
cassandra.ks.repfactor | Cassandra replication factor: default 1 |
cassandra.ks.strategy | Replication strategy: default SimpleStrategy |
cassandra.input.split.size | MapReduce split size: default 64 * 1024 |
cassandra.range.batch.size | MapReduce range batch size: default 1000 |
cassandra.slice.predicate.size | MapReduce slice predicate size: default 1000 |
Amazon’s Dynamo was one of the first NoSQL databases. Its design influenced many other databases, including Cassandra and HBase. Despite its influence, Dynamo was restricted to internal use by Amazon until recently. Amazon released another database influenced by the original Dynamo called DynamoDB.
DynamoDB is in the family of key-value databases. In DynamoDB, a table is a collection of items, and each table is required to have a primary key. An item consists of a key and an arbitrary number of attributes; the set of attributes can vary from item to item.
You can query a table with Hive and you can move data to and from S3. Here is another example of a Hive table for stocks that is backed by a DynamoDB table:
CREATE EXTERNAL TABLE dynamo_stocks(
  key INT,
  symbol STRING,
  ymd STRING,
  price FLOAT)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "Stocks",
  "dynamodb.column.mapping" = "key:Key,symbol:Symbol,ymd:YMD,price:Close");
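Once mapped, the table is queried like any other Hive table, with the storage handler reading from DynamoDB behind the scenes; a sketch of a simple aggregate:

```sql
SELECT symbol, max(price)
FROM dynamo_stocks
GROUP BY symbol;
```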
See http://aws.amazon.com/dynamodb/ for more information about DynamoDB.