Chapter 13. Functions

User-Defined Functions (UDFs) are a powerful feature that allows users to extend HiveQL. As we’ll see, you implement them in Java, and once you add them to your session (interactive or driven by a script), they work just like built-in functions, including appearing in the online help. Hive has several types of user-defined functions, each of which performs a particular “class” of transformations on input data.

In an ETL workload, a process might have several processing steps. The Hive language has multiple ways to pipeline the output from one step to the next and produce multiple outputs during a single query. Users also have the ability to create their own functions for custom processing. Without this feature a process might have to include a custom MapReduce step or move the data into another system to apply the changes. Interconnecting systems add complexity and increase the chance of misconfigurations or other errors. Moving data between systems is time consuming when dealing with gigabyte- or terabyte-sized data sets. In contrast, UDFs run in the same processes as the tasks for your Hive queries, so they work efficiently and eliminate the complexity of integration with other systems. This chapter covers best practices associated with creating and using UDFs.

Discovering and Describing Functions

Before writing custom UDFs, let’s familiarize ourselves with the ones that are already part of Hive. Note that it’s common in the Hive community to use “UDF” to refer to any function, user-defined or built-in.

The SHOW FUNCTIONS command lists the functions currently loaded in the Hive session, both built-in and any user-defined functions that have been loaded using the techniques we will discuss shortly:

hive> SHOW FUNCTIONS;
abs
acos
and
array
array_contains
...

Functions usually have their own documentation. Use DESCRIBE FUNCTION to display a short description:

hive> DESCRIBE FUNCTION concat;
concat(str1, str2, ... strN) - returns the concatenation of str1, str2, ... strN

Functions may also contain extended documentation that can be accessed by adding the EXTENDED keyword:

hive> DESCRIBE FUNCTION EXTENDED concat;
concat(str1, str2, ... strN) - returns the concatenation of str1, str2, ... strN
Returns NULL if any argument is NULL.
Example:
  > SELECT concat('abc', 'def') FROM src LIMIT 1;
  'abcdef'

Calling Functions

To use a function, simply call it by name in a query, passing in any required arguments. Some functions take a specific number of arguments and argument types, while other functions accept a variable number of arguments with variable types. Just like keywords, the case of function names is ignored:

SELECT concat(column1,column2) AS x FROM table;

Standard Functions

The term user-defined function (UDF) is also used in a narrower sense to refer to any function that takes one or more columns from a single row and returns a single value. Most functions fall into this category.

Examples include many of the mathematical functions, like round() and floor(), for converting DOUBLES to BIGINTS, and abs(), for taking the absolute value of a number.

Other examples include string manipulation functions, like ucase(), which converts the string to upper case; reverse(), which reverses a string; and concat(), which joins multiple input strings into one output string.

Note that these UDFs can return a complex object, such as an array, map, or struct.
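For example, the built-in split() and map() functions are standard UDFs whose results are complex types. A quick check, reusing the src table from the earlier concat() example, should produce output along these lines:

hive> SELECT split('one,two,three', ','), map('a', 1, 'b', 2) FROM src LIMIT 1;
["one","two","three"]   {"a":1,"b":2}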

Aggregate Functions

Another type of function is an aggregate function. All aggregate functions, user-defined and built-in, are referred to generically as user-defined aggregate functions (UDAFs).

An aggregate function takes one or more columns from zero to many rows and returns a single result. Examples include the math functions: sum(), which returns a sum of all inputs; avg(), which computes the average of the values; min() and max(), which return the lowest and highest values, respectively:

hive> SELECT avg(price_close)
    > FROM stocks
    > WHERE exchange = 'NASDAQ' AND symbol = 'AAPL';

Aggregate functions are often combined with GROUP BY clauses. We saw this example in GROUP BY Clauses:

hive> SELECT year(ymd), avg(price_close) FROM stocks
    > WHERE exchange = 'NASDAQ' AND symbol = 'AAPL'
    > GROUP BY year(ymd);
1984    25.578625440597534
1985    20.193676221040867
1986    32.46102808021274
...

Table 6-3 in Chapter 6 lists the built-in aggregate functions in HiveQL.

Table Generating Functions

A third type of function supported by Hive is a table generating function. As for the other function kinds, all table generating functions, user-defined and built-in, are often referred to generically as user-defined table generating functions (UDTFs).

Table generating functions take zero or more inputs and produce multiple columns or rows of output. The array function takes a list of arguments and returns the list as a single array type. Suppose we start with this query using an array:

hive> SELECT array(1,2,3) FROM dual;
[1,2,3]

The explode() function is a UDTF that takes an array of input and iterates through the list, returning each element from the list in a separate row.

hive> SELECT explode(array(1,2,3)) AS element FROM src;
1
2
3

However, Hive only allows table generating functions to be used in limited ways. For example, we can’t project out any other columns from the table, a significant limitation. Here is a query we would like to write with the employees table we have used before. We want to list each manager-subordinate pair.

Example 13-1. Invalid use of explode

hive> SELECT name, explode(subordinates) FROM employees;
FAILED: Error in semantic analysis: UDTF's are not supported outside
the SELECT clause, nor nested in expressions

However, Hive offers a LATERAL VIEW feature to allow this kind of query:

hive> SELECT name, sub
    > FROM employees
    > LATERAL VIEW explode(subordinates) subView AS sub;
John Doe        Mary Smith
John Doe        Todd Jones
Mary Smith      Bill King

Note that there are no output rows for employees who aren’t managers (i.e., who have no subordinates), namely Bill King and Todd Jones. Hence, explode outputs zero to many new records.

The LATERAL VIEW wraps the output of the explode call. A view alias and column alias are required, subView and sub, respectively, in this case.

The list of built-in, table generating functions can be found in Table 6-4 in Chapter 6.

A UDF for Finding a Zodiac Sign from a Day

Let’s tackle writing our own UDF. Imagine we have a table with each user’s birth date stored as a column. With that information, we would like to determine the user’s Zodiac sign. This process can be implemented with a standard function (a UDF in the most restrictive sense). Specifically, we assume the input is either a date formatted as a string or a separate month and day. The function must return a single column of output.

Here is a sample data set, which we’ll put in a file called littlebigdata.txt in our home directory:

edward capriolo,[email protected],2-12-1981,209.191.139.200,M,10
bob,[email protected],10-10-2004,10.10.10.1,M,50
sara connor,[email protected],4-5-1974,64.64.5.1,F,2

Load this data set into a table called littlebigdata:

hive > CREATE TABLE IF NOT EXISTS littlebigdata(
     >   name   STRING,
     >   email  STRING,
     >   bday   STRING,
     >   ip     STRING,
     >   gender STRING,
     >   anum   INT)
     > ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

hive> LOAD DATA LOCAL INPATH '${env:HOME}/littlebigdata.txt'
    > INTO TABLE littlebigdata;

The input for the function will be a date and the output will be a string representing the user’s Zodiac sign.

Here is a Java implementation of the UDF we need:

package org.apache.hadoop.hive.contrib.udf.example;

import java.util.Date;
import java.text.SimpleDateFormat;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;

@Description(name = "zodiac",
    value = "_FUNC_(date) - from the input date string "+
            "or separate month and day arguments, returns the sign of the Zodiac.",
    extended = "Example:\n"
             + "  > SELECT _FUNC_(date_string) FROM src;\n"
             + "  > SELECT _FUNC_(month, day) FROM src;")

public class UDFZodiacSign extends UDF{

  private SimpleDateFormat df;

  public UDFZodiacSign(){
    df = new SimpleDateFormat("MM-dd-yyyy");
  }

  public String evaluate( Date bday ){
    return this.evaluate( bday.getMonth() + 1, bday.getDate() );
  }

  public String evaluate(String bday){
    Date date = null;
    try {
      date = df.parse(bday);
    } catch (Exception ex) {
      return null;
    }
    return this.evaluate( date.getMonth() + 1, date.getDate() );
  }

  public String evaluate( Integer month, Integer day ){
    if (month==1) {
      if (day < 20 ){
        return "Capricorn";
      } else {
        return "Aquarius";
      }
    }
    if (month==2){
      if (day < 19 ){
        return "Aquarius";
      } else {
        return "Pisces";
      }
    }
    /* ...other months here */
    return null;
  }
}

To write a UDF, start by extending the UDF class and implementing one or more evaluate() methods. During query processing, an instance of the class is instantiated for each usage of the function in a query. evaluate() is called for each input row, and its result is returned to Hive. It is legal to overload the evaluate() method; Hive picks the overload that matches the arguments, in a similar way to Java method overloading.

The @Description(...) is an optional Java annotation. This is how function documentation is defined and you should use these annotations to document your own UDFs. When a user invokes DESCRIBE FUNCTION ..., the _FUNC_ strings will be replaced with the function name the user picks when defining a “temporary” function, as discussed below.

Note

The arguments and return types of the UDF’s evaluate() function can only be types that Hive can serialize. For example, if you are working with whole numbers, a UDF can take as input a primitive int, an Integer wrapper object, or an IntWritable, which is the Hadoop wrapper for integers. You do not have to worry specifically about what the caller is sending because Hive will convert the types for you if they do not match. Remember that null is valid for any type in Hive, but in Java primitives are not objects and cannot be null.
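To illustrate, here is a minimal sketch, not part of the Zodiac example, of a UDF that overloads evaluate() for a Hadoop writable integer and for a string; Hive picks the overload that best matches the argument and converts the value if necessary (the package and class names are our own):

package com.example.udf; // hypothetical package for this sketch

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class UDFDoubleIt extends UDF {

  // Used when the argument is an integer type; null in, null out.
  public IntWritable evaluate(IntWritable i) {
    if (i == null) {
      return null;
    }
    return new IntWritable(i.get() * 2);
  }

  // Used when the argument is a string type.
  public Text evaluate(Text s) {
    if (s == null) {
      return null;
    }
    return new Text(s.toString() + s.toString());
  }
}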

To use the UDF inside Hive, compile the Java code and package the UDF bytecode class file into a JAR file. Then, in your Hive session, add the JAR to the classpath and use a CREATE FUNCTION statement to define a function that uses the Java class:

hive> ADD JAR /full/path/to/zodiac.jar;
hive> CREATE TEMPORARY FUNCTION zodiac
    > AS 'org.apache.hadoop.hive.contrib.udf.example.UDFZodiacSign';

Note that quotes are not required around the JAR file path and currently it needs to be a full path to the file on a local filesystem. Hive not only adds this JAR to the classpath, it puts the JAR file in the distributed cache so it’s available around the cluster.

Now the Zodiac UDF can be used like any other function. Notice the keyword TEMPORARY in the CREATE FUNCTION statement: functions declared this way are available only in the current session, so you have to add the JAR and create the function in each session. However, if you use the same JAR files and functions frequently, you can add these statements to your $HOME/.hiverc file, as sketched below.
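For example, a .hiverc file that registers the Zodiac function at the start of every session might contain these two statements (adjust the JAR path to wherever you keep zodiac.jar):

ADD JAR /full/path/to/zodiac.jar;
CREATE TEMPORARY FUNCTION zodiac
  AS 'org.apache.hadoop.hive.contrib.udf.example.UDFZodiacSign';

Once the function is registered, its documentation is available through DESCRIBE FUNCTION and it can be called like any built-in function: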

hive> DESCRIBE FUNCTION zodiac;
zodiac(date) - from the input date string or separate month and day
arguments, returns the sign of the Zodiac.

hive> DESCRIBE FUNCTION EXTENDED zodiac;
zodiac(date) - from the input date string or separate month and day
arguments, returns the sign of the Zodiac.
Example:
  > SELECT zodiac(date_string) FROM src;
  > SELECT zodiac(month, day) FROM src;

hive> SELECT name, bday, zodiac(bday) FROM littlebigdata;
edward capriolo   2-12-1981  Aquarius
bob               10-10-2004 Libra
sara connor       4-5-1974   Aries

To recap, our UDF allows us to do custom transformations inside the Hive language. Hive can now convert the user’s birthday to the corresponding Zodiac sign while it is doing any other aggregations and transformations.

If we’re finished with the function, we can drop it:

hive> DROP TEMPORARY FUNCTION IF EXISTS zodiac;

As usual, the IF EXISTS is optional. It suppresses errors if the function doesn’t exist.

UDF Versus GenericUDF

In our Zodiac example we extended the UDF class. Hive offers a counterpart called GenericUDF. GenericUDF is a more complex abstraction, but it offers support for better null handling and makes it possible to handle some types of operations programmatically that a standard UDF cannot support. An example of a generic UDF is the Hive CASE ... WHEN statement, which has complex logic depending on the arguments to the statement. We will demonstrate how to use the GenericUDF class to write a user-defined function, called nvl(), which returns a default value if null is passed in.

The nvl() function takes two arguments. If the first argument is non-null, it is returned. If the first argument is null, the second argument is returned. The GenericUDF framework is a good fit for this problem. A standard UDF could be used as a solution but it would be cumbersome because it requires overloading the evaluate method to handle many different input types. GenericUDF will detect the type of input to the function programmatically and provide an appropriate response.

We begin with the usual laundry list of import statements:

package org.apache.hadoop.hive.ql.udf.generic;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

Next, we use the @Description annotation to document the UDF:

@Description(name = "nvl",
  value = "_FUNC_(value,default_value) - Returns default value if value"
    + " is null else returns value",
  extended = "Example:\n"
    + "  > SELECT _FUNC_(null,'bla') FROM src LIMIT 1;\n")

Now the class extends GenericUDF, a requirement to exploit the generic handling we want.

The initialize() method is called and passed an ObjectInspector for each argument. The goal of this method is to determine the return type from the arguments. The user can also throw an Exception to signal that bad types are being sent to the method. The returnOIResolver is a built-in class that determines the return type by finding the type of non-null variables and using that type:

public class GenericUDFNvl extends GenericUDF {
  private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
  private ObjectInspector[] argumentOIs;

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments)
      throws UDFArgumentException {
    argumentOIs = arguments;
    if (arguments.length != 2) {
      throw new UDFArgumentLengthException(
          "The operator 'NVL' accepts 2 arguments.");
    }
    returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);
    if (!(returnOIResolver.update(arguments[0]) && returnOIResolver
        .update(arguments[1]))) {
      throw new UDFArgumentTypeException(2,
          "The 1st and 2nd args of function NVL should have the same type, "
          + "but they are different: \"" + arguments[0].getTypeName()
          + "\" and \"" + arguments[1].getTypeName() + "\"");
    }
    return returnOIResolver.get();
  }
  ...

The evaluate method has access to the values passed to the method stored in an array of DeferredObject values. The returnOIResolver created in the initialize method is used to get values from the DeferredObjects. In this case, the function returns the first non-null value:

  ...
  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    Object retVal = returnOIResolver.convertIfNecessary(arguments[0].get(),
        argumentOIs[0]);
    if (retVal == null ){
      retVal = returnOIResolver.convertIfNecessary(arguments[1].get(),
          argumentOIs[1]);
    }
    return retVal;
  }
  ...

The final method to override is getDisplayString(), which is used inside the Hadoop tasks to display debugging information when the function is being used:

  ...
  @Override
  public String getDisplayString(String[] children) {
    StringBuilder sb = new StringBuilder();
    sb.append("if ");
    sb.append(children[0]);
    sb.append(" is null ");
    sb.append("returns ");
    sb.append(children[1]);
    return sb.toString();
  }
}

To test the generic nature of the UDF, it is called several times, each time passing values of different types, as shown in the following example:

hive> ADD JAR /path/to/jar.jar;

hive> CREATE TEMPORARY FUNCTION nvl
    > AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFNvl';

hive> SELECT nvl( 1 , 2 ) AS COL1,
    >        nvl( NULL, 5 ) AS COL2,
    >        nvl( NULL, "STUFF" ) AS COL3
    > FROM src LIMIT 1;
1       5       STUFF

Permanent Functions

Until this point we have bundled our code into JAR files, then used ADD JAR and CREATE TEMPORARY FUNCTION to make use of them.

Your function may also be added permanently to Hive; however, this requires a small modification to a Hive Java file and then rebuilding Hive.

Inside the Hive source code, a one-line change is required to the FunctionRegistry class found at ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java. Then you rebuild Hive following the instructions that come with the source distribution.

While it is recommended that you redeploy the entire new build, only the hive-exec-*.jar, where * is the version number, needs to be replaced.

Here is an example change to FunctionRegistry where the new nvl() function is added to Hive’s list of built-in functions:

...
registerUDF("parse_url", UDFParseUrl.class, false);
registerGenericUDF("nvl", GenericUDFNvl.class);
registerGenericUDF("split", GenericUDFSplit.class);
...

User-Defined Aggregate Functions

Users are able to define aggregate functions, too. However, the interface is more complex to implement. Aggregate functions are processed in several phases. Depending on the transformation the UDAF performs, the types returned by each phase could be different. For example, a sum() UDAF could accept primitive integer input, create integer PARTIAL data, and produce a final integer result. However, an aggregate like median() could take primitive integer input, have an intermediate list of integers as PARTIAL data, and then produce a final integer as the result.

For an example of a generic user-defined aggregate function, see the source code for GenericUDAFAverage available at http://svn.apache.org/repos/asf/hive/branches/branch-0.8/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java.

Note

Aggregations execute inside the context of a map or reduce task, which is a Java process with memory limitations. Therefore, storing large structures inside an aggregate may exceed available heap space. The min() UDAF only requires that a single element be stored in memory for comparison. The collect_set() UDAF uses a set internally to de-duplicate data, which limits memory usage. percentile_approx() uses approximation to achieve a nearly correct result while limiting memory usage. It is important to keep memory usage in mind when writing a UDAF. You can increase your available memory to some extent by adjusting mapred.child.java.opts, but that solution does not scale:

<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx200m</value>
</property>
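If a particular session needs more task memory, the same property can usually also be set from the Hive CLI, assuming your cluster does not mark it final (the 512 MB value below is only illustrative):

hive> SET mapred.child.java.opts=-Xmx512M;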

Creating a COLLECT UDAF to Emulate GROUP_CONCAT

MySQL has a useful function known as GROUP_CONCAT, which combines all the elements of a group into a single string using a user-specified delimiter. Below is an example MySQL query that shows how to use its version of this function:

mysql > CREATE TABLE people (
  name VARCHAR(64),
  friendname VARCHAR(64) );

mysql > SELECT * FROM people;
bob     sara
bob     john
bob     ted
john    sara
ted     bob
ted     sara

mysql > SELECT name, GROUP_CONCAT(friendname SEPARATOR ',')
  FROM people
  GROUP BY name;
bob     sara,john,ted
john    sara
ted     bob,sara

We can do the same transformation in Hive without the need for additional grammar in the language. First, we need an aggregate function that builds a list of all input to the aggregate. Hive already has a UDAF called collect_set that adds all input into a java.util.Set collection. Sets automatically de-duplicate entries on insertion, which is undesirable for emulating GROUP_CONCAT. To build collect, we will take the code in collect_set and replace instances of Set with instances of ArrayList, which stops the de-duplication. The result of the aggregate will be a single array of all values.

It is important to remember that the computation of your aggregation must be arbitrarily divisible over the data. Think of it as writing a divide-and-conquer algorithm where the partitioning of the data is completely out of your control and handled by Hive. More formally, given any subset of the input rows, you should be able to compute a partial result, and also be able to merge any pair of partial results into another partial result. For example, sum() has this property: summing the rows {1, 2} gives the partial result 3, summing {3, 4} gives 7, and merging the two partials (3 + 7) yields 10, the same answer as summing all four rows at once.

The following code is available on GitHub. All input to the aggregation must be of primitive types. Rather than returning an ObjectInspector, as GenericUDFs do, the aggregate's resolver returns a subclass of GenericUDAFEvaluator:

@Description(name = "collect", value = "_FUNC_(x) - Returns a list of objects. "+
"CAUTION will easily OOM on large data sets" )
public class GenericUDAFCollect extends AbstractGenericUDAFResolver {
  static final Log LOG = LogFactory.getLog(GenericUDAFCollect.class.getName());

  public GenericUDAFCollect() {
  }

  @Override
  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
      throws SemanticException {
    if (parameters.length != 1) {
      throw new UDFArgumentTypeException(parameters.length - 1,
          "Exactly one argument is expected.");
    }
    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(0,
          "Only primitive type arguments are accepted but "
          + parameters[0].getTypeName() + " was passed as parameter 1.");
    }
    return new GenericUDAFMkListEvaluator();
  }
}

Table 13-1 describes the methods of the evaluator base class, GenericUDAFEvaluator, that we implement next.

Table 13-1. Methods in GenericUDAFEvaluator

init

Called by Hive to initialize an instance of the UDAF evaluator class.

getNewAggregationBuffer

Return an object that will be used to store temporary aggregation results.

iterate

Process a new row of data into the aggregation buffer.

terminatePartial

Return the contents of the current aggregation in a persistable way. Here, persistable means the return value can only be built up in terms of Java primitives, arrays, primitive wrappers (e.g., Double), Hadoop Writables, Lists, and Maps. Do NOT use your own classes (even if they implement java.io.Serializable).

merge

Merge a partial aggregation returned by terminatePartial into the current aggregation.

terminate

Return the final result of the aggregation to Hive.

In the init method, the object inspectors for the result type are set, after determining what mode the evaluator is in.

The iterate() and terminatePartial() methods are used on the map side, while terminate() and merge() are used on the reduce side to produce the final result. In all cases the merges are building larger lists:

public static class GenericUDAFMkListEvaluator extends GenericUDAFEvaluator {
  private PrimitiveObjectInspector inputOI;
  private StandardListObjectInspector loi;
  private StandardListObjectInspector internalMergeOI;

  @Override
  public ObjectInspector init(Mode m, ObjectInspector[] parameters)
        throws HiveException {
    super.init(m, parameters);
    if (m == Mode.PARTIAL1) {
      inputOI = (PrimitiveObjectInspector) parameters[0];
      return ObjectInspectorFactory
        .getStandardListObjectInspector(
           (PrimitiveObjectInspector) ObjectInspectorUtils
        .getStandardObjectInspector(inputOI));
    } else {
      if (!(parameters[0] instanceof StandardListObjectInspector)) {
        inputOI = (PrimitiveObjectInspector)  ObjectInspectorUtils
          .getStandardObjectInspector(parameters[0]);
        return (StandardListObjectInspector) ObjectInspectorFactory
          .getStandardListObjectInspector(inputOI);
      } else {
        internalMergeOI = (StandardListObjectInspector) parameters[0];
        inputOI = (PrimitiveObjectInspector)
          internalMergeOI.getListElementObjectInspector();
        loi = (StandardListObjectInspector) ObjectInspectorUtils
                .getStandardObjectInspector(internalMergeOI);
        return loi;
      }
    }
  }
  ...

The remaining methods and class definition define MkArrayAggregationBuffer as well as top-level methods that modify the contents of the buffer:

Note

You may have noticed that Hive tends to avoid allocating objects with new whenever possible. Hadoop and Hive use this pattern to create fewer temporary objects and thus less work for the JVM’s Garbage Collection algorithms. Keep this in mind when writing UDFs, because references are typically reused. Assuming immutable objects will lead to bugs!

  ...
  static class MkArrayAggregationBuffer implements AggregationBuffer {
    List<Object> container;
  }

  @Override
  public void reset(AggregationBuffer agg) throws HiveException {
    ((MkArrayAggregationBuffer) agg).container =
       new ArrayList<Object>();
  }

  @Override
  public AggregationBuffer getNewAggregationBuffer()
      throws HiveException {
    MkArrayAggregationBuffer ret = new MkArrayAggregationBuffer();
    reset(ret);
    return ret;
  }

  // Mapside
  @Override
  public void iterate(AggregationBuffer agg, Object[] parameters)
      throws HiveException {
    assert (parameters.length == 1);
    Object p = parameters[0];

    if (p != null) {
      MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
      putIntoList(p, myagg);
    }
  }

  // Mapside
  @Override
  public Object terminatePartial(AggregationBuffer agg)
      throws HiveException {
    MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
    ArrayList<Object> ret = new ArrayList<Object>(myagg.container.size());
    ret.addAll(myagg.container);
    return ret;
  }

  @Override
  public void merge(AggregationBuffer agg, Object partial)
      throws HiveException {
    MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
    ArrayList<Object> partialResult =
      (ArrayList<Object>) internalMergeOI.getList(partial);
    for(Object i : partialResult) {
      putIntoList(i, myagg);
    }
  }

  @Override
  public Object terminate(AggregationBuffer agg) throws HiveException {
    MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
    ArrayList<Object> ret = new ArrayList<Object>(myagg.container.size());
    ret.addAll(myagg.container);
    return ret;
  }

  private void putIntoList(Object p, MkArrayAggregationBuffer myagg) {
    Object pCopy =
      ObjectInspectorUtils.copyToStandardObject(p,this.inputOI);
    myagg.container.add(pCopy);
  }
}

Using collect will return a single row with a single array of all of the aggregated values:

hive> dfs -cat $HOME/afile.txt;
twelve  12
twelve  1
eleven  11
eleven  10

hive> CREATE TABLE collecttest (str STRING, countVal INT)
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '09' LINES TERMINATED BY '10';

hive> LOAD DATA LOCAL INPATH '${env:HOME}/afile.txt' INTO TABLE collecttest;

hive> SELECT collect(str) FROM collecttest;
[twelve,twelve,eleven,eleven]

The concat_ws() function takes a delimiter as its first argument. The remaining arguments can be string types or arrays of strings. The returned result contains the arguments joined together by the delimiter. Hence, we have converted the array into a single comma-separated string:

hive> SELECT concat_ws( ',' , collect(str)) FROM collecttest;
twelve,twelve,eleven,eleven

GROUP_CONCAT can be emulated by combining GROUP BY, collect(), and concat_ws(), as shown here:

hive> SELECT str, concat_ws( ',' , collect(cast(countVal AS STRING)))
    > FROM collecttest GROUP BY str;
eleven  11,10
twelve  12,1

User-Defined Table Generating Functions

While UDFs can be used to return arrays or structures, they cannot return multiple columns or multiple rows. User-Defined Table Generating Functions, or UDTFs, address this need by providing a programmatic interface to return multiple columns and even multiple rows.

UDTFs that Produce Multiple Rows

We have already used the explode() function in several examples. explode() takes an array as input and outputs one row for each element in the array. An alternative approach is to have the UDTF generate rows based on some input. We will demonstrate this with a UDTF that works like a for loop: it receives the start and stop values as input and then outputs N rows:

hive> SELECT forx(1,5) AS i FROM collecttest;
1
2
3
4
5

Our class extends the GenericUDTF base class. We declare three IntWritable variables for the start, end, and increment. The forwardObj array will be used to return result rows:

package com.jointhegrid.udf.collect;

import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.*;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
import org.apache.hadoop.io.IntWritable;

public class GenericUDTFFor extends GenericUDTF {

  IntWritable start;
  IntWritable end;
  IntWritable inc;

  Object[] forwardObj = null;
  ...

Because the arguments to this function are constants, their values can be determined in the initialize method. Nonconstant values are typically not available until the rows are processed (in a UDTF, in the process method). The third argument, the increment, is optional and defaults to 1:

  ...
  @Override
  public StructObjectInspector initialize(ObjectInspector[] args)
          throws UDFArgumentException {
    start=((WritableConstantIntObjectInspector) args[0])
        .getWritableConstantValue();
    end=((WritableConstantIntObjectInspector) args[1])
        .getWritableConstantValue();
    if (args.length == 3) {
      inc =((WritableConstantIntObjectInspector) args[2])
        .getWritableConstantValue();
    } else {
      inc = new IntWritable(1);
    }
        ...

This function returns only a single column and its type is always an integer. We need to give it a name, but the user can always override this later:

    ...
    this.forwardObj = new Object[1];
    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

    fieldNames.add("col0");
    fieldOIs.add(
      PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
        PrimitiveCategory.INT));

    return ObjectInspectorFactory.getStandardStructObjectInspector(
        fieldNames, fieldOIs);

  }
  ...

The process method is where the interesting work happens. Notice that the return type is void. This is because a UDTF can forward zero or more rows, unlike a UDF, which has a single return value. In this case the call to the forward method is nested inside a for loop, so a row is forwarded for each iteration:

  ...
  @Override
  public void process(Object[] args)
          throws HiveException, UDFArgumentException {
    // Use <= so the stop value itself is included, matching the
    // forx(1,5) example above, which returns rows 1 through 5.
    for (int i = start.get(); i <= end.get(); i = i + inc.get()) {
      this.forwardObj[0] = new Integer(i);
      forward(forwardObj);
    }
  }

  @Override
  public void close() throws HiveException {
  }
}

UDTFs that Produce a Single Row with Multiple Columns

An example of a UDTF that returns multiple columns but only one row is the parse_url_tuple function, which is a built-in Hive function. It takes as input a parameter that is a URL and one or more constants that specify the parts of the URL the user wants returned:

hive> SELECT parse_url_tuple(weblogs.url, 'HOST', 'PATH')
    > AS (host, path) FROM weblogs;
google.com      /index.html
hotmail.com     /a/links.html

The benefit of this type of UDTF is that the URL only needs to be parsed once, yet multiple columns are returned: a clear performance win. The alternative, using standard UDFs, involves writing several UDFs to extract specific parts of the URL. That requires writing more code and costs more processing time because the URL is parsed multiple times. For example, something like the following, where PARSE_HOST and PARSE_PORT would be separate UDFs you have to write:

SELECT PARSE_HOST(url) AS host, PARSE_PORT(url) FROM weblogs;
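As with explode() earlier, a UDTF like parse_url_tuple() can also be combined with LATERAL VIEW when other columns need to be projected alongside its output; a sketch using the same weblogs table:

hive> SELECT url, b.host, b.path
    > FROM weblogs
    > LATERAL VIEW parse_url_tuple(url, 'HOST', 'PATH') b AS host, path;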

UDTFs that Simulate Complex Types

A UDTF can be used as a technique for adding more complex types to Hive. For example, a complex type can be serialized as an encoded string, and a UDTF can deserialize the complex type when needed. Suppose we have a Java class named Book. Hive cannot work with this datatype directly; however, a Book could be encoded to and decoded from a string format:

public class Book {
  public Book () { }
  public String isbn;
  public String title;
  public String [] authors;

  /* note: this system will not work if your table is
     using '|' or ',' as the field delimiter! */
  public void fromString(String parts){
    String [] part = parts.split("\\|");
    isbn = part[0];
    title = part[1];
    authors = part[2].split(",");
  }

  public String toString(){
    return isbn + "\t" + title + "\t" + StringUtils.join(authors, ",");
  }
}

Imagine we have a flat text file with books in this format. For now, let’s assume we could not use a delimited SerDe to split on | and ,:

hive> SELECT * FROM books;
5555555|Programming Hive|Edward,Dean,Jason

In the pipe-delimited raw form it is possible to do some parsing of the data:

hive> SELECT cast(split(book_info, "\\|")[0] AS INT) AS isbn FROM books
    > WHERE split(book_info, "\\|")[1] = "Programming Hive";
5555555

This HiveQL works correctly, however it could be made easier for the end user. For example, writing this type of query may require consulting documentation regarding which fields and types are used, remembering casting conversion rules, and so forth. By contrast, a UDTF makes this HiveQL simpler and more readable. In the following example, the parse_book() UDTF is introduced:

hive> FROM (
    >   SELECT parse_book(book_info) AS (isbn, title, authors) FROM books ) a
    > SELECT a.isbn
    > WHERE a.title = "Programming Hive"
    > AND array_contains(a.authors, 'Edward');
5555555

The function parse_book() allows Hive to return multiple columns of different types representing the fields of a book:

package com.jointhegrid.udf.collect;

import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector
  .PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive
  .PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive
  .WritableConstantStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive
  .StringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive
  .WritableStringObjectInspector;
import org.apache.hadoop.io.Text;


public class UDTFBook extends GenericUDTF{

  private StringObjectInspector stringOI;
  private Text sent;
  Object[] forwardObj = null;
  ...

The function will return three fields: an ISBN as an integer, a title as a string, and the authors as an array of strings. Notice that UDTFs can return nested types; for example, we could return an array of arrays of strings:

  ...
  @Override
  public StructObjectInspector initialize(ObjectInspector[] args)
       throws UDFArgumentException {

    // Save the inspector for the single string argument so process()
    // can extract the value from each input row.
    stringOI = (StringObjectInspector) args[0];

    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

    fieldNames.add("isbn");
    fieldOIs.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
        PrimitiveCategory.INT));

    fieldNames.add("title");
    fieldOIs.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
        PrimitiveCategory.STRING));

    fieldNames.add("authors");
    fieldOIs.add( ObjectInspectorFactory.getStandardListObjectInspector(
        PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
            PrimitiveCategory.STRING)
        )
    );

    forwardObj= new Object[3];
    return ObjectInspectorFactory.getStandardStructObjectInspector(
            fieldNames, fieldOIs);

  }
  ...

The process method forwards only a single row per input row. However, each element in the object array is bound to a specific output field:

  ...
  @Override
  public void process(Object[] os) throws HiveException {
    sent = new Text(stringOI.getPrimitiveJavaObject(os[0]));
    String parts = new String(this.sent.getBytes(), 0, this.sent.getLength());
    String [] part = parts.split("\\|");
    forwardObj[0] = Integer.parseInt( part[0] );
    forwardObj[1] = part[1];
    forwardObj[2] = part[2].split(",");
    this.forward(forwardObj);
  }

  @Override
  public void close() throws HiveException {
  }
}

We have followed the call to the book UDTF with AS, which allows the result columns to be named by the user. They can then be used in other parts of the query without having to parse information from the book again:

client.execute(
  "create temporary function book as 'com.jointhegrid.udf.collect.UDTFBook'");
client.execute("create table  booktest (str string) ");
client.execute(
  "load data local inpath '" + p.toString() + "' into table booktest");
client.execute("select book(str) AS (book, title, authors) from booktest");
[555 Programming Hive "Dean","Jason","Edward"]

Accessing the Distributed Cache from a UDF

UDFs may access files inside the distributed cache, the local filesystem, or even the distributed filesystem. This access should be used cautiously as the overhead is significant.

A common use of Hive is analyzing web logs. A popular operation is determining the geolocation of web traffic based on IP addresses. MaxMind makes a GeoIP database available, along with a Java API to search it. By wrapping a UDF around this API, location information for an IP address can be looked up from within a Hive query.

The GeoIP API uses a small data file. This is ideal for showing the functionality of accessing a distributed cache file from a UDF. The complete code for this example is found at https://github.com/edwardcapriolo/hive-geoip/.

ADD FILE is used to cache the necessary data files with Hive. ADD JAR is used to add the required Java JAR files to the cache and the classpath. Finally, the temporary function must be defined as the final step before performing queries:

hive> ADD FILE GeoIP.dat;
hive> ADD JAR geo-ip-java.jar;
hive> ADD JAR hive-udf-geo-ip-jtg.jar;
hive> CREATE TEMPORARY FUNCTION geoip
    > AS 'com.jointhegrid.hive.udf.GenericUDFGeoIP';

hive> SELECT source_ip, geoip(source_ip, 'COUNTRY_NAME', './GeoIP.dat')
    > FROM weblogs;
209.191.139.200         United States
10.10.0.1       Unknown

The two example rows show a public IP address located in the United States and a private IP address that has no geographic location.

The geoip() function takes three arguments: the IP address in either string or long format, a string that must match one of the constants COUNTRY_NAME or DMA_CODE, and a final argument that is the name of the data file that has already been placed in the distributed cache.

The first call to the UDF (which triggers the first call to the evaluate method in the implementation) instantiates a LookupService object that uses the file located in the distributed cache. The lookup service is saved in a reference so it only needs to be initialized once during the lifetime of the map or reduce task. Note that the LookupService has its own internal caching, enabled by LookupService.GEOIP_MEMORY_CACHE, so frequent lookups should avoid repeated disk access.

Here is the source code for evaluate():

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    if (argumentOIs[0] instanceof LongObjectInspector) {
      this.ipLong = ((LongObjectInspector)argumentOIs[0]).get(arguments[0].get());
    } else {
      this.ipString = ((StringObjectInspector)argumentOIs[0])
        .getPrimitiveJavaObject(arguments[0].get());
    }
    this.property = ((StringObjectInspector)argumentOIs[1])
        .getPrimitiveJavaObject(arguments[1].get());
    if (this.property != null) {
      this.property = this.property.toUpperCase();
    }
    if (ls ==null){
      if (argumentOIs.length == 3){
        this.database = ((StringObjectInspector)argumentOIs[2])
                .getPrimitiveJavaObject(arguments[2].get());
        File f = new File(database);
        if (!f.exists())
          throw new HiveException(database+" does not exist");
        try {
          ls = new LookupService ( f , LookupService.GEOIP_MEMORY_CACHE );
        } catch (IOException ex){
          throw new HiveException (ex);
        }
      }
    }
    ...

An if statement in evaluate determines which data the method should return. In our example, the country name is requested:

    ...
    if (COUNTRY_PROPERTIES.contains(this.property)) {
      Country country = ipString != null ?
          ls.getCountry(ipString) : ls.getCountry(ipLong);
      if (country == null) {
        return null;
      } else if (this.property.equals(COUNTRY_NAME)) {
        return country.getName();
      } else if (this.property.equals(COUNTRY_CODE)) {
        return country.getCode();
      }
      assert(false);
    } else if (LOCATION_PROPERTIES.contains(this.property)) {
      ...
    }
  }

Annotations for Use with Functions

In this chapter we mentioned the @Description annotation and how it is used to provide documentation for Hive functions at runtime. Other annotations exist for UDFs that can make functions easier to use and can even improve the performance of some Hive queries:

public @interface UDFType {
  boolean deterministic() default true;
  boolean stateful() default false;
  boolean distinctLike() default false;
}

Deterministic

By default, deterministic is true, because most functions are inherently deterministic by nature. An exception is the function rand().

If a UDF is not deterministic, it is not included in the partition pruner.

An example of a nondeterministic query using rand() is the following:

SELECT * FROM t WHERE rand() < 0.01;

If rand() were deterministic, its result would be calculated only once and the same value reused for every row. Because rand() is nondeterministic, its result is recomputed for each row.
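A minimal sketch of a nondeterministic UDF (the class below is our own illustration, not a Hive built-in); marking it with @UDFType(deterministic = false) tells Hive not to reuse a single computed value or to prune partitions based on it:

package com.example.udf; // hypothetical package for this sketch

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;

@UDFType(deterministic = false)
public class UDFTaskHost extends UDF {

  // Returns the hostname of the machine executing the task, which can
  // differ from task to task, so the result must not be treated as
  // deterministic.
  public String evaluate() {
    try {
      return java.net.InetAddress.getLocalHost().getHostName();
    } catch (java.net.UnknownHostException ex) {
      return null;
    }
  }
}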

Stateful

Most UDFs are not stateful; stateful defaults to false. An example of a function that maintains state across invocations is rand(), which returns a different value for each call. The Stateful annotation may be used under the following conditions:

  • A stateful UDF can only be used in the SELECT list, not in other clauses such as WHERE/ON/ORDER/GROUP.

  • When a stateful UDF is present in a query, the implication is that the SELECT will be treated similarly to a TRANSFORM (i.e., a DISTRIBUTE/CLUSTER/SORT clause is implied) and run inside the corresponding reducer to ensure the results are as expected.

  • If stateful is set to true, the UDF should also be treated as nondeterministic (even if the deterministic annotation explicitly returns true).

See https://issues.apache.org/jira/browse/HIVE-1994 for more details.

DistinctLike

Used for cases where the function behaves like DISTINCT even when applied to a nondistinct column of values. Examples include min and max functions that return a distinct value even though the underlying numeric data can have repeating values.

Macros

Macros provide the ability to define functions in HiveQL that call other functions and operators. When appropriate for the particular situation, macros are a convenient alternative to writing UDFs in Java or using Hive streaming, because they require no external code or scripts.

To define a macro, use the CREATE TEMPORARY MACRO syntax. Here is an example that creates a SIGMOID function calculator:

hive> CREATE TEMPORARY MACRO SIGMOID (x DOUBLE) 1.0 / (1.0 + EXP(-x));
hive> SELECT SIGMOID(2) FROM src LIMIT 1;
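Since SIGMOID(2) expands to 1.0 / (1.0 + EXP(-2)), the query should return approximately 0.8807970779778823.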