HiveQL is a declarative language: users issue queries and Hive figures out how to translate them into MapReduce jobs. The sophisticated process of query parsing, planning, optimization, and execution is the result of many years of hard engineering work by the Hive team, but most of the time you can remain oblivious to it, freeing you to focus on the problem at hand.
However, as you become more experienced with Hive, learning about the theory behind Hive and its low-level implementation details will let you use Hive more effectively, especially where performance optimizations are concerned.
This chapter covers several different topics related to tuning Hive performance. Some tuning involves adjusting numeric configuration parameters (“turning the knobs”), while other tuning steps involve enabling or disabling specific features.
The first step to learning how Hive works (after reading this book…) is to use the EXPLAIN feature to learn how Hive translates queries into MapReduce jobs.
Consider the following example:
hive> DESCRIBE onecol;
number  int

hive> SELECT * FROM onecol;
5
5
4

hive> SELECT SUM(number) FROM onecol;
14
Now, put the EXPLAIN keyword in front of the last query to see the query plan and other information. The query will not be executed.
hive> EXPLAIN SELECT SUM(number) FROM onecol;
The output requires some explaining and practice to understand.
First, the abstract syntax tree is printed. This shows how Hive parsed the query into tokens and literals, as part of the first step in turning the query into the ultimate result:
ABSTRACT SYNTAX TREE: (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME onecol))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL number))))))
(The indentation of the actual output was changed to fit the page.)
For those not familiar with parsers and tokenizers, this can look overwhelming. However, even if you are a novice in this area, you can study the output to get a sense for what Hive is doing with the SQL statement. (As a first step, ignore the TOK_ prefixes.)
Even though our query will write its output to the console, Hive will actually write the output to a temporary file first, as shown by this part of the output:
'(TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))'
Next, we can see references to our column name number, our table name onecol, and the sum function.
A Hive job consists of one or more stages, with dependencies between different stages. As you might expect, more complex queries usually involve more stages, and more stages usually require more processing time to complete.
A stage could be a MapReduce job, a sampling stage, a merge stage, a limit stage, or a stage for some other task Hive needs to do. By default, Hive executes these stages one at a time, although later we’ll discuss parallel execution in Parallel Execution.
Some stages will be short, like those that move files around. Other stages may also finish quickly if they have little data to process, even though they require a map or reduce task:
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage
The STAGE PLANS section is verbose and complex. Stage-1 is the bulk of the processing for this job and happens via a MapReduce job. A TableScan takes the input of the table and produces a single output column, number. The Group By Operator applies sum(number) and produces an output column _col0 (a synthesized name for an anonymous result). All this is happening on the map side of the job, under the Map Operator Tree:
STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        onecol
          TableScan
            alias: onecol
            Select Operator
              expressions:
                    expr: number
                    type: int
              outputColumnNames: number
              Group By Operator
                aggregations:
                      expr: sum(number)
                bucketGroup: false
                mode: hash
                outputColumnNames: _col0
                Reduce Output Operator
                  sort order:
                  tag: -1
                  value expressions:
                        expr: _col0
                        type: bigint
On the reduce side, under the Reduce Operator Tree, we see the same Group By Operator, but this time it is applying sum on _col0. Finally, in the reducer we see the File Output Operator, which shows that the output will be text, based on the string output format HiveIgnoreKeyTextOutputFormat:
  Reduce Operator Tree:
    Group By Operator
      aggregations:
            expr: sum(VALUE._col0)
      bucketGroup: false
      mode: mergepartial
      outputColumnNames: _col0
      Select Operator
        expressions:
              expr: _col0
              type: bigint
        outputColumnNames: _col0
        File Output Operator
          compressed: false
          GlobalTableId: 0
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Because this job has no LIMIT clause, Stage-0 is a no-op stage:
  Stage: Stage-0
    Fetch Operator
      limit: -1
Understanding the intricate details of how Hive parses and plans every query is not always useful. However, it is helpful when analyzing complex or poorly performing queries, especially as we try various tuning steps. We can then observe what effect these changes have at the “logical” level, in tandem with performance measurements.
Using EXPLAIN EXTENDED produces even more output. In an effort to “go green,” we won’t show the entire output, but we will show you the Reduce Operator Tree to demonstrate the different output:
  Reduce Operator Tree:
    Group By Operator
      aggregations:
            expr: sum(VALUE._col0)
      bucketGroup: false
      mode: mergepartial
      outputColumnNames: _col0
      Select Operator
        expressions:
              expr: _col0
              type: bigint
        outputColumnNames: _col0
        File Output Operator
          compressed: false
          GlobalTableId: 0
          directory: file:/tmp/edward/hive_2012-[long number]/-ext-10001
          NumFilesPerFileSink: 1
          Stats Publishing Key Prefix: file:/tmp/edward/hive_2012-[long number]/-ext-10001/
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              properties:
                columns _col0
                columns.types bigint
                escape.delim
                serialization.format 1
          TotalFiles: 1
          GatherStats: false
          MultiFileSpray: false
We encourage you to compare the two outputs for the Reduce Operator Tree.
The LIMIT clause is commonly used, often by people working with the CLI. However, in many cases a LIMIT clause still executes the entire query, then only returns a handful of results. Because this behavior is generally wasteful, it should be avoided when possible. Hive has a configuration property to enable sampling of source data for use with LIMIT:
<property>
  <name>hive.limit.optimize.enable</name>
  <value>true</value>
  <description>Whether to enable the optimization of trying a smaller
  subset of data for simple LIMIT first.</description>
</property>
Once hive.limit.optimize.enable is set to true, two variables control its operation: hive.limit.row.max.size and hive.limit.optimize.limit.file:
<property>
  <name>hive.limit.row.max.size</name>
  <value>100000</value>
  <description>When trying a smaller subset of data for simple LIMIT,
  how much size we need to guarantee each row to have at least.</description>
</property>
<property>
  <name>hive.limit.optimize.limit.file</name>
  <value>10</value>
  <description>When trying a smaller subset of data for simple LIMIT,
  maximum number of files we can sample.</description>
</property>
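As a minimal sketch, you can also set these properties for a single session from the CLI rather than in hive-site.xml. The values shown are simply the defaults from above, and the final query is illustrative:

hive> set hive.limit.optimize.enable=true;
hive> set hive.limit.row.max.size=100000;
hive> set hive.limit.optimize.limit.file=10;
hive> SELECT * FROM fracture_ins LIMIT 10;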
A drawback of this feature is the risk that useful input data will never get processed. For example, any query that requires a reduce step, such as most JOIN and GROUP BY operations and most calls to aggregate functions, will produce very different results when computed over a sample of the input. Perhaps this difference is okay in many cases, but it’s important to understand.
We discussed optimizing join performance in Join Optimizations and Map-side Joins. We won’t reproduce the details here, but just remind yourself that it’s important to know which table is the largest and put it last in the JOIN clause, or use the /* streamtable(table_name) */ directive.
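As a sketch with hypothetical tables, where small_dims is modest in size and huge_facts is the largest table, the hint tells Hive to stream huge_facts through the reducers rather than attempt to buffer it:

hive> SELECT /*+ STREAMTABLE(h) */ h.id, s.name
    > FROM small_dims s JOIN huge_facts h ON (s.id = h.id);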
If all but one table is small enough, typically to fit in memory, then Hive can perform a map-side join, eliminating the need for reduce tasks and even some map tasks. Sometimes even tables that do not fit in memory are good candidates, because removing the reduce phase outweighs the cost of bringing semi-large tables into each map task.
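Continuing the same hypothetical tables, the MAPJOIN hint asks Hive to load the small table into memory in each mapper, avoiding the reduce phase entirely:

hive> SELECT /*+ MAPJOIN(s) */ h.id, s.name
    > FROM small_dims s JOIN huge_facts h ON (s.id = h.id);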
Many Hadoop jobs need the full scalability benefits of Hadoop to process large data sets. However, there are times when the input to Hive is very small. In these cases, the overhead of launching tasks for queries consumes a significant percentage of the overall job execution time. In many of these cases, Hive can leverage the lighter weight of the local mode to perform all the tasks for the job on a single machine and sometimes in the same process. The reduction in execution times can be dramatic for small data sets.
You can explicitly enable local mode temporarily, as in this example:
hive> set oldjobtracker=${hiveconf:mapred.job.tracker};

hive> set mapred.job.tracker=local;
hive> set mapred.tmp.dir=/home/edward/tmp;

hive> SELECT * from people WHERE firstname = 'bob';
...

hive> set mapred.job.tracker=${oldjobtracker};
You can also tell Hive to automatically apply this optimization by setting hive.exec.mode.local.auto to true, perhaps in your $HOME/.hiverc. To set this property permanently for all users, change the value in your $HIVE_HOME/conf/hive-site.xml:
<property>
  <name>hive.exec.mode.local.auto</name>
  <value>true</value>
  <description>Let hive determine whether to run in local mode automatically</description>
</property>
Hive converts a query into one or more stages. Stages could be a MapReduce stage, a sampling stage, a merge stage, a limit stage, or another task Hive needs to do. By default, Hive executes these stages one at a time. However, a particular job may consist of some stages that are not dependent on each other and could be executed in parallel, possibly allowing the overall job to complete much faster.
Setting hive.exec.parallel to true enables parallel execution. Be careful in a shared cluster, however. If a job is running more stages in parallel, it will increase its cluster utilization:
<property>
  <name>hive.exec.parallel</name>
  <value>true</value>
  <description>Whether to execute jobs in parallel</description>
</property>
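To illustrate, here is a sketch (assuming both fracture tables carry a hit_date column): the two halves of the UNION ALL are independent stages, so enabling the property lets them run concurrently:

hive> set hive.exec.parallel=true;
hive> SELECT u.hit_date, u.cnt FROM (
    >   SELECT hit_date, count(1) AS cnt FROM fracture_ins GROUP BY hit_date
    >   UNION ALL
    >   SELECT hit_date, count(1) AS cnt FROM fracture_act GROUP BY hit_date
    > ) u;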
Strict mode is a setting in Hive that prevents users from issuing queries that could have unintended and undesirable effects.
Setting the property hive.mapred.mode to strict disables three types of queries.
First, queries on partitioned tables are not permitted unless they include a partition filter in the WHERE clause, limiting their scope. In other words, you’re prevented from running queries that would scan all partitions. The rationale for this limitation is that partitioned tables often hold very large data sets that may be growing rapidly. An unrestricted query could consume unacceptably large resources over such a large table:
hive> SELECT DISTINCT(planner_id) FROM fracture_ins WHERE planner_id=5;
FAILED: Error in semantic analysis: No Partition Predicate Found for
 Alias "fracture_ins" Table "fracture_ins"
The following enhancement adds a partition filter, on the partition key hit_date, to the WHERE clause:
hive> SELECT DISTINCT(planner_id) FROM fracture_ins
    > WHERE planner_id=5 AND hit_date=20120101;
... normal results ...
The second type of restricted query is one with an ORDER BY clause but no LIMIT clause. Because ORDER BY sends all results to a single reducer to perform the ordering, forcing the user to specify a LIMIT clause prevents the reducer from executing for an extended period of time:
hive> SELECT * FROM fracture_ins WHERE hit_date>2012 ORDER BY planner_id;
FAILED: Error in semantic analysis: line 1:56 In strict mode,
 limit must be specified if ORDER BY is present planner_id
To issue this query, add a LIMIT clause:
hive> SELECT * FROM fracture_ins WHERE hit_date>2012 ORDER BY planner_id
    > LIMIT 100000;
... normal results ...
The third and final type of query prevented is a Cartesian product. Users coming from the relational database world may expect that queries that perform a JOIN not with an ON clause but with a WHERE clause will have the query optimized by the query planner, effectively converting the WHERE clause into an ON clause. Unfortunately, Hive does not perform this optimization, so a runaway query will occur if the tables are large:
hive> SELECT * FROM fracture_act JOIN fracture_ads
    > WHERE fracture_act.planner_id = fracture_ads.planner_id;
FAILED: Error in semantic analysis: In strict mode, cartesian product
 is not allowed. If you really want to perform the operation,
 set hive.mapred.mode=nonstrict
Here is a properly constructed query with JOIN and ON clauses:
hive> SELECT * FROM fracture_act JOIN fracture_ads
    > ON (fracture_act.planner_id = fracture_ads.planner_id);
... normal results ...
Hive is able to parallelize queries by breaking the query into one or more MapReduce jobs, each of which might have multiple mapper and reducer tasks, at least some of which can run in parallel. Determining the optimal number of mappers and reducers depends on many variables, such as the size of the input and the operation being performed on the data.
A balance is required. Having too many mapper or reducer tasks causes excessive overhead in starting, scheduling, and running the job, while too few tasks means the inherent parallelism of the cluster is underutilized.
When running a Hive query that has a reduce phase, the CLI prints information about how the number of reducers can be tuned. Let’s see an example that uses a GROUP BY query, because such queries always require a reduce phase. In contrast, many other queries are converted into map-only jobs:
hive> SELECT pixel_id, count(1) FROM fracture_ins WHERE hit_date=20120119
    > GROUP BY pixel_id;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 3
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
...
Hive is determining the number of reducers from the input size. This can be confirmed using the dfs -count command, which works something like the Linux du -s command; it computes a total size for all the data under a given directory:
[edward@etl02 ~]$ hadoop dfs -count /user/media6/fracture/ins/* | tail -4
1 8 2614608737 hdfs://.../user/media6/fracture/ins/hit_date=20120118
1 7 2742992546 hdfs://.../user/media6/fracture/ins/hit_date=20120119
1 17 2656878252 hdfs://.../user/media6/fracture/ins/hit_date=20120120
1 2 362657644 hdfs://.../user/media6/fracture/ins/hit_date=20120121
(We’ve reformatted the output and elided some details for space.)
The default value of hive.exec.reducers.bytes.per.reducer is 1 GB. Changing this value to 750 MB causes Hive to estimate four reducers for this job:
hive> set hive.exec.reducers.bytes.per.reducer=750000000;
hive> SELECT pixel_id, count(1) FROM fracture_ins WHERE hit_date=20120119
    > GROUP BY pixel_id;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 4
...
This default typically yields good results. However, there are cases where a query’s map phase creates significantly more data than the input size, in which case estimating from input size selects too few reducers. Likewise, the map function might filter out a large portion of the data, and then fewer reducers may be justified.
A quick way to experiment is by setting the number of reducers to a fixed size, rather than allowing Hive to calculate the value. If you remember, Hive’s estimate for the example above was three reducers. Set mapred.reduce.tasks to different numbers and determine whether more or fewer reducers result in faster run times. Remember that benchmarking like this is complicated by external factors such as other users running jobs simultaneously. Hadoop has a few seconds of overhead to start up and schedule map and reduce tasks. When executing performance tests, it’s important to keep these factors in mind, especially if the jobs are small.
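For example, to benchmark the same GROUP BY query with a fixed count of five reducers (an arbitrary value chosen for illustration):

hive> set mapred.reduce.tasks=5;
hive> SELECT pixel_id, count(1) FROM fracture_ins WHERE hit_date=20120119
    > GROUP BY pixel_id;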
The hive.exec.reducers.max
property is useful for controlling resource utilization on shared clusters
when dealing with large jobs. A Hadoop cluster has a fixed number of map
and reduce “slots” to allocate to tasks. One large job could reserve all
of the slots and block other jobs from starting. Setting hive.exec.reducers.max
can stop a query from
taking too many reducer resources. It is a good idea to set this value in
your $HIVE_HOME/conf/hive-site.xml. A suggested
formula is to set the value to the result of this calculation:
(Total Cluster Reduce Slots * 1.5) / (avg number of queries running)
The 1.5 multiplier is a fudge factor to prevent underutilization of the cluster.
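For instance, suppose a hypothetical cluster has 100 reduce slots and runs an average of four queries simultaneously. The formula yields (100 * 1.5) / 4 = 37.5, so you would set hive.exec.reducers.max to 37 in hive-site.xml.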
JVM reuse is a Hadoop tuning parameter that is very relevant to Hive performance, especially in scenarios where it’s hard to avoid small files and in scenarios with lots of tasks, most of which have short execution times.
The default configuration of Hadoop will typically launch map or reduce tasks in a forked JVM. The JVM start-up may create significant overhead, especially when launching jobs with hundreds or thousands of tasks. Reuse allows a JVM instance to be reused up to N times for the same job. This value is set in Hadoop’s mapred-site.xml (in $HADOOP_HOME/conf):
<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
  <value>10</value>
  <description>How many tasks to run per jvm. If set to -1, there is
  no limit.</description>
</property>
A drawback of this feature is that JVM reuse will keep reserved task slots open until the job completes, in case they are needed for reuse. If an “unbalanced” job has some reduce tasks that run considerably longer than the others, the reserved slots will sit idle, unavailable for other jobs, until the last task completes.
Indexes may be used to accelerate the calculation speed of a GROUP BY query. Hive contains an implementation of bitmap indexes since v0.8.0. The main use case for bitmap indexes is when there are comparatively few values for a given column. See Bitmap Indexes for more information.
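As a hedged sketch (the employees table and its country column are hypothetical; the syntax follows Hive v0.8.0 indexing), creating a bitmap index on a low-cardinality column looks like this:

hive> CREATE INDEX employees_country_idx
    > ON TABLE employees (country)
    > AS 'BITMAP'
    > WITH DEFERRED REBUILD;
hive> ALTER INDEX employees_country_idx ON employees REBUILD;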
As explained in Dynamic Partition Inserts, dynamic partition INSERT statements enable a succinct SELECT statement to create many new partitions for insertion into a partitioned table.
This is a very powerful feature; however, if the number of partitions is high, a large number of output handles must be created on the system. This is a somewhat uncommon use case for Hadoop, which typically creates a few files at once and streams large amounts of data to them.
Out of the box, Hive is configured to prevent dynamic partition inserts from creating more than 1,000 or so partitions. While it can be bad for a table to have too many partitions, it is generally better to raise this setting and allow these queries to work.
First, it is always good to set the dynamic partition mode to strict in your hive-site.xml, as discussed in Strict Mode. When strict mode is on, at least one partition has to be static, as demonstrated in Dynamic Partition Inserts:
<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>strict</value>
  <description>In strict mode, the user must specify at least one
  static partition in case the user accidentally overwrites all
  partitions.</description>
</property>
Then, increase the other relevant properties to allow queries that will create a large number of dynamic partitions, for example:
<property>
  <name>hive.exec.max.dynamic.partitions</name>
  <value>300000</value>
  <description>Maximum number of dynamic partitions allowed to be
  created in total.</description>
</property>

<property>
  <name>hive.exec.max.dynamic.partitions.pernode</name>
  <value>10000</value>
  <description>Maximum number of dynamic partitions allowed to be
  created in each mapper/reducer node.</description>
</property>
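To illustrate, here is a sketch with hypothetical table and column names of a dynamic partition INSERT that satisfies the strict requirement: the country partition is static while dt is dynamic, and the dynamic partition column comes last in the SELECT list:

hive> set hive.exec.dynamic.partition=true;
hive> INSERT OVERWRITE TABLE sales_by_day PARTITION (country='US', dt)
    > SELECT s.amount, s.dt FROM sales s WHERE s.country = 'US';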
Another setting controls how many files a DataNode will allow to be open at once. It must be set in the DataNode’s $HADOOP_HOME/conf/hdfs-site.xml.
In Hadoop v0.20.2, the default value is 256, which is too low. The value affects the maximum number of threads and resources, so setting it to a very high number is not recommended. Note also that in Hadoop v0.20.2, changing this variable requires restarting the DataNode to take effect:
<property>
  <name>dfs.datanode.max.xcievers</name>
  <value>8192</value>
</property>
Speculative execution is a feature of Hadoop that launches a certain number of duplicate tasks. While this consumes more resources computing duplicate copies of data that may be discarded, the goal of this feature is to improve overall job progress by getting individual task results faster, and by detecting and then blacklisting slow-running TaskTrackers.
Hadoop speculative execution is controlled in the $HADOOP_HOME/conf/mapred-site.xml file by the following two variables:
<property>
  <name>mapred.map.tasks.speculative.execution</name>
  <value>true</value>
  <description>If true, then multiple instances of some map tasks
  may be executed in parallel.</description>
</property>
<property>
  <name>mapred.reduce.tasks.speculative.execution</name>
  <value>true</value>
  <description>If true, then multiple instances of some reduce tasks
  may be executed in parallel.</description>
</property>
However, Hive provides its own variable to control reduce-side speculative execution:
<property>
  <name>hive.mapred.reduce.tasks.speculative.execution</name>
  <value>true</value>
  <description>Whether speculative execution for reducers should be turned on.</description>
</property>
It is hard to give a concrete recommendation about tuning these speculative execution variables. If you are very sensitive to deviations in runtime, you may wish to turn these features on. However, if you have long-running map or reduce tasks due to large amounts of input, the waste could be significant.
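For example, to turn off Hive’s reduce-side speculative execution for a single long-running session (the two Hadoop properties above would instead be changed in mapred-site.xml):

hive> set hive.mapred.reduce.tasks.speculative.execution=false;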
Another special optimization attempts to combine multiple GROUP BY operations in a query into a single MapReduce job. For this optimization to work, a common set of GROUP BY keys is required:
<property>
  <name>hive.multigroupby.singlemr</name>
  <value>false</value>
  <description>Whether to optimize multi group by query to generate single M/R
  job plan. If the multi group by query has common group by keys, it will be
  optimized to generate single M/R job.</description>
</property>
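For instance, here is a sketch reusing fracture_ins (the two target tables are hypothetical): both inserts group by the common key pixel_id, so with the property enabled they can be planned as a single MapReduce job:

hive> set hive.multigroupby.singlemr=true;
hive> FROM fracture_ins
    > INSERT OVERWRITE TABLE pixel_counts
    >   SELECT pixel_id, count(1) GROUP BY pixel_id
    > INSERT OVERWRITE TABLE planner_counts
    >   SELECT pixel_id, count(DISTINCT planner_id) GROUP BY pixel_id;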
Hive provides two virtual columns: one for the input file name of the mapper’s split and the other for the block offset in the file. These are helpful when diagnosing queries where Hive is producing unexpected or null results. By projecting these “columns,” you can see which file and row is causing problems:
hive> set hive.exec.rowoffset=true;
hive> SELECT INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, line
    > FROM hive_text WHERE line LIKE '%hive%' LIMIT 2;
har://file/user/hive/warehouse/hive_text/folder=docs/
data.har/user/hive/warehouse/hive_text/folder=docs/README.txt
2243 http://hive.apache.org/

har://file/user/hive/warehouse/hive_text/folder=docs/
data.har/user/hive/warehouse/hive_text/folder=docs/README.txt
3646 - Hive 0.8.0 ignores the hive-default.xml file, though we continue
(We wrapped the long output and put a blank line between the two output rows.)
A third virtual column provides the row offset of the file. It must be enabled explicitly:
<property>
  <name>hive.exec.rowoffset</name>
  <value>true</value>
  <description>Whether to provide the row offset virtual column</description>
</property>
Now it can be used in queries:
hive> SELECT INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE,
    > ROW__OFFSET__INSIDE__BLOCK
    > FROM hive_text WHERE line LIKE '%hive%' LIMIT 2;
file:/user/hive/warehouse/hive_text/folder=docs/README.txt 2243 0
file:/user/hive/warehouse/hive_text/folder=docs/README.txt 3646 0