Chapter 5. HiveQL: Data Manipulation

This chapter continues our discussion of HiveQL, the Hive query language, focusing on the data manipulation language parts that are used to put data into tables and to extract data from tables to the filesystem.

This chapter uses SELECT ... WHERE clauses extensively when we discuss populating tables with data queried from other tables. So, why aren’t we covering SELECT ... WHERE clauses first, instead of waiting until the next chapter, Chapter 6?

Since we just finished discussing how to create tables, we wanted to cover the next obvious topic: how to get data into these tables so you’ll have something to query! We assume you already understand the basics of SQL, so these clauses won’t be new to you. If they are, please refer to Chapter 6 for details.

Loading Data into Managed Tables

Since Hive has no row-level insert, update, and delete operations, the only way to put data into a table is to use one of the “bulk” load operations, or to write files directly into the table's storage directories by other means.

We saw an example of how to load data into a managed table in Partitioned, Managed Tables, which we repeat here with an addition, the use of the OVERWRITE keyword:

LOAD DATA LOCAL INPATH '${env:HOME}/california-employees'
OVERWRITE INTO TABLE employees
PARTITION (country = 'US', state = 'CA');

This command will first create the directory for the partition, if it doesn’t already exist, then copy the data to it.

If the target table is not partitioned, you omit the PARTITION clause.
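For example, assuming a hypothetical non-partitioned variant of our table named employees_flat, the statement reduces to:

LOAD DATA LOCAL INPATH '${env:HOME}/california-employees'
OVERWRITE INTO TABLE employees_flat;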

It is conventional practice to specify a path that is a directory, rather than an individual file. Hive will copy all the files in the directory, which gives you the flexibility of organizing the data into multiple files and changing the file naming convention, without requiring a change to your Hive scripts. Either way, the files will be copied to the appropriate location for the table and their names will be the same.

If the LOCAL keyword is used, the path is assumed to be in the local filesystem. The data is copied into the final location. If LOCAL is omitted, the path is assumed to be in the distributed filesystem. In this case, the data is moved from the path to the final location.

Warning

LOAD DATA LOCAL ... copies the local data to the final location in the distributed filesystem, while LOAD DATA ... (i.e., without LOCAL) moves the data to the final location.

The rationale for this inconsistency is the assumption that you usually don’t want duplicate copies of your data files in the distributed filesystem.

Also, because files are moved in this case, Hive requires the source and target files and directories to be in the same filesystem. For example, you can’t use LOAD DATA to load (move) data from one HDFS cluster to another.

It is more robust to specify a full path, but relative paths can be used. When running in local mode, the relative path is interpreted relative to the user’s working directory when the Hive CLI was started. For distributed or pseudo-distributed mode, the path is interpreted relative to the user’s home directory in the distributed filesystem, which is /user/$USER by default in HDFS and MapRFS.

If you specify the OVERWRITE keyword, any data already present in the target directory will be deleted first. Without the keyword, the new files are simply added to the target directory. (In older releases, however, existing files whose names matched the filenames being loaded were silently overwritten; see the following warning.)

Warning

Versions of Hive before v0.9.0 had the following bug: when the OVERWRITE keyword was not used, an existing data file in the target directory would be overwritten if its name matched the name of a data file being written to the directory. Hence, data would be lost. This bug was fixed in the v0.9.0 release.

The PARTITION clause is required if the table is partitioned and you must specify a value for each partition key.

In the example, the data will now exist in the following directory:

hdfs://master_server/user/hive/warehouse/mydb.db/employees/country=US/state=CA

Another limitation of the path used in the INPATH clause is that it cannot contain any subdirectories; Hive expects a flat set of files at that location.

Hive does not verify that the data you are loading matches the schema for the table. However, it will verify that the file format matches the table definition. For example, if the table was created with SEQUENCEFILE storage, the loaded files must be sequence files.

Inserting Data into Tables from Queries

The INSERT statement lets you load data into a table from a query. Reusing our employees example from the previous chapter, here is an example for the state of Oregon, where we presume the data is already in another table called staged_employees. For reasons we’ll discuss shortly, let’s use different names for the country and state fields in staged_employees, calling them cnty and st, respectively:

INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'OR')
SELECT * FROM staged_employees se
WHERE se.cnty = 'US' AND se.st = 'OR';

With OVERWRITE, any previous contents of the partition (or whole table if not partitioned) are replaced.

If you drop the keyword OVERWRITE or replace it with INTO, Hive appends the data rather than replaces it. This feature is only available in Hive v0.8.0 or later.
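For example, to append Oregon records to the partition rather than replace its contents, the earlier statement could be written with INTO (assuming the same staged_employees source):

INSERT INTO TABLE employees
PARTITION (country = 'US', state = 'OR')
SELECT * FROM staged_employees se
WHERE se.cnty = 'US' AND se.st = 'OR';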

This example suggests one common scenario where this feature is useful: data has been staged in a directory, exposed to Hive as an external table, and now you want to put it into the final, partitioned table. A workflow like this is also useful if you want the target table to have a different record format than the source table (e.g., a different field delimiter).

However, if staged_employees is very large and you run 65 of these statements to cover all states, then it means you are scanning staged_employees 65 times! Hive offers an alternative INSERT syntax that allows you to scan the input data once and split it multiple ways. The following example shows this feature for creating the employees partitions for three states:

FROM staged_employees se
INSERT OVERWRITE TABLE employees
  PARTITION (country = 'US', state = 'OR')
  SELECT * WHERE se.cnty = 'US' AND se.st = 'OR'
INSERT OVERWRITE TABLE employees
  PARTITION (country = 'US', state = 'CA')
  SELECT * WHERE se.cnty = 'US' AND se.st = 'CA'
INSERT OVERWRITE TABLE employees
  PARTITION (country = 'US', state = 'IL')
  SELECT * WHERE se.cnty = 'US' AND se.st = 'IL';

We have used indentation to make it clearer how the clauses group together. Each record read from staged_employees will be evaluated with each SELECT … WHERE … clause. Those clauses are evaluated independently; this is not an IF … THEN … ELSE … construct!

In fact, by using this construct, some records from the source table can be written to multiple partitions of the destination table or none of them.

If a record satisfies a given SELECT … WHERE … clause, it gets written to the specified table and partition. To be clear, each INSERT clause can insert into a different table, when desired, and some of those tables could be partitioned while others aren’t.

Hence, some records from the input might get written to multiple output locations and others might get dropped!

You can mix INSERT OVERWRITE clauses and INSERT INTO clauses, as well.
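For example, a single multi-insert statement might replace one partition while appending to another; here is a sketch using the same staged_employees source:

FROM staged_employees se
INSERT OVERWRITE TABLE employees
  PARTITION (country = 'US', state = 'OR')
  SELECT * WHERE se.cnty = 'US' AND se.st = 'OR'
INSERT INTO TABLE employees
  PARTITION (country = 'US', state = 'CA')
  SELECT * WHERE se.cnty = 'US' AND se.st = 'CA';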

Dynamic Partition Inserts

There’s still one problem with this syntax: if you have a lot of partitions to create, you have to write a lot of SQL! Fortunately, Hive also supports a dynamic partition feature, where it can infer the partitions to create based on query parameters. By comparison, up until now we have considered only static partitions.

Consider this change to the previous example:

INSERT OVERWRITE TABLE employees
PARTITION (country, state)
SELECT ..., se.cnty, se.st
FROM staged_employees se;

Hive determines the values of the partition keys, country and state, from the last two columns in the SELECT clause. This is why we used different names in staged_employees, to emphasize that the relationship between the source column values and the output partition values is by position only and not by matching on names.

Suppose that staged_employees has data for a total of 100 country and state pairs. After running this query, employees will have 100 partitions!

You can also mix dynamic and static partitions. This variation of the previous query specifies a static value for the country (US) and a dynamic value for the state:

INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state)
SELECT ..., se.cnty, se.st
FROM staged_employees se
WHERE se.cnty = 'US';

The static partition keys must come before the dynamic partition keys.

Dynamic partitioning is not enabled by default. When it is enabled, it works in “strict” mode by default, where it requires at least one partition column to be static. This helps protect against a badly designed query that generates a gigantic number of partitions. For example, you partition by timestamp and generate a separate partition for each second! Perhaps you meant to partition by day or maybe hour instead. Several other properties are also used to limit excess resource utilization. Table 5-1 describes these properties.

Table 5-1. Dynamic partitions properties

hive.exec.dynamic.partition (default: false)
Set to true to enable dynamic partitioning.

hive.exec.dynamic.partition.mode (default: strict)
Set to nonstrict to enable all partitions to be determined dynamically.

hive.exec.max.dynamic.partitions.pernode (default: 100)
The maximum number of dynamic partitions that can be created by each mapper or reducer. A fatal error is raised if one mapper or reducer attempts to create more than the threshold.

hive.exec.max.dynamic.partitions (default: 1000)
The total number of dynamic partitions that can be created by one statement with dynamic partitioning. A fatal error is raised if the limit is exceeded.

hive.exec.max.created.files (default: 100000)
The maximum total number of files that can be created globally. A Hadoop counter is used to track the number of files created; a fatal error is raised if the limit is exceeded.

So, for example, our first example using dynamic partitioning for all partitions might actually look like this, where we set the desired properties just before use:

hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> set hive.exec.max.dynamic.partitions.pernode=1000;

hive> INSERT OVERWRITE TABLE employees
    > PARTITION (country, state)
    > SELECT ..., se.cnty, se.st
    > FROM staged_employees se;

Creating Tables and Loading Them in One Query

You can also create a table and insert query results into it in one statement:

CREATE TABLE ca_employees
AS SELECT name, salary, address
FROM employees se
WHERE se.state = 'CA';

This table contains just the name, salary, and address columns from the employees table records for employees in California. The schema for the new table is taken from the SELECT clause.

A common use for this feature is to extract a convenient subset of data from a larger, more unwieldy table.

This feature can’t be used with external tables. Recall that “populating” a partition for an external table is done with an ALTER TABLE statement, where we aren’t “loading” data, per se, but pointing metadata to a location where the data can be found.
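As a reminder, such a statement might look like the following, where the table name, partition keys, and path are all illustrative:

ALTER TABLE log_messages ADD PARTITION (year = 2012, month = 1, day = 2)
LOCATION 'hdfs://master_server/data/log_messages/2012/01/02';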

Exporting Data

How do we get data out of tables? If the data files are already formatted the way you want, then it’s simple enough to copy the directories or files:

hadoop fs -cp source_path target_path

Otherwise, you can use INSERT … DIRECTORY …, as in this example:

INSERT OVERWRITE LOCAL DIRECTORY '/tmp/ca_employees'
SELECT name, salary, address
FROM employees se
WHERE se.state = 'CA';

OVERWRITE and LOCAL have the same interpretations as before and paths are interpreted following the usual rules. One or more files will be written to /tmp/ca_employees, depending on the number of reducers invoked.

The specified path can also be a full URI (e.g., hdfs://master-server/tmp/ca_employees).

Independent of how the data is actually stored in the source table, it is written to files with all fields serialized as strings. Hive uses the same encoding in the generated output files as it uses for the table's internal storage.

As a reminder, we can look at the results from within the hive CLI:

hive> ! ls /tmp/ca_employees;
000000_0
hive> ! cat /tmp/ca_employees/000000_0;
John Doe100000.0201 San Antonio CircleMountain ViewCA94040
Mary Smith80000.01 Infinity LoopCupertinoCA95014
...

Yes, the filename is 000000_0. If there were two or more reducers writing output, we would have additional files with similar names (e.g., 000001_0).

The fields appear to be joined together without delimiters because the ^A and ^B separators aren’t rendered.

Just like inserting data into tables, you can specify multiple inserts to directories:

FROM staged_employees se
INSERT OVERWRITE DIRECTORY '/tmp/or_employees'
  SELECT * WHERE se.cnty = 'US' AND se.st = 'OR'
INSERT OVERWRITE DIRECTORY '/tmp/ca_employees'
  SELECT * WHERE se.cnty = 'US' AND se.st = 'CA'
INSERT OVERWRITE DIRECTORY '/tmp/il_employees'
  SELECT * WHERE se.cnty = 'US' AND se.st = 'IL';

There are some limited options for customizing the output of the data (other than writing a custom OUTPUTFORMAT, as discussed in Customizing Table Storage Formats). To format columns, the built-in functions include those for formatting strings, such as converting case, padding output, and more. See Other built-in functions for more details.

The field delimiter for the table can be problematic if, for example, it uses the default ^A delimiter. If you export table data frequently, it might be appropriate to use comma or tab delimiters.

Another workaround is to define a “temporary” table with the storage configured to match the desired output format (e.g., tab-delimited fields). Then write a query result to that table and use INSERT OVERWRITE DIRECTORY, selecting from the temporary table. Note that, unlike many relational databases, Hive has no temporary table feature, so you must manually drop any tables you create that aren’t intended to be permanent.
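A sketch of this workaround, assuming tab-delimited output is desired (the table name and column types here are illustrative):

CREATE TABLE temp_ca_employees (name STRING, salary FLOAT, address STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

INSERT OVERWRITE TABLE temp_ca_employees
SELECT name, salary, address
FROM employees se
WHERE se.state = 'CA';

-- ... export the now tab-delimited data files, then clean up:
DROP TABLE temp_ca_employees;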
