One of Hive’s unique features is that it does not force data to be converted to a specific format. Hive leverages Hadoop’s InputFormat APIs to read data from a variety of sources, such as text files, sequence files, or even custom formats. Likewise, the OutputFormat API is used to write data to various formats.
While Hadoop offers linear scalability in file storage for uncompressed data, storing data in compressed form has many benefits. Compression typically saves significant disk storage; for example, text-based files may compress 40% or more. Compression can also increase throughput and performance. This may seem counterintuitive because compressing and decompressing data incurs extra CPU overhead; however, the I/O savings from moving fewer bytes into memory can result in a net performance gain.
Hadoop jobs tend to be I/O bound rather than CPU bound, in which case compression will improve performance. However, if your jobs are CPU bound, then compression will probably lower your performance. The only way to know for sure is to experiment with different options and measure the results.
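One low-effort way to run such an experiment is to time the same query with a setting toggled on and off. The sketch below assumes a hypothetical table named my_large_table; the properties used are discussed later in this section:

$ time hive -e "set hive.exec.compress.intermediate=false; SELECT COUNT(*) FROM my_large_table;"
$ time hive -e "set hive.exec.compress.intermediate=true; SELECT COUNT(*) FROM my_large_table;"

Comparing the wall-clock times reported by time across several representative queries gives a rough but honest answer for your cluster and workload.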
Depending on your Hadoop version, different codecs will be available to you. The set feature in Hive can be used to display the value of hiveconf or Hadoop configuration values. The available codecs appear in a comma-separated list named io.compression.codecs:
# hive -e "set io.compression.codecs"
io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec
Using compression has the advantage of minimizing the disk space required for files and the overhead of disk and network I/O. However, compressing and decompressing files increases the CPU overhead. Therefore, compression is best used for I/O-bound jobs, where there is extra CPU capacity, or when disk space is at a premium.
All recent versions of Hadoop have built-in support for the GZip and BZip2 compression schemes, including native Linux libraries that accelerate compression and decompression for these formats. Bundled support for Snappy compression was recently added, but if your version of Hadoop doesn’t support it, you can add the appropriate libraries yourself.[18] Finally, LZO compression is often used.[19]
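If you install an additional codec such as LZO, you register it by extending the io.compression.codecs list in Hadoop’s configuration. A minimal sketch, assuming the hadoop-lzo library is installed (the exact class names depend on the LZO package you use, and the value must be a single comma-separated list — wrapped here for space):

<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.GzipCodec,
    org.apache.hadoop.io.compress.DefaultCodec,
    org.apache.hadoop.io.compress.BZip2Codec,
    com.hadoop.compression.lzo.LzoCodec,
    com.hadoop.compression.lzo.LzopCodec</value>
</property>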
So, why do we need different compression schemes? Each scheme makes a trade-off between speed and minimizing the size of the compressed output. BZip2 creates the smallest compressed output, but with the highest CPU overhead. GZip is next in terms of compressed size versus speed. Hence, if disk space utilization and I/O overhead are concerns, both are attractive choices.
LZO and Snappy create larger files but are much faster, especially for decompression. They are good choices if disk space and I/O overhead are less important than rapid decompression of frequently read data.
Another important consideration is whether or not the compression format is splittable. MapReduce wants to split very large input files into splits (often one split per filesystem block, i.e., a multiple of 64 MB), where each split is sent to a separate map process. This can only work if Hadoop knows the record boundaries in the file. In text files, each line is a record, but these boundaries are obscured by GZip and Snappy. However, BZip2 and LZO provide block-level compression, where each block has complete records, so Hadoop can split these files on block boundaries.
The desire for splittable files doesn’t rule out GZip and Snappy. When you create your data files, you can partition the work so that the output files are approximately the desired size. Typically the number of output files equals the number of reducers: if you use N reducers, you usually get N output files. Be careful: if you have a large nonsplittable file, a single task will have to read the entire file from beginning to end.
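One possible way to aim for block-sized GZip files is to force a reduce phase and pick a reducer count so that each output file lands near the target size. This is a sketch: the table names are hypothetical, and DISTRIBUTE BY rand() is used only to spread rows evenly across the reducers:

hive> -- roughly 1 GB of output / 64 MB per file => about 16 reducers
hive> set mapred.reduce.tasks=16;
hive> INSERT OVERWRITE TABLE gz_sized_copy
    > SELECT * FROM big_source DISTRIBUTE BY rand();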
There’s much more we could say about compression, but instead we’ll refer you to Hadoop: The Definitive Guide by Tom White (O’Reilly) for more details, and we’ll focus now on how to tell Hive what format you’re using.
From Hive’s point of view, there are two aspects to the file format. One aspect is how the file is delimited into rows (records). Text files use \n (linefeed) as the default row delimiter. When you aren’t using the default text file format, you tell Hive the name of an InputFormat and an OutputFormat to use. Specifically, you give the names of Java classes that implement these formats. The InputFormat knows how to read splits and partition them into records, and the OutputFormat knows how to write these splits back to files or console output.
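For example, the default text format could be spelled out explicitly. This sketch names the classes Hive itself uses for plain text tables; the table is hypothetical:

CREATE TABLE text_explicit (line STRING)
STORED AS
  INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';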
The second aspect is how records are partitioned into fields (or columns). Hive uses ^A by default to separate fields in text files. Hive uses the name SerDe, short for serializer/deserializer, for the “module” that partitions incoming records (the deserializer) and also knows how to write records in this format (the serializer). This time you specify a single Java class that performs both jobs.
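As a sketch of what this looks like, the contrib RegexSerDe parses each record with a regular expression, one capture group per (string) column. The table and pattern here are hypothetical, and the hive-contrib JAR must be on the classpath:

CREATE TABLE apache_hosts (host STRING, rest STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES ("input.regex" = "([^ ]*) (.*)");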
All this information is specified as part of the table definition when you create the table. After creation, you query the table as you normally would, agnostic to the underlying format. Hence, if you’re a user of Hive, but not a Java developer, don’t worry about the Java aspects. The developers on your team will help you specify this information when needed, after which you’ll work as you normally do.
Intermediate compression shrinks the data shuffled between the map and reduce tasks of a job. For intermediate compression, choosing a codec with lower CPU cost is typically more important than choosing the codec that compresses the most. The property hive.exec.compress.intermediate defaults to false; setting it to true is usually worthwhile:
<property>
  <name>hive.exec.compress.intermediate</name>
  <value>true</value>
  <description>This controls whether intermediate files produced by Hive
   between multiple map-reduce jobs are compressed. The compression codec
   and other options are determined from hadoop config variables
   mapred.output.compress*</description>
</property>
The property that controls intermediate compression for other Hadoop jobs is mapred.compress.map.output. Hadoop compression uses DefaultCodec unless told otherwise; changing the codec involves setting the mapred.map.output.compression.codec property. This is a Hadoop variable and can be set in $HADOOP_HOME/conf/mapred-site.xml or in $HIVE_HOME/conf/hive-site.xml. SnappyCodec is a good choice for intermediate compression because it combines good compression performance with low CPU cost:
<property>
  <name>mapred.map.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  <description>This controls whether intermediate files produced by Hive
   between multiple map-reduce jobs are compressed. The compression codec
   and other options are determined from hadoop config variables
   mapred.output.compress*</description>
</property>
When Hive writes output to a table, that content can also be compressed. The property hive.exec.compress.output controls this feature. You may wish to leave this value set to false in the global configuration file, so that the default output is uncompressed clear text. Users can then turn on final compression by setting the property to true on a query-by-query basis or in their scripts:
<property>
  <name>hive.exec.compress.output</name>
  <value>false</value>
  <description>This controls whether the final outputs of a query
   (to a local/hdfs file or a Hive table) is compressed. The compression
   codec and other options are determined from hadoop config variables
   mapred.output.compress*</description>
</property>
The property that controls final compression for other Hadoop jobs is mapred.output.compress. If hive.exec.compress.output is set to true, a codec can be chosen. GZip compression is a good choice for output compression because it typically reduces the size of files significantly, but remember that GZipped files aren’t splittable by subsequent MapReduce jobs:
<property>
  <name>mapred.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.GzipCodec</value>
  <description>If the job outputs are compressed, how should they be
   compressed?</description>
</property>
Compressing files results in space savings but one of the downsides of storing raw compressed files in Hadoop is that often these files are not splittable. Splittable files can be broken up and processed in parts by multiple mappers in parallel. Most compressed files are not splittable because you can only start reading from the beginning.
The sequence file format supported by Hadoop breaks a file into blocks and then optionally compresses the blocks in a splittable way. To use sequence files from Hive, add the STORED AS SEQUENCEFILE clause to a CREATE TABLE statement:
CREATE TABLE a_sequence_file_table STORED AS SEQUENCEFILE;
Sequence files have three different compression options: NONE, RECORD, and BLOCK. RECORD is the default. However, BLOCK compression is usually more efficient and it still provides the desired splittability. Like many other compression properties, this one is not Hive-specific. It can be defined in Hadoop’s mapred-site.xml file, in Hive’s hive-site.xml, or as needed in scripts or before individual queries:
<property>
  <name>mapred.output.compression.type</name>
  <value>BLOCK</value>
  <description>If the job outputs are to be compressed as SequenceFiles,
   how should they be compressed? Should be one of NONE, RECORD or BLOCK.
  </description>
</property>
We have introduced a number of compression-related properties in Hive, and different permutations of these options result in different output. Let’s use these properties in some examples and show what they produce. Remember that variables set by the CLI persist across the rest of the queries in the session, so between examples you should revert the settings or simply restart the Hive session, as sketched below.
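For instance, the compression-related properties used below can be reverted like this (a sketch; these restore the stock defaults discussed above, but adjust them to match your site’s configuration):

hive> set hive.exec.compress.intermediate=false;
hive> set hive.exec.compress.output=false;
hive> set mapred.output.compression.type=RECORD;

The examples that follow use a small two-column table named a: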
hive> SELECT * FROM a;
4       5
3       2
hive> DESCRIBE a;
a       int
b       int
First, let’s enable intermediate compression. This won’t affect the final output; however, the job counters will show less physical data transferred for the job, since the shuffle-sort data was compressed:
hive> set hive.exec.compress.intermediate=true;
hive> CREATE TABLE intermediate_comp_on
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
    > AS SELECT * FROM a;
Moving data to: file:/user/hive/warehouse/intermediate_comp_on
Table default.intermediate_comp_on stats: [num_partitions: 0, num_files: 1,
num_rows: 2, total_size: 8, raw_data_size: 6]
...
As expected, intermediate compression did not affect the final output, which remains uncompressed:
hive> dfs -ls /user/hive/warehouse/intermediate_comp_on;
Found 1 items
/user/hive/warehouse/intermediate_comp_on/000000_0
hive> dfs -cat /user/hive/warehouse/intermediate_comp_on/000000_0;
4 5
3 2
We can also choose an intermediate compression codec other than the default. In this case we chose GZip, although Snappy is normally a better option. The first line is wrapped for space:
hive> set mapred.map.output.compression.codec
      =org.apache.hadoop.io.compress.GzipCodec;
hive> set hive.exec.compress.intermediate=true;
hive> CREATE TABLE intermediate_comp_on_gz
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
    > AS SELECT * FROM a;
Moving data to: file:/user/hive/warehouse/intermediate_comp_on_gz
Table default.intermediate_comp_on_gz stats: [num_partitions: 0, num_files: 1,
num_rows: 2, total_size: 8, raw_data_size: 6]
hive> dfs -cat /user/hive/warehouse/intermediate_comp_on_gz/000000_0;
4 5
3 2
Next, we can enable output compression:
hive> set hive.exec.compress.output=true;
hive> CREATE TABLE final_comp_on
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
    > AS SELECT * FROM a;
Moving data to: file:/tmp/hive-edward/hive_2012-01-15_11-11-01_884_.../-ext-10001
Moving data to: file:/user/hive/warehouse/final_comp_on
Table default.final_comp_on stats: [num_partitions: 0, num_files: 1,
num_rows: 2, total_size: 16, raw_data_size: 6]
hive> dfs -ls /user/hive/warehouse/final_comp_on;
Found 1 items
/user/hive/warehouse/final_comp_on/000000_0.deflate
The output table statistics show that total_size is 16 but raw_data_size is 6; the extra space is overhead from the deflate algorithm. We can also see that the output file has a .deflate extension. Trying to cat the file is not suggested, as you get binary output. However, Hive can query this data normally:
hive> dfs -cat /user/hive/warehouse/final_comp_on/000000_0.deflate;
... UGLYBINARYHERE ...
hive> SELECT * FROM final_comp_on;
4 5
3 2
This ability to work seamlessly with compressed files is not Hive-specific; Hadoop’s TextInputFormat is at work here. While the name is confusing in this case, TextInputFormat understands file extensions such as .deflate or .gz and decompresses these files on the fly. Hive is unaware of whether the underlying files are uncompressed or compressed using any of the supported compression schemes.
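You can see this for yourself by dropping a gzipped text file straight into a table. This sketch assumes a hypothetical local file /tmp/rows.txt containing tab-delimited rows; no compression properties need to be set for the query to work:

$ gzip -c /tmp/rows.txt > /tmp/rows.txt.gz
hive> CREATE TABLE gz_demo (a INT, b INT)
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
hive> LOAD DATA LOCAL INPATH '/tmp/rows.txt.gz' INTO TABLE gz_demo;
hive> SELECT * FROM gz_demo;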
Let’s change the codec used by output compression to see the results (another line wrap for space):
hive> set hive.exec.compress.output=true;
hive> set mapred.output.compression.codec
      =org.apache.hadoop.io.compress.GzipCodec;
hive> CREATE TABLE final_comp_on_gz
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
    > AS SELECT * FROM a;
Moving data to: file:/user/hive/warehouse/final_comp_on_gz
Table default.final_comp_on_gz stats: [num_partitions: 0, num_files: 1,
num_rows: 2, total_size: 28, raw_data_size: 6]
hive> dfs -ls /user/hive/warehouse/final_comp_on_gz;
Found 1 items
/user/hive/warehouse/final_comp_on_gz/000000_0.gz
As you can see, the output folder now contains one or more .gz files. Hive has a quick hack for executing local commands like zcat from inside the Hive shell: the ! tells Hive to fork and run the external command, blocking until the system returns a result. zcat is a command-line utility that decompresses gzipped input and displays the output:
hive> ! /bin/zcat /user/hive/warehouse/final_comp_on_gz/000000_0.gz;
4 5
3 2
hive> SELECT * FROM final_comp_on_gz;
OK
4 5
3 2
Time taken: 0.159 seconds
Using output compression like this results in binary compressed files that are small and, as a result, operations on them are very fast. However, recall that the number of output files is a side effect of how many mappers or reducers processed the data. In the worst-case scenario, you can end up with one large binary file in a directory that is not splittable, which means that subsequent steps reading this data cannot work in parallel. The answer to this problem is to use sequence files:
hive> set mapred.output.compression.type=BLOCK;
hive> set hive.exec.compress.output=true;
hive> set mapred.output.compression.codec
      =org.apache.hadoop.io.compress.GzipCodec;
hive> CREATE TABLE final_comp_on_gz_seq
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
    > STORED AS SEQUENCEFILE
    > AS SELECT * FROM a;
Moving data to: file:/user/hive/warehouse/final_comp_on_gz_seq
Table default.final_comp_on_gz_seq stats: [num_partitions: 0, num_files: 1,
num_rows: 2, total_size: 199, raw_data_size: 6]
hive> dfs -ls /user/hive/warehouse/final_comp_on_gz_seq;
Found 1 items
/user/hive/warehouse/final_comp_on_gz_seq/000000_0
Sequence files are binary, but it is a nice exercise to look at the header to confirm that the results are what was intended (output wrapped):
hive> dfs -cat /user/hive/warehouse/final_comp_on_gz_seq/000000_0;
SEQ[]org.apache.hadoop.io.BytesWritable[]org.apache.hadoop.io.BytesWritable[]
org.apache.hadoop.io.compress.GzipCodec[]
Because of the meta-information embedded in the sequence file and in the Hive metastore, Hive can query the table without any specific settings. Hadoop also offers the dfs -text command to strip the header and compression away from sequence files and return the raw result:
hive> dfs -text /user/hive/warehouse/final_comp_on_gz_seq/000000_0;
4 5
3 2
hive> SELECT * FROM final_comp_on_gz_seq;
OK
4 5
3 2
Finally, let’s use intermediate and output compression at the same time, set different compression codecs for each, and save the final output to sequence files! These settings are commonly used in production environments where data sets are large, since they improve performance:
hive> set mapred.map.output.compression.codec
      =org.apache.hadoop.io.compress.SnappyCodec;
hive> set hive.exec.compress.intermediate=true;
hive> set mapred.output.compression.type=BLOCK;
hive> set hive.exec.compress.output=true;
hive> set mapred.output.compression.codec
      =org.apache.hadoop.io.compress.GzipCodec;
hive> CREATE TABLE final_comp_on_gz_int_compress_snappy_seq
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
    > STORED AS SEQUENCEFILE
    > AS SELECT * FROM a;
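This CREATE TABLE prints output similar to the earlier examples (omitted here). You can verify the result the same way as before; expect a single sequence file, such as 000000_0, whose header names GzipCodec:

hive> dfs -ls /user/hive/warehouse/final_comp_on_gz_int_compress_snappy_seq;
hive> dfs -text /user/hive/warehouse/final_comp_on_gz_int_compress_snappy_seq/000000_0;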
Hadoop has a storage format known as HAR, which stands for Hadoop ARchive. A HAR file is like a TAR file that lives in the HDFS filesystem as a single file; internally, however, it can contain multiple files and directories. In some use cases, older directories and files are less commonly accessed than newer ones. If a particular partition contains thousands of files, managing it requires significant overhead in the HDFS NameNode. By archiving the partition, it is stored as a single, large file that can still be accessed by Hive. The trade-off is that HAR files are less efficient to query. Also, HAR files are not compressed, so they don’t save any space.
In the following example, we’ll use Hive’s own documentation as data.
First, create a partitioned table and load it with the text data from the Hive package:
hive> CREATE TABLE hive_text (line STRING) PARTITIONED BY (folder STRING);
hive> ! ls $HIVE_HOME;
LICENSE
README.txt
RELEASE_NOTES.txt
hive> ALTER TABLE hive_text ADD PARTITION (folder='docs');
hive> LOAD DATA INPATH '${env:HIVE_HOME}/README.txt'
    > INTO TABLE hive_text PARTITION (folder='docs');
Loading data to table default.hive_text partition (folder=docs)
hive> LOAD DATA INPATH '${env:HIVE_HOME}/RELEASE_NOTES.txt'
    > INTO TABLE hive_text PARTITION (folder='docs');
Loading data to table default.hive_text partition (folder=docs)
hive> SELECT * FROM hive_text WHERE line LIKE '%hive%' LIMIT 2;
http://hive.apache.org/                                                 docs
- Hive 0.8.0 ignores the hive-default.xml file, though we continue      docs
Some versions of Hadoop, such as Hadoop v0.20.2, require the JAR containing the Hadoop archive tools to be placed in the Hive auxlib directory:
$ mkdir $HIVE_HOME/auxlib
$ cp $HADOOP_HOME/hadoop-0.20.2-tools.jar $HIVE_HOME/auxlib/
Take a look at the underlying structure of the table before we archive it. Note the location of the table’s data partition, since it’s a managed, partitioned table:
hive> dfs -ls /user/hive/warehouse/hive_text/folder=docs;
Found 2 items
/user/hive/warehouse/hive_text/folder=docs/README.txt
/user/hive/warehouse/hive_text/folder=docs/RELEASE_NOTES.txt
The ALTER TABLE ... ARCHIVE PARTITION statement converts the table into an archived table:
hive> SET hive.archive.enabled=true;
hive> ALTER TABLE hive_text ARCHIVE PARTITION (folder='docs');
intermediate.archived is
file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ARCHIVED
intermediate.original is
file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ORIGINAL
Creating data.har for file:/user/hive/warehouse/hive_text/folder=docs
in file:/tmp/hive-edward/hive_..._3862901820512961909/-ext-10000/partlevel
Please wait... (this may take a while)
Moving file:/tmp/hive-edward/hive_..._3862901820512961909/-ext-10000/partlevel
to file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ARCHIVED
Moving file:/user/hive/warehouse/hive_text/folder=docs
to file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ORIGINAL
Moving file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ARCHIVED
to file:/user/hive/warehouse/hive_text/folder=docs
(We reformatted the output slightly so it would fit, and used ... to replace two timestamp strings in the original output.)
The underlying table has gone from two files to one Hadoop archive (HAR file):
hive> dfs -ls /user/hive/warehouse/hive_text/folder=docs;
Found 1 items
/user/hive/warehouse/hive_text/folder=docs/data.har
The ALTER TABLE ... UNARCHIVE PARTITION command extracts the files from the HAR and puts them back into HDFS:
ALTER TABLE hive_text UNARCHIVE PARTITION (folder='docs');
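As a quick sanity check after unarchiving, listing the partition directory again should show the original README.txt and RELEASE_NOTES.txt in place of data.har:

hive> dfs -ls /user/hive/warehouse/hive_text/folder=docs;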
Hive’s ability to read and write different types of compressed files is a big performance win, as it saves disk space and reduces processing overhead. This flexibility also aids integration with other tools, since Hive can query many native file types without requiring custom “adapters” written in Java.