This chapter continues our discussion of HiveQL, the Hive query language, focusing on the data manipulation language parts that are used to put data into tables and to extract data from tables to the filesystem.
This chapter uses SELECT ... WHERE clauses extensively when we discuss populating tables with data queried from other tables. So, why aren’t we covering SELECT ... WHERE clauses first, instead of waiting until the next chapter, Chapter 6?
Since we just finished discussing how to create tables, we wanted to cover the next obvious topic: how to get data into these tables so you’ll have something to query! We assume you already understand the basics of SQL, so these clauses won’t be new to you. If they are, please refer to Chapter 6 for details.
Since Hive has no row-level insert, update, and delete operations, the only way to put data into a table is to use one of the “bulk” load operations. Alternatively, you can write files into the correct directories by other means.
We saw an example of how to load data into a managed table in Partitioned, Managed Tables, which we repeat here with an addition, the use of the OVERWRITE keyword:
LOAD DATA LOCAL INPATH '${env:HOME}/california-employees'
OVERWRITE INTO TABLE employees
PARTITION (country = 'US', state = 'CA');
This command will first create the directory for the partition, if it doesn’t already exist, then copy the data to it.
If the target table is not partitioned, you omit the PARTITION clause.
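As a minimal sketch of the nonpartitioned case, the statement below drops the PARTITION clause. The table name employees_unpartitioned and the source directory are hypothetical, chosen only for illustration:

```sql
-- Hypothetical nonpartitioned table and local directory (assumptions, not from the text).
-- With no partitions, the files land directly in the table's directory.
LOAD DATA LOCAL INPATH '${env:HOME}/all-employees'
OVERWRITE INTO TABLE employees_unpartitioned;
```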
It is conventional practice to specify a path that is a directory, rather than an individual file. Hive will copy all the files in the directory, which gives you the flexibility of organizing the data into multiple files and changing the file naming convention without requiring a change to your Hive scripts. Either way, the files will be copied to the appropriate location for the table and the names will be the same.
If the LOCAL keyword is used, the path is assumed to be in the local filesystem. The data is copied into the final location. If LOCAL is omitted, the path is assumed to be in the distributed filesystem. In this case, the data is moved from the path to the final location.
LOAD DATA LOCAL ... copies the local data to the final location in the distributed filesystem, while LOAD DATA ... (i.e., without LOCAL) moves the data to the final location.
The rationale for this inconsistency is the assumption that you usually don’t want duplicate copies of your data files in the distributed filesystem.
Also, because files are moved in this case, Hive requires the source and target files and directories to be in the same filesystem. For example, you can’t use LOAD DATA to load (move) data from one HDFS cluster to another.
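For comparison with the LOCAL example, here is a sketch of the same load without the LOCAL keyword. The source directory /staging/california-employees is a hypothetical path in the distributed filesystem:

```sql
-- No LOCAL keyword: the path is resolved in the distributed filesystem
-- and the files are moved, not copied, into the partition directory.
-- '/staging/california-employees' is a hypothetical HDFS directory.
LOAD DATA INPATH '/staging/california-employees'
OVERWRITE INTO TABLE employees
PARTITION (country = 'US', state = 'CA');
```

After a statement like this succeeds, the source directory should no longer contain the files, because they were moved.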
It is more robust to specify a full path, but relative paths can be used. When running in local mode, the relative path is interpreted relative to the user’s working directory when the Hive CLI was started. For distributed or pseudo-distributed mode, the path is interpreted relative to the user’s home directory in the distributed filesystem, which is /user/$USER by default in HDFS and MapRFS.
If you specify the OVERWRITE keyword, any data already present in the target directory will be deleted first. Without the keyword, the new files are simply added to the target directory. However, if files already exist in the target directory that match filenames being loaded, the old files are overwritten.
Versions of Hive before v0.9.0 had the following bug: when the OVERWRITE keyword was not used, an existing data file in the target directory would be overwritten if its name matched the name of a data file being written to the directory. Hence, data would be lost. This bug was fixed in the v0.9.0 release.
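To illustrate the append behavior, the following sketch omits OVERWRITE so that existing files in the partition are kept. The source directory name is hypothetical:

```sql
-- Without OVERWRITE, the new files are added alongside whatever
-- is already in the partition directory.
-- '${env:HOME}/california-employees-update' is a hypothetical local directory.
LOAD DATA LOCAL INPATH '${env:HOME}/california-employees-update'
INTO TABLE employees
PARTITION (country = 'US', state = 'CA');
```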
The PARTITION clause is required if the table is partitioned, and you must specify a value for each partition key.
In the example, the data will now exist in the following directory:
hdfs://master_server/user/hive/warehouse/mydb.db/employees/country=US/state=CA
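One way to check the result from inside the Hive CLI is the dfs command, which passes its arguments to the Hadoop filesystem shell. This sketch assumes the warehouse location shown above:

```sql
-- List the files that LOAD DATA placed in the partition directory.
-- The path assumes the default warehouse location and a database named mydb.
dfs -ls /user/hive/warehouse/mydb.db/employees/country=US/state=CA;
```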
Another restriction on the path given in the INPATH clause is that the directory it names cannot contain any subdirectories.
Hive does not verify that the data you are loading matches the schema for the table. However, it will verify that the file format matches the table definition. For example, if the table was created with SEQUENCEFILE storage, the loaded files must be sequence files.
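If your source files are plain text but the target table uses SEQUENCEFILE storage, one option is to let Hive do the conversion by populating the table from a query rather than with LOAD DATA. The following is a sketch only; the table employees_seq and its abbreviated column list are hypothetical:

```sql
-- Hypothetical table stored as sequence files (name and columns are assumptions).
CREATE TABLE employees_seq (name STRING, salary FLOAT)
STORED AS SEQUENCEFILE;

-- Hive writes the query results in the target table's format,
-- so no pre-converted sequence files are needed.
INSERT OVERWRITE TABLE employees_seq
SELECT name, salary FROM employees;
```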
The INSERT statement lets you load data into a table from a query. Reusing our employees example from the previous chapter, here is an example for the state of Oregon, where we presume the data is already in another table called staged_employees. For reasons we’ll discuss shortly, let’s use different names for the country and state fields in staged_employees, calling them cnty and st, respectively:
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'OR')
SELECT * FROM staged_employees se
WHERE se.cnty = 'US' AND se.st = 'OR';
With OVERWRITE, any previous contents of the partition (or whole table if not partitioned) are replaced.
If you drop the keyword OVERWRITE or replace it with INTO, Hive appends the data rather than replacing it. This feature is only available in Hive v0.8.0 or later.
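As a sketch, here is the Oregon example rewritten to append rather than replace, using the INTO form:

```sql
-- INTO appends the query results to the partition's existing contents
-- (requires Hive v0.8.0 or later, per the text above).
INSERT INTO TABLE employees
PARTITION (country = 'US', state = 'OR')
SELECT * FROM staged_employees se
WHERE se.cnty = 'US' AND se.st = 'OR';
```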
This example suggests one common scenario where this feature is useful: data has been staged in a directory, exposed to Hive as an external table, and now you want to put it into the final, partitioned table. A workflow like this is also useful if you want the target table to have a different record format than the source table (e.g., a different field delimiter).
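A staging table of this kind might be declared as follows. This is only a sketch: the column list is deliberately abbreviated, and the comma delimiter and the location /data/staged_employees are hypothetical. A real staging table would carry every column needed by employees:

```sql
-- External table over files already staged in a directory.
-- Columns, delimiter, and location are assumptions for illustration.
CREATE EXTERNAL TABLE IF NOT EXISTS staged_employees (
  name   STRING,
  salary FLOAT,
  cnty   STRING,
  st     STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/staged_employees';
```

Because the table is external, dropping it later removes only the metadata and leaves the staged files in place.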
However, if staged_employees is very large and you run 65 of these statements to cover all states, then you are scanning staged_employees 65 times! Hive offers an alternative INSERT syntax that allows you to scan the input data once and split it multiple ways. The following example shows this feature for creating the employees partitions for three states:
FROM staged_employees se
INSERT OVERWRITE TABLE employees
  PARTITION (country = 'US', state = 'OR')
  SELECT * WHERE se.cnty = 'US' AND se.st = 'OR'
INSERT OVERWRITE TABLE employees
  PARTITION (country = 'US', state = 'CA')
  SELECT * WHERE se.cnty = 'US' AND se.st = 'CA'
INSERT OVERWRITE TABLE employees
  PARTITION (country = 'US', state = 'IL')
  SELECT * WHERE se.cnty = 'US' AND se.st = 'IL';
We have used indentation to make it clearer how the clauses group together. Each record read from staged_employees will be evaluated with each SELECT … WHERE … clause. Those clauses are evaluated independently; this is not an IF … THEN … ELSE … construct!
In fact, by using this construct, some records from the source table can be written to multiple partitions of the destination table or none of them.
If a record satisfies a given SELECT … WHERE … clause, it gets written to the specified table and partition. To be clear, each INSERT clause can insert into a different table, when desired, and some of those tables could be partitioned while others aren’t.
Hence, some records from the input might get written to multiple output locations and others might get dropped!
You can mix INSERT OVERWRITE clauses and INSERT INTO clauses, as well.
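For instance, a single scan could append to one partition while replacing another. This is a sketch that reuses the tables from the earlier examples:

```sql
-- One pass over staged_employees feeds both clauses.
FROM staged_employees se
-- Append to the Oregon partition.
INSERT INTO TABLE employees
  PARTITION (country = 'US', state = 'OR')
  SELECT * WHERE se.cnty = 'US' AND se.st = 'OR'
-- Replace the California partition.
INSERT OVERWRITE TABLE employees
  PARTITION (country = 'US', state = 'CA')
  SELECT * WHERE se.cnty = 'US' AND se.st = 'CA';
```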
There’s still one problem with this syntax: if you have a lot of partitions to create, you have to write a lot of SQL! Fortunately, Hive also supports a dynamic partition feature, where it can infer the partitions to create based on query parameters. By comparison, up until now we have considered only static partitions.
Consider this change to the previous example:
INSERT OVERWRITE TABLE employees
PARTITION (country, state)
SELECT ..., se.cnty, se.st
FROM staged_employees se;
Hive determines the values of the partition keys, country and state, from the last two columns in the SELECT clause. This is why we used different names in staged_employees, to emphasize that the relationship between the source column values and the output partition values is by position only and not by matching on names.
Suppose that staged_employees has data for a total of 100 country and state pairs. After running this query, employees will have 100 partitions!
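To see which partitions a dynamic-partition insert actually created, you can list them afterward. A short sketch:

```sql
-- List every partition of the table, one country=.../state=... entry per line.
SHOW PARTITIONS employees;

-- Restrict the listing to the partitions under a single country.
SHOW PARTITIONS employees PARTITION (country = 'US');
```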
You can also mix dynamic and static partitions. This variation of the previous query specifies a static value for the country (US) and a dynamic value for the state:
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state)
SELECT ..., se.cnty, se.st
FROM staged_employees se
WHERE se.cnty = 'US';
The static partition keys must come before the dynamic partition keys.
Dynamic partitioning is not enabled by default. When it is enabled, it works in “strict” mode by default, where it expects at least some columns to be static. This helps protect against a badly designed query that generates a gigantic number of partitions. For example, you partition by timestamp and generate a separate partition for each second! Perhaps you meant to partition by day or maybe hour instead. Several other properties are also used to limit excess resource utilization. Table 5-1 describes these properties.
Table 5-1. Dynamic partitions properties
Name | Default | Description |
---|---|---|
hive.exec.dynamic.partition | false | Set to true to enable dynamic partitioning. |
hive.exec.dynamic.partition.mode | strict | Set to nonstrict to enable all partitions to be determined dynamically. |
hive.exec.max.dynamic.partitions.pernode | 100 | The maximum number of dynamic partitions that can be created by each mapper or reducer. Raises a fatal error if one mapper or reducer attempts to create more than the threshold. |
hive.exec.max.dynamic.partitions | 1000 | The total number of dynamic partitions that can be created by one statement with dynamic partitioning. Raises a fatal error if the limit is exceeded. |
hive.exec.max.created.files | 100000 | The maximum total number of files that can be created globally. A Hadoop counter is used to track the number of files created. Raises a fatal error if the limit is exceeded. |
So, for example, our first example using dynamic partitioning for all partitions might actually look like this, where we set the desired properties just before use:
hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> set hive.exec.max.dynamic.partitions.pernode=1000;
hive> INSERT OVERWRITE TABLE employees
    > PARTITION (country, state)
    > SELECT ..., se.cnty, se.st
    > FROM staged_employees se;
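If you want to confirm what a property is currently set to, the set command with a property name and no value prints it. A small sketch using the three properties from the session above:

```sql
-- With no "=value", set prints the property's current setting as name=value.
SET hive.exec.dynamic.partition;
SET hive.exec.dynamic.partition.mode;
SET hive.exec.max.dynamic.partitions.pernode;
```

Values changed this way apply only to the current session.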
You can also create a table and insert query results into it in one statement:
CREATE TABLE ca_employees
AS SELECT name, salary, address
FROM employees se
WHERE se.state = 'CA';
This table contains just the name, salary, and address columns from the employees table records for employees in California. The schema for the new table is taken from the SELECT clause.
A common use for this feature is to extract a convenient subset of data from a larger, more unwieldy table.
This feature can’t be used with external tables. Recall that “populating” a partition for an external table is done with an ALTER TABLE statement, where we aren’t “loading” data, per se, but pointing metadata to a location where the data can be found.
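As a reminder of what that looks like, here is a sketch of adding a partition to an external table. The table log_messages, its partition keys, and the location are hypothetical and used only for illustration:

```sql
-- Register an existing directory as a partition of an external table.
-- No data is moved or copied; only the metadata changes.
-- Table name, partition keys, and path are assumptions.
ALTER TABLE log_messages ADD IF NOT EXISTS
PARTITION (year = 2012, month = 1, day = 2)
LOCATION 'hdfs://master_server/data/log_messages/2012/01/02';
```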
How do we get data out of tables? If the data files are already formatted the way you want, then it’s simple enough to copy the directories or files:
hadoop fs -cp source_path target_path
Otherwise, you can use INSERT … DIRECTORY …, as in this example:
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/ca_employees'
SELECT name, salary, address
FROM employees se
WHERE se.state = 'CA';
OVERWRITE and LOCAL have the same interpretations as before and paths are interpreted following the usual rules. One or more files will be written to /tmp/ca_employees, depending on the number of reducers invoked.
The specified path can also be a full URI (e.g., hdfs://master-server/tmp/ca_employees).
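A sketch of the same export written to the distributed filesystem with a full URI. LOCAL is omitted because the target is not on the local filesystem:

```sql
-- Write the query results to a directory in HDFS, named by a full URI.
INSERT OVERWRITE DIRECTORY 'hdfs://master-server/tmp/ca_employees'
SELECT name, salary, address
FROM employees
WHERE state = 'CA';
```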
Independent of how the data is actually stored in the source table, it is written to files with all fields serialized as strings. Hive uses the same encoding in the generated output files as it uses for the table’s internal storage.
As a reminder, we can look at the results from within the hive CLI:
hive> ! ls /tmp/ca_employees;
000000_0
hive> ! cat /tmp/ca_employees/000000_0;
John Doe100000.0201 San Antonio CircleMountain ViewCA94040
Mary Smith80000.01 Infinity LoopCupertinoCA95014
...
Yes, the filename is 000000_0. If there were two or more reducers writing output, we would have additional files with similar names (e.g., 000001_0).
The fields appear to be joined together without delimiters because the ^A and ^B separators aren’t rendered.
Just like inserting data to tables, you can specify multiple inserts to directories:
FROM staged_employees se
INSERT OVERWRITE DIRECTORY '/tmp/or_employees'
  SELECT * WHERE se.cnty = 'US' AND se.st = 'OR'
INSERT OVERWRITE DIRECTORY '/tmp/ca_employees'
  SELECT * WHERE se.cnty = 'US' AND se.st = 'CA'
INSERT OVERWRITE DIRECTORY '/tmp/il_employees'
  SELECT * WHERE se.cnty = 'US' AND se.st = 'IL';
There are some limited options for customizing the output of the data (other than writing a custom OUTPUTFORMAT, as discussed in Customizing Table Storage Formats). To format columns, the built-in functions include those for formatting strings, such as converting case, padding output, and more. See Other built-in functions for more details.
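As one possible illustration of formatting with built-in functions, the sketch below builds each output line as a single comma-separated string. The output directory /tmp/ca_employees_csv is hypothetical:

```sql
-- Produce one string column per row, so the default field delimiter
-- never appears between the values.
-- upper() converts case; concat_ws() joins strings with a separator;
-- salary is cast to STRING because concat_ws() expects string arguments.
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/ca_employees_csv'
SELECT concat_ws(',', upper(name), cast(salary AS STRING))
FROM employees
WHERE state = 'CA';
```

This approach does no quoting or escaping, so it is only suitable when the values themselves cannot contain commas.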
The field delimiter for the table can be problematic, for example, if it uses the default ^A delimiter. If you export table data frequently, it might be appropriate to use comma or tab delimiters.
Another workaround is to define a “temporary” table with the storage configured to match the desired output format (e.g., tab-delimited fields). Then write a query result to that table and use INSERT OVERWRITE DIRECTORY, selecting from the temporary table. Unlike many relational databases, there is no temporary table feature in Hive. You have to manually drop any tables you create that aren’t intended to be permanent.