User-Defined Functions (UDFs) are a powerful feature that allows users to extend HiveQL. As we’ll see, you implement them in Java, and once you add them to your session (interactive or driven by a script), they work just like built-in functions, even appearing in the online help. Hive has several types of user-defined functions, each of which performs a particular “class” of transformations on input data.
In an ETL workload, a process might have several processing steps. The Hive language has multiple ways to pipeline the output from one step to the next and produce multiple outputs during a single query. Users also have the ability to create their own functions for custom processing. Without this feature, a process might have to include a custom MapReduce step or move the data into another system to apply the changes. Interconnecting systems adds complexity and increases the chance of misconfigurations or other errors. Moving data between systems is time consuming when dealing with gigabyte- or terabyte-sized data sets. In contrast, UDFs run in the same processes as the tasks for your Hive queries, so they work efficiently and eliminate the complexity of integration with other systems. This chapter covers best practices associated with creating and using UDFs.
Before writing custom UDFs, let’s familiarize ourselves with the ones that are already part of Hive. Note that it’s common in the Hive community to use “UDF” to refer to any function, user-defined or built-in.
The SHOW FUNCTIONS command lists the functions currently loaded in the Hive session, both built-in and any user-defined functions that have been loaded using the techniques we will discuss shortly:

hive> SHOW FUNCTIONS;
abs
acos
and
array
array_contains
...
Functions usually have their own documentation. Use DESCRIBE FUNCTION to display a short description:

hive> DESCRIBE FUNCTION concat;
concat(str1, str2, ... strN) - returns the concatenation of str1, str2, ... strN
Functions may also contain extended documentation that can be accessed by adding the EXTENDED keyword:

hive> DESCRIBE FUNCTION EXTENDED concat;
concat(str1, str2, ... strN) - returns the concatenation of str1, str2, ... strN
Returns NULL if any argument is NULL.
Example:
  > SELECT concat('abc', 'def') FROM src LIMIT 1;
  'abcdef'
To use a function, simply call it by name in a query, passing in any required arguments. Some functions take a specific number of arguments and argument types, while other functions accept a variable number of arguments with variable types. Just like keywords, the case of function names is ignored:
SELECT concat(column1, column2) AS x FROM table;
The term user-defined function (UDF) is also used in a narrower sense to refer to any function that takes a row argument or one or more columns from a row and returns a single value. Most functions fall into this category.
Examples include many of the mathematical functions, like round() and floor(), for converting DOUBLES to BIGINTS, and abs(), for taking the absolute value of a number.
Other examples include string manipulation functions, like ucase(), which converts the string to uppercase; reverse(), which reverses a string; and concat(), which joins multiple input strings into one output string.
Note that these UDFs can return a complex object, such as an array, map, or struct.
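Returning a complex object from a standard UDF is just a matter of the evaluate() return type. Here is a minimal sketch (the class name is hypothetical, not part of Hive):

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDF;

// Sketch: a UDF whose evaluate() returns a Java List, which Hive
// exposes to queries as an array<string> value.
public class UDFPairExample extends UDF {
  public List<String> evaluate(String a, String b) {
    return Arrays.asList(a, b);
  }
}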
Another type of function is an aggregate function. All aggregate functions, user-defined and built-in, are referred to generically as user-defined aggregate functions (UDAFs).
An aggregate function takes one or more columns from zero to many rows and returns a single result. Examples include the math functions: sum(), which returns a sum of all inputs; avg(), which computes the average of the values; and min() and max(), which return the lowest and highest values, respectively:
hive> SELECT avg(price_close)
    > FROM stocks
    > WHERE exchange = 'NASDAQ' AND symbol = 'AAPL';
Aggregate methods are often combined with GROUP BY clauses. We saw this example in GROUP BY Clauses:

hive> SELECT year(ymd), avg(price_close) FROM stocks
    > WHERE exchange = 'NASDAQ' AND symbol = 'AAPL'
    > GROUP BY year(ymd);
1984    25.578625440597534
1985    20.193676221040867
1986    32.46102808021274
...
Table 6-3 in Chapter 6 lists the built-in aggregate functions in HiveQL.
A third type of function supported by Hive is a table generating function. As with the other kinds of functions, all table generating functions, user-defined and built-in, are often referred to generically as user-defined table generating functions (UDTFs).
Table generating functions take zero or more inputs and produce multiple columns or rows of output. The array function takes a list of arguments and returns the list as a single array type. Suppose we start with this query using an array:
hive> SELECT array(1, 2, 3) FROM dual;
[1,2,3]
The explode() function is a UDTF that takes an array as input and iterates through the list, returning each element from the list in a separate row:

hive> SELECT explode(array(1, 2, 3)) AS element FROM src;
1
2
3
However, Hive only allows table generating functions to be used in limited ways. For example, we can’t project out any other columns from the table, a significant limitation. Here is a query we would like to write with the employees table we have used before. We want to list each manager-subordinate pair.
Example 13-1. Invalid use of explode
hive> SELECT name, explode(subordinates) FROM employees;
FAILED: Error in semantic analysis: UDTF's are not supported outside
the SELECT clause, nor nested in expressions
However, Hive offers a LATERAL VIEW feature to allow this kind of query:

hive> SELECT name, sub
    > FROM employees
    > LATERAL VIEW explode(subordinates) subView AS sub;
John Doe     Mary Smith
John Doe     Todd Jones
Mary Smith   Bill King
Note that there are no output rows for employees who aren’t managers (i.e., who have no subordinates), namely Bill King and Todd Jones. Hence, explode outputs zero to many new records.
The LATERAL VIEW wraps the output of the explode call. A view alias and column alias are required, subView and sub, respectively, in this case.
The list of built-in, table generating functions can be found in Table 6-4 in Chapter 6.
Let’s tackle writing our own UDF. Imagine we have a table with each user’s birth date stored as a column. With that information, we would like to determine the user’s Zodiac sign. This process can be implemented with a standard function (a UDF in the most restrictive sense). Specifically, we assume we have a discrete input, either as a date formatted as a string or as a month and a day. The function must return a discrete single column of output.
Here is a sample data set, which we’ll put in a file called littlebigdata.txt in our home directory:
edward capriolo,[email protected],2-12-1981,209.191.139.200,M,10
bob,[email protected],10-10-2004,10.10.10.1,M,50
sara connor,[email protected],4-5-1974,64.64.5.1,F,2
Load this data set into a table called littlebigdata:

hive> CREATE TABLE IF NOT EXISTS littlebigdata(
    >   name      STRING,
    >   email     STRING,
    >   bday      STRING,
    >   ip        STRING,
    >   gender    STRING,
    >   anum      INT)
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

hive> LOAD DATA LOCAL INPATH '${env:HOME}/littlebigdata.txt'
    > INTO TABLE littlebigdata;
The input for the function will be a date and the output will be a string representing the user’s Zodiac sign.
Here is a Java implementation of the UDF we need:
package org.apache.hadoop.hive.contrib.udf.example;

import java.util.Date;
import java.text.SimpleDateFormat;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;

@Description(name = "zodiac",
    value = "_FUNC_(date) - from the input date string "
        + "or separate month and day arguments, returns the sign of the Zodiac.",
    extended = "Example:\n"
        + " > SELECT _FUNC_(date_string) FROM src;\n"
        + " > SELECT _FUNC_(month, day) FROM src;")
public class UDFZodiacSign extends UDF {

  private SimpleDateFormat df;

  public UDFZodiacSign() {
    df = new SimpleDateFormat("MM-dd-yyyy");
  }

  public String evaluate(Date bday) {
    // Date.getMonth() is zero-based, and getDate() returns the day of
    // the month (getDay() would return the day of the week).
    return this.evaluate(bday.getMonth() + 1, bday.getDate());
  }

  public String evaluate(String bday) {
    Date date = null;
    try {
      date = df.parse(bday);
    } catch (Exception ex) {
      return null;
    }
    return this.evaluate(date.getMonth() + 1, date.getDate());
  }

  public String evaluate(Integer month, Integer day) {
    if (month == 1) {
      if (day < 20) {
        return "Capricorn";
      } else {
        return "Aquarius";
      }
    }
    if (month == 2) {
      if (day < 19) {
        return "Aquarius";
      } else {
        return "Pisces";
      }
    }
    /* ...other months here */
    return null;
  }
}
To write a UDF, start by extending the UDF class and implementing an evaluate() method. During query processing, an instance of the class is instantiated for each usage of the function in a query. The evaluate() method is called for each input row, and its result is returned to Hive. It is legal to overload the evaluate method; Hive will pick the overload that matches, in a similar way to Java method overloading.
The @Description(...) is an optional Java annotation. This is how function documentation is defined, and you should use these annotations to document your own UDFs. When a user invokes DESCRIBE FUNCTION ..., the _FUNC_ strings will be replaced with the function name the user picks when defining a “temporary” function, as discussed below.
The arguments and return types of the UDF’s evaluate() function can only be types that Hive can serialize. For example, if you are working with whole numbers, a UDF can take as input a primitive int, an Integer wrapper object, or an IntWritable, which is the Hadoop wrapper for integers. You do not have to worry specifically about what the caller is sending, because Hive will convert the types for you if they do not match. Remember that null is valid for any type in Hive, but in Java primitives are not objects and cannot be null.
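For instance, a minimal sketch of a UDF built around the Hadoop wrapper type might look like the following (the class name is illustrative):

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;

// Sketch: accepting and returning IntWritable; the null check matters
// because a Hive NULL arrives as a Java null, which a primitive int
// parameter could not represent.
public class UDFAbsExample extends UDF {
  public IntWritable evaluate(IntWritable input) {
    if (input == null) {
      return null;
    }
    return new IntWritable(Math.abs(input.get()));
  }
}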
To use the UDF inside Hive, compile the Java code and package the UDF bytecode class file into a JAR file. Then, in your Hive session, add the JAR to the classpath and use a CREATE FUNCTION statement to define a function that uses the Java class:

hive> ADD JAR /full/path/to/zodiac.jar;
hive> CREATE TEMPORARY FUNCTION zodiac
    > AS 'org.apache.hadoop.hive.contrib.udf.example.UDFZodiacSign';
Note that quotes are not required around the JAR file path and currently it needs to be a full path to the file on a local filesystem. Hive not only adds this JAR to the classpath, it puts the JAR file in the distributed cache so it’s available around the cluster.
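You can confirm what has been added to the session with the LIST JARS command (the path shown is illustrative):

hive> LIST JARS;
/full/path/to/zodiac.jar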
Now the Zodiac UDF can be used like any other function. Notice the word TEMPORARY inside the CREATE FUNCTION statement. Functions declared this way are only available in the current session; you will have to add the JAR and create the function in each session. However, if you use the same JAR files and functions frequently, you can add these statements to your $HOME/.hiverc file:
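For example, the .hiverc entries for the Zodiac function would simply repeat the two statements (assuming the same illustrative path):

ADD JAR /full/path/to/zodiac.jar;
CREATE TEMPORARY FUNCTION zodiac
  AS 'org.apache.hadoop.hive.contrib.udf.example.UDFZodiacSign';

The DESCRIBE FUNCTION output now shows the text from our @Description annotation: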
hive> DESCRIBE FUNCTION zodiac;
zodiac(date) - from the input date string or separate month and day
arguments, returns the sign of the Zodiac.

hive> DESCRIBE FUNCTION EXTENDED zodiac;
zodiac(date) - from the input date string or separate month and day
arguments, returns the sign of the Zodiac.
Example:
 > SELECT zodiac(date_string) FROM src;
 > SELECT zodiac(month, day) FROM src;
hive> SELECT name, bday, zodiac(bday) FROM littlebigdata;
edward capriolo   2-12-1981    Aquarius
bob               10-10-2004   Libra
sara connor       4-5-1974     Aries
To recap, our UDF allows us to do custom transformations inside the Hive language. Hive can now convert the user’s birthday to the corresponding Zodiac sign while it is doing any other aggregations and transformations.
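For example, the function can feed a GROUP BY just like a built-in (a sketch against the littlebigdata table above; output order may vary):

hive> SELECT zodiac(bday), count(*) FROM littlebigdata GROUP BY zodiac(bday);
Aquarius   1
Aries      1
Libra      1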
If we’re finished with the function, we can drop it:
hive> DROP TEMPORARY FUNCTION IF EXISTS zodiac;
As usual, the IF EXISTS clause is optional. It suppresses errors if the function doesn’t exist.
In our Zodiac example we extended the UDF class. Hive offers a counterpart called GenericUDF. GenericUDF is a more complex abstraction, but it offers support for better null handling and makes it possible to handle some types of operations programmatically that a standard UDF cannot support. An example of a generic UDF is the Hive CASE ... WHEN statement, which has complex logic depending on the arguments to the statement. We will demonstrate how to use the GenericUDF class to write a user-defined function, called nvl(), which returns a default value if null is passed in.
The nvl() function takes two arguments. If the first argument is non-null, it is returned. If the first argument is null, the second argument is returned. The GenericUDF framework is a good fit for this problem. A standard UDF could be used as a solution, but it would be cumbersome because it requires overloading the evaluate method to handle many different input types. GenericUDF will detect the type of input to the function programmatically and provide an appropriate response.
We begin with the usual laundry list of import statements:

package org.apache.hadoop.hive.ql.udf.generic;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
Next, we use the @Description annotation to document the UDF:

@Description(name = "nvl",
    value = "_FUNC_(value,default_value) - Returns default value if value"
        + " is null else returns value",
    extended = "Example:\n"
        + " > SELECT _FUNC_(null,'bla') FROM src LIMIT 1;\n")
Now the class extends GenericUDF, a requirement to exploit the generic handling we want.

The initialize() method is called and passed an ObjectInspector for each argument. The goal of this method is to determine the return type from the arguments. The user can also throw an Exception to signal that bad types are being sent to the method. The returnOIResolver is a built-in class that determines the return type by finding the type of non-null variables and using that type:
public class GenericUDFNvl extends GenericUDF {

  private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
  private ObjectInspector[] argumentOIs;

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments)
      throws UDFArgumentException {
    argumentOIs = arguments;
    if (arguments.length != 2) {
      throw new UDFArgumentLengthException(
          "The operator 'NVL' accepts 2 arguments.");
    }
    returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);
    if (!(returnOIResolver.update(arguments[0])
        && returnOIResolver.update(arguments[1]))) {
      throw new UDFArgumentTypeException(2,
          "The 1st and 2nd args of function NVL should have the same type, "
          + "but they are different: \"" + arguments[0].getTypeName()
          + "\" and \"" + arguments[1].getTypeName() + "\"");
    }
    return returnOIResolver.get();
  }
  ...
The evaluate method has access to the values passed to the method stored in an array of DeferredObject values. The returnOIResolver created in the initialize method is used to get values from the DeferredObjects. In this case, the function returns the first non-null value:
  ...
  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    Object retVal = returnOIResolver.convertIfNecessary(
        arguments[0].get(), argumentOIs[0]);
    if (retVal == null) {
      retVal = returnOIResolver.convertIfNecessary(
          arguments[1].get(), argumentOIs[1]);
    }
    return retVal;
  }
  ...
The final method to override is getDisplayString(), which is used inside the Hadoop tasks to display debugging information when the function is being used:
  ...
  @Override
  public String getDisplayString(String[] children) {
    StringBuilder sb = new StringBuilder();
    sb.append("if ");
    sb.append(children[0]);
    sb.append(" is null ");
    sb.append("returns ");
    sb.append(children[1]);
    return sb.toString();
  }
}
To test the generic nature of the UDF, it is called several times, each time passing values of different types, as shown in the following example:

hive> ADD JAR /path/to/jar.jar;
hive> CREATE TEMPORARY FUNCTION nvl
    > AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFNvl';
hive> SELECT nvl(1, 2) AS COL1,
    >        nvl(NULL, 5) AS COL2,
    >        nvl(NULL, "STUFF") AS COL3
    > FROM src LIMIT 1;
1    5    STUFF
Until this point we have bundled our code into JAR files, then used ADD JAR and CREATE TEMPORARY FUNCTION to make use of them.

Your function may also be added permanently to Hive; however, this requires a small modification to a Hive Java file and then rebuilding Hive.
Inside the Hive source code, a one-line change is required to the FunctionRegistry class found at ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java. Then you rebuild Hive following the instructions that come with the source distribution.
While it is recommended that you redeploy the entire new build, only the hive-exec-*.jar, where * is the version number, needs to be replaced.
Here is an example change to FunctionRegistry where the new nvl() function is added to Hive’s list of built-in functions:

...
registerUDF("parse_url", UDFParseUrl.class, false);
registerGenericUDF("nvl", GenericUDFNvl.class);
registerGenericUDF("split", GenericUDFSplit.class);
...
Users are able to define aggregate functions, too. However, the interface is more complex to implement. Aggregate functions are processed in several phases. Depending on the transformation the UDAF performs, the types returned by each phase could be different. For example, a sum() UDAF could accept primitive integer input, create integer PARTIAL data, and produce a final integer result. However, an aggregate like median() could take primitive integer input, have an intermediate list of integers as PARTIAL data, and then produce a final integer as the result.
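In the GenericUDAFEvaluator API that implements these phases, the phases are represented by the Mode enum: PARTIAL1 (original data to partial aggregation, using iterate() and terminatePartial()), PARTIAL2 (partial to partial, using merge() and terminatePartial()), FINAL (partial aggregations to the full result, using merge() and terminate()), and COMPLETE (original data straight to the full result, using iterate() and terminate()).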
For an example of a generic user-defined aggregate function, see the source code for GenericUDAFAverage available at http://svn.apache.org/repos/asf/hive/branches/branch-0.8/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java.
Aggregations execute inside the context of a map or reduce task, which is a Java process with memory limitations. Therefore, storing large structures inside an aggregate may exceed available heap space. The min() UDAF only requires a single element be stored in memory for comparison. The collect_set() UDAF uses a set internally to de-duplicate data in order to limit memory usage. percentile_approx() uses approximations to achieve a near-correct result while limiting memory usage. It is important to keep memory usage in mind when writing a UDAF. You can increase your available memory to some extent by adjusting mapred.child.java.opts, but that solution does not scale:
<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx200m</value>
</property>
MySQL has a useful function known as GROUP_CONCAT, which combines all the elements of a group into a single string using a user-specified delimiter. Below is an example MySQL query that shows how to use its version of this function:
mysql> CREATE TABLE people (name VARCHAR(100), friendname VARCHAR(100));
mysql> SELECT * FROM people;
bob    sara
bob    john
bob    ted
john   sara
ted    bob
ted    sara
mysql> SELECT name, GROUP_CONCAT(friendname SEPARATOR ',')
    -> FROM people GROUP BY name;
bob    sara,john,ted
john   sara
ted    bob,sara
We can do the same transformation in Hive without the need for additional grammar in the language. First, we need an aggregate function that builds a list of all input to the aggregate. Hive already has a UDAF called collect_set that adds all input into a java.util.Set collection. Sets automatically de-duplicate entries on insertion, which is undesirable for GROUP_CONCAT. To build collect, we will take the code in collect_set and replace instances of Set with instances of ArrayList. This will stop the de-duplication. The result of the aggregate will be a single array of all values.
It is important to remember that the computation of your aggregation must be arbitrarily divisible over the data. Think of it as writing a divide-and-conquer algorithm where the partitioning of the data is completely out of your control and handled by Hive. More formally, given any subset of the input rows, you should be able to compute a partial result, and also be able to merge any pair of partial results into another partial result.
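Here is a minimal sketch of that contract in plain Java, outside the Hive framework, for a sum-like aggregation; the class and method names are illustrative:

import java.util.Arrays;
import java.util.List;

// Sketch: for any way Hive splits the rows, computing partial results
// and merging them must equal processing all rows at once.
public class MergeContract {
  // roughly iterate() + terminatePartial()
  static long partial(List<Long> rows) {
    long sum = 0;
    for (long r : rows) {
      sum += r;
    }
    return sum;
  }

  // roughly merge()
  static long merge(long a, long b) {
    return a + b;
  }

  public static void main(String[] args) {
    List<Long> all = Arrays.asList(1L, 2L, 3L, 4L);
    long whole = partial(all);
    long split = merge(partial(all.subList(0, 2)), partial(all.subList(2, 4)));
    System.out.println(whole + " == " + split); // 10 == 10, for any split
  }
}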
The following code is available on GitHub. All the input to the aggregation must be primitive types. Rather than returning an ObjectInspector, like GenericUDFs, aggregates return a subclass of GenericUDAFEvaluator:
@Description(name = "collect",
    value = "_FUNC_(x) - Returns a list of objects. "
        + "CAUTION will easily OOM on large data sets")
public class GenericUDAFCollect extends AbstractGenericUDAFResolver {

  static final Log LOG = LogFactory.getLog(GenericUDAFCollect.class.getName());

  public GenericUDAFCollect() {
  }

  @Override
  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
      throws SemanticException {
    if (parameters.length != 1) {
      throw new UDFArgumentTypeException(parameters.length - 1,
          "Exactly one argument is expected.");
    }
    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(0,
          "Only primitive type arguments are accepted but "
          + parameters[0].getTypeName() + " was passed as parameter 1.");
    }
    return new GenericUDAFMkListEvaluator();
  }
}
Table 13-1 describes the methods that are part of the base class.
Table 13-1. Methods in AbstractGenericUDAFResolver

Method | Description
---|---
init() | Called by Hive to initialize an instance of the UDAF evaluator class.
getNewAggregationBuffer() | Return an object that will be used to store temporary aggregation results.
iterate() | Process a new row of data into the aggregation buffer.
terminatePartial() | Return the contents of the current aggregation in a persistable way. Here, persistable means the return value can only be built up in terms of Java primitives, arrays, primitive wrappers (e.g., Double), Hadoop Writables, Lists, and Maps.
merge() | Merge a partial aggregation returned by terminatePartial() into the current aggregation.
terminate() | Return the final result of the aggregation to Hive.
In the init() method, the object inspectors for the result type are set, after determining what mode the evaluator is in. The iterate() and terminatePartial() methods are used on the map side, while terminate() and merge() are used on the reduce side to produce the final result. In all cases the merges are building larger lists:
public static class GenericUDAFMkListEvaluator extends GenericUDAFEvaluator {

  private PrimitiveObjectInspector inputOI;
  private StandardListObjectInspector loi;
  private StandardListObjectInspector internalMergeOI;

  @Override
  public ObjectInspector init(Mode m, ObjectInspector[] parameters)
      throws HiveException {
    super.init(m, parameters);
    if (m == Mode.PARTIAL1) {
      inputOI = (PrimitiveObjectInspector) parameters[0];
      return ObjectInspectorFactory.getStandardListObjectInspector(
          (PrimitiveObjectInspector) ObjectInspectorUtils
              .getStandardObjectInspector(inputOI));
    } else {
      if (!(parameters[0] instanceof StandardListObjectInspector)) {
        inputOI = (PrimitiveObjectInspector) ObjectInspectorUtils
            .getStandardObjectInspector(parameters[0]);
        return (StandardListObjectInspector) ObjectInspectorFactory
            .getStandardListObjectInspector(inputOI);
      } else {
        internalMergeOI = (StandardListObjectInspector) parameters[0];
        inputOI = (PrimitiveObjectInspector)
            internalMergeOI.getListElementObjectInspector();
        loi = (StandardListObjectInspector) ObjectInspectorUtils
            .getStandardObjectInspector(internalMergeOI);
        return loi;
      }
    }
  }
  ...
The remaining methods and class definition define MkArrayAggregationBuffer as well as top-level methods that modify the contents of the buffer:

You may have noticed that Hive tends to avoid allocating objects with new whenever possible. Hadoop and Hive use this pattern to create fewer temporary objects and thus less work for the JVM’s garbage collection algorithms. Keep this in mind when writing UDFs, because references are typically reused. Assuming immutable objects will lead to bugs!
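Here is a minimal sketch of the pitfall in plain Java (names illustrative): buffering the reused object itself instead of a copy, which is exactly why putIntoList() below copies values with copyToStandardObject():

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;

// Sketch: the framework reuses one Text instance across rows, so
// storing the reference keeps only the last value; copy it instead.
public class ReusePitfall {
  public static void main(String[] args) {
    Text reused = new Text();
    List<Text> wrong = new ArrayList<Text>();
    List<String> right = new ArrayList<String>();
    for (String row : new String[] { "a", "b" }) {
      reused.set(row);               // the "framework" overwrites in place
      wrong.add(reused);             // BUG: every entry is the same object
      right.add(reused.toString());  // OK: the value is copied out
    }
    System.out.println(wrong);       // [b, b]
    System.out.println(right);       // [a, b]
  }
}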
  ...
  static class MkArrayAggregationBuffer implements AggregationBuffer {
    List<Object> container;
  }

  @Override
  public void reset(AggregationBuffer agg) throws HiveException {
    ((MkArrayAggregationBuffer) agg).container = new ArrayList<Object>();
  }

  @Override
  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
    MkArrayAggregationBuffer ret = new MkArrayAggregationBuffer();
    reset(ret);
    return ret;
  }

  // Map side
  @Override
  public void iterate(AggregationBuffer agg, Object[] parameters)
      throws HiveException {
    assert (parameters.length == 1);
    Object p = parameters[0];
    if (p != null) {
      MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
      putIntoList(p, myagg);
    }
  }

  // Map side
  @Override
  public Object terminatePartial(AggregationBuffer agg) throws HiveException {
    MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
    ArrayList<Object> ret = new ArrayList<Object>(myagg.container.size());
    ret.addAll(myagg.container);
    return ret;
  }

  @Override
  public void merge(AggregationBuffer agg, Object partial)
      throws HiveException {
    MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
    ArrayList<Object> partialResult =
        (ArrayList<Object>) internalMergeOI.getList(partial);
    for (Object i : partialResult) {
      putIntoList(i, myagg);
    }
  }

  @Override
  public Object terminate(AggregationBuffer agg) throws HiveException {
    MkArrayAggregationBuffer myagg = (MkArrayAggregationBuffer) agg;
    ArrayList<Object> ret = new ArrayList<Object>(myagg.container.size());
    ret.addAll(myagg.container);
    return ret;
  }

  private void putIntoList(Object p, MkArrayAggregationBuffer myagg) {
    Object pCopy = ObjectInspectorUtils.copyToStandardObject(p, this.inputOI);
    myagg.container.add(pCopy);
  }
}
Using collect will return a single row with a single array of all of the aggregated values:

hive> dfs -cat $HOME/afile.txt;
twelve   12
twelve   1
eleven   11
eleven   10

hive> CREATE TABLE collecttest (str STRING, countVal INT)
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY '09' LINES TERMINATED BY '10';

hive> LOAD DATA LOCAL INPATH '${env:HOME}/afile.txt' INTO TABLE collecttest;

hive> SELECT collect(str) FROM collecttest;
[twelve,twelve,eleven,eleven]
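For contrast, the built-in set-based aggregate de-duplicates the same input (illustrative output):

hive> SELECT collect_set(str) FROM collecttest;
[twelve,eleven]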
The concat_ws() function takes a delimiter as its first argument. The remaining arguments can be string types or arrays of strings. The returned result contains the arguments joined together by the delimiter. Hence, we have converted the array into a single comma-separated string:
hive> SELECT concat_ws(',', collect(str)) FROM collecttest;
twelve,twelve,eleven,eleven
GROUP_CONCAT can be done by combining GROUP BY, collect, and concat_ws() as shown here:

hive> SELECT str, concat_ws(',', collect(cast(countVal AS STRING)))
    > FROM collecttest GROUP BY str;
eleven   11,10
twelve   12,1
While UDFs can be used to return arrays or structures, they cannot return multiple columns or multiple rows. User-defined table generating functions, or UDTFs, address this need by providing a programmatic interface to return multiple columns and even multiple rows.
We have already used the explode method in several examples. explode takes an array as input and outputs one row for each element in the array. An alternative way to do this would have the UDTF generate the rows based on some input. We will demonstrate this with a UDTF that works like a for loop. The function receives user inputs of the start and stop values and then outputs N rows:
hive> SELECT forx(1, 5) AS i FROM collecttest;
1
2
3
4
5
Our class extends the GenericUDTF abstract class. We declare three integer variables for the start, end, and increment. The forwardObj array will be used to return result rows:
package com.jointhegrid.udf.collect;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.*;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
import org.apache.hadoop.io.IntWritable;

public class GenericUDTFFor extends GenericUDTF {

  IntWritable start;
  IntWritable end;
  IntWritable inc;
  Object[] forwardObj = null;
  ...
Because the arguments to this function are constant, their values can be determined in the initialize method. Nonconstant values are typically not available until the function runs over rows (the process() method, for a UDTF). The third argument, the increment, is optional, as it defaults to 1:
  ...
  @Override
  public StructObjectInspector initialize(ObjectInspector[] args)
      throws UDFArgumentException {
    start = ((WritableConstantIntObjectInspector) args[0])
        .getWritableConstantValue();
    end = ((WritableConstantIntObjectInspector) args[1])
        .getWritableConstantValue();
    if (args.length == 3) {
      inc = ((WritableConstantIntObjectInspector) args[2])
          .getWritableConstantValue();
    } else {
      inc = new IntWritable(1);
    }
  ...
This function returns only a single column and its type is always an integer. We need to give it a name, but the user can always override this later:
  ...
    this.forwardObj = new Object[1];
    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    fieldNames.add("col0");
    fieldOIs.add(PrimitiveObjectInspectorFactory
        .getPrimitiveJavaObjectInspector(PrimitiveCategory.INT));
    return ObjectInspectorFactory.getStandardStructObjectInspector(
        fieldNames, fieldOIs);
  }
  ...
The process method is where the interesting work happens. Notice that the return type is void. This is because a UDTF can forward zero or more rows, unlike a UDF, which has a single return value. In this case the call to the forward method is nested inside a for loop, which causes it to forward a row for each iteration:
  ...
  @Override
  public void process(Object[] args)
      throws HiveException, UDFArgumentException {
    for (int i = start.get(); i < end.get(); i = i + inc.get()) {
      this.forwardObj[0] = new Integer(i);
      forward(forwardObj);
    }
  }

  @Override
  public void close() throws HiveException {
  }
}
An example of a UDTF that returns multiple columns but only one row is the parse_url_tuple function, which is a built-in Hive function. It takes as input a parameter that is a URL and one or more constants that specify the parts of the URL the user wants returned:
hive> SELECT parse_url_tuple(weblogs.url, 'HOST', 'PATH')
    > AS (host, path) FROM weblogs;
.com          /index.html
hotmail.com   /a/links.html
The benefit of this type of UDTF is that the URL only needs to be parsed once, yet it returns multiple columns, a clear performance win. The alternative, using UDFs, involves writing several UDFs to extract specific parts of the URL. Using UDFs requires writing more code as well as more processing time because the URL is parsed multiple times. For example, something like the following:
SELECT PARSE_HOST(url) AS host, PARSE_PORT(url) FROM weblogs;
A UDTF can be used as a technique for adding more complex types to Hive. For example, a complex type can be serialized as an encoded string, and a UDTF will deserialize the complex type when needed. Suppose we have a Java class named Book. Hive cannot work with this datatype directly; however, a Book could be encoded to and decoded from a string format:
public class Book {

  public Book() {
  }

  public String isbn;
  public String title;
  public String[] authors;

  /* note: this system will not work if your table is
     using '|' or ',' as the field delimiter! */
  public void fromString(String parts) {
    // Split on a literal '|'; the backslashes escape it because
    // String.split() takes a regular expression.
    String[] part = parts.split("\\|");
    isbn = part[0];
    title = part[1];
    authors = part[2].split(",");
  }

  public String toString() {
    return isbn + "|" + title + "|" + StringUtils.join(authors, ",");
  }
}
Imagine we have a flat text file with books in this format. For now, let’s assume we could not use a delimited SerDe to split on | and ,:
hive> SELECT * FROM books;
5555555|Programming Hive|Edward,Dean,Jason
In the pipe-delimited raw form it is possible to do some parsing of the data:
hive> SELECT cast(split(book_info, "\\|")[0] AS INTEGER) AS isbn FROM books
    > WHERE split(book_info, "\\|")[1] = "Programming Hive";
5555555
This HiveQL works correctly; however, it could be made easier for the end user. For example, writing this type of query may require consulting documentation regarding which fields and types are used, remembering casting conversion rules, and so forth. By contrast, a UDTF makes this HiveQL simpler and more readable. In the following example, the parse_book() UDTF is introduced:
hive> FROM (
    >   parse_book(book_info) AS (isbn, title, authors) FROM books ) a
    > SELECT a.isbn
    > WHERE a.title = "Programming Hive"
    > AND array_contains(authors, 'Edward');
5555555
The function parse_book() allows Hive to return multiple columns of different types representing the fields of a book:
package com.jointhegrid.udf.collect;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
import org.apache.hadoop.io.Text;

public class UDTFBook extends GenericUDTF {

  private Text sent;
  // Saved argument inspectors; initialize() stores them here so that
  // process() can interpret the raw row objects it receives.
  private ObjectInspector[] args;
  Object[] forwardObj = null;
  ...
The function will return three properties: an ISBN as an integer, a title as a string, and authors as an array of strings. Notice that we can return nested types with all UDFs; for example, we can return an array of arrays of strings:
  ...
  @Override
  public StructObjectInspector initialize(ObjectInspector[] args)
      throws UDFArgumentException {
    this.args = args; // keep the inspectors for process()
    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    fieldNames.add("isbn");
    fieldOIs.add(PrimitiveObjectInspectorFactory
        .getPrimitiveJavaObjectInspector(PrimitiveCategory.INT));
    fieldNames.add("title");
    fieldOIs.add(PrimitiveObjectInspectorFactory
        .getPrimitiveJavaObjectInspector(PrimitiveCategory.STRING));
    fieldNames.add("authors");
    fieldOIs.add(ObjectInspectorFactory.getStandardListObjectInspector(
        PrimitiveObjectInspectorFactory
            .getPrimitiveJavaObjectInspector(PrimitiveCategory.STRING)));
    forwardObj = new Object[3];
    return ObjectInspectorFactory.getStandardStructObjectInspector(
        fieldNames, fieldOIs);
  }
  ...
The process method only returns a single row. However, each element in the object array will be bound to a specific variable:
  ...
  @Override
  public void process(Object[] os) throws HiveException {
    sent = new Text(((StringObjectInspector) args[0])
        .getPrimitiveJavaObject(os[0]));
    String parts = new String(this.sent.getBytes());
    // Split on a literal '|' (escaped because split() takes a regex).
    String[] part = parts.split("\\|");
    forwardObj[0] = Integer.parseInt(part[0]);
    forwardObj[1] = part[1];
    forwardObj[2] = part[2].split(",");
    this.forward(forwardObj);
  }

  @Override
  public void close() throws HiveException {
  }
}
We have followed the call to the book UDTF with AS, which allows the result columns to be named by the user. They can then be used in other parts of the query without having to parse information from the book again:
client.execute("create temporary function book "
    + "as 'com.jointhegrid.udf.collect.UDTFBook'");
client.execute("create table booktest (str string)");
client.execute("load data local inpath '" + p.toString()
    + "' into table booktest");
client.execute("select book(str) AS (book, title, authors) from booktest");

[555  Programming Hive  "Dean","Jason","Edward"]
UDFs may access files inside the distributed cache, the local filesystem, or even the distributed filesystem. This access should be used cautiously as the overhead is significant.
A common use of Hive is analyzing web logs. A popular operation is determining the geolocation of web traffic based on the IP address. MaxMind makes a GeoIP database available along with a Java API to search this database. By wrapping a UDF around this API, location information may be looked up about an IP address from within a Hive query.
The GeoIP API uses a small data file. This is ideal for showing the functionality of accessing a distributed cache file from a UDF. The complete code for this example is found at https://github.com/edwardcapriolo/hive-geoip/.
ADD FILE is used to cache the necessary data files with Hive. ADD JAR is used to add the required Java JAR files to the cache and the classpath. Finally, the temporary function must be defined as the final step before performing queries:
hive> ADD FILE GeoIP.dat;
hive> ADD JAR geo-ip-java.jar;
hive> ADD JAR hive-udf-geo-ip-jtg.jar;
hive> CREATE TEMPORARY FUNCTION geoip
    > AS 'com.jointhegrid.hive.udf.GenericUDFGeoIP';
hive> SELECT ip, geoip(source_ip, 'COUNTRY_NAME', './GeoIP.dat') FROM weblogs;
209.191.139.200   United States
10.10.0.1         Unknown
The two examples returned include an IP address in the United States and a private IP address that has no fixed address.
The geoip() function takes three arguments: the IP address in either string or long format, a string that must match one of the constants COUNTRY_NAME or DMA_CODE, and a final argument that is the name of the data file that has already been placed in the distributed cache.
The first call to the UDF (which triggers the first call to the evaluate Java function in the implementation) will instantiate a LookupService object that uses the file located in the distributed cache. The lookup service is saved in a reference so it only needs to be initialized once in the lifetime of the map or reduce task that initializes it. Note that the LookupService has its own internal caching, LookupService.GEOIP_MEMORY_CACHE, so that optimization should avoid frequent disk access when looking up IPs.

Here is the source code for evaluate():
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
  if (argumentOIs[0] instanceof LongObjectInspector) {
    this.ipLong = ((LongObjectInspector) argumentOIs[0])
        .get(arguments[0].get());
  } else {
    this.ipString = ((StringObjectInspector) argumentOIs[0])
        .getPrimitiveJavaObject(arguments[0].get());
  }
  this.property = ((StringObjectInspector) argumentOIs[1])
      .getPrimitiveJavaObject(arguments[1].get());
  if (this.property != null) {
    this.property = this.property.toUpperCase();
  }
  if (ls == null) {
    if (argumentOIs.length == 3) {
      this.database = ((StringObjectInspector) argumentOIs[2])
          .getPrimitiveJavaObject(arguments[2].get());
      File f = new File(database);
      if (!f.exists()) {
        throw new HiveException(database + " does not exist");
      }
      try {
        ls = new LookupService(f, LookupService.GEOIP_MEMORY_CACHE);
      } catch (IOException ex) {
        throw new HiveException(ex);
      }
    }
  }
  ...
An if statement in evaluate determines which data the method should return. In our example, the country name is requested:
  ...
  if (COUNTRY_PROPERTIES.contains(this.property)) {
    Country country = ipString != null
        ? ls.getCountry(ipString)
        : ls.getCountry(ipLong);
    if (country == null) {
      return null;
    } else if (this.property.equals(COUNTRY_NAME)) {
      return country.getName();
    } else if (this.property.equals(COUNTRY_CODE)) {
      return country.getCode();
    }
    assert (false);
  } else if (LOCATION_PROPERTIES.contains(this.property)) {
    ...
  }
}
In this chapter we mentioned the @Description annotation and how it is used to provide documentation for Hive functions at runtime. Other annotations exist for UDFs that can make functions easier to use and even increase the performance of some Hive queries:
public @interface UDFType {
  boolean deterministic() default true;
  boolean stateful() default false;
  boolean distinctLike() default false;
}
By default, deterministic is turned on, since most functions are inherently deterministic; an exception is rand(). If a UDF is not deterministic, it is not included in the partition pruner. An example of a nondeterministic query using rand() is the following:
SELECT * FROM t WHERE rand() < 0.01;
If rand() were deterministic, the result would only be calculated a single time in the computation stage. Because a query with rand() is nondeterministic, the result of rand() is recomputed for each row.
Almost all UDFs are not stateful by default; a UDF that is stateful is rand(), because it returns a different value for each invocation. The stateful annotation may be used under the following conditions:
A stateful UDF can only be used in the SELECT list, not in other clauses such as WHERE/ON/ORDER/GROUP.

When a stateful UDF is present in a query, the implication is that the SELECT will be treated similarly to TRANSFORM (i.e., a DISTRIBUTE/CLUSTER/SORT clause), then run inside the corresponding reducer to ensure the results are as expected.
If stateful is set to true, the UDF should also be treated as nondeterministic (even if the deterministic annotation explicitly returns true).
See https://issues.apache.org/jira/browse/HIVE-1994 for more details.
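As a sketch of applying the annotation (the class name is hypothetical), a nondeterministic UDF is declared like this:

import java.util.Random;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;

// Sketch: marking a UDF nondeterministic so Hive recomputes it per row
// and excludes it from optimizations such as partition pruning.
@UDFType(deterministic = false)
public class UDFRandomExample extends UDF {
  private final Random rng = new Random();

  public Double evaluate() {
    return rng.nextDouble();
  }
}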
Macros provide the ability to define functions in HiveQL that call other functions and operators. When appropriate for the particular situation, macros are a convenient alternative to writing UDFs in Java or using Hive streaming, because they require no external code or scripts.
To define a macro, use the CREATE TEMPORARY MACRO syntax. Here is an example that creates a SIGMOID function calculator:
hive> CREATE TEMPORARY MACRO SIGMOID (x DOUBLE) 1.0 / (1.0 + EXP(-x));
hive> SELECT SIGMOID(2) FROM src LIMIT 1;
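The macro behaves like any other function; for this query, SIGMOID(2) evaluates to 1.0 / (1.0 + e^-2), approximately 0.8808.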