自定义函数

为了满足客户个性化的需求,Hive被设计成一个很开放的系统,很多内容都支持用户定制,包括:

  • 文件格式:Text File,Sequence File
  • 内存中的数据格式: Java Integer/String, Hadoop IntWritable/Text
  • 用户提供的map/reduce脚本:不管什么语言,利用stdin/stdout传输数据
  • 用户自定义函数

User Defined Function

用户可以使用‘show functions’ 查看function list,可以使用’describe function function-name’查看函数说明。

hive> show functions;
OK
!
!=
......
Time taken: 0.275 seconds
hive> desc function substr;
OK
substr(str, pos[, len]) - returns the substring of str that starts at pos and is of length len orsubstr(bin, pos[, len]) - returns the slice of byte array that starts at pos and is of length len
Time taken: 0.095 seconds

hive提供的build-in函数包括以下几类:

  1. 关系操作符:包括=、<>、<=、>=
  2. 算数操作符:包括+、-、*、/等
  3. 逻辑操作符:包括AND、&&、OR、||
  4. 复杂类型构造函数:包括map、struct、create_union
  5. 复杂类型操作符:包括A[n]、Map[key]、S.x
  6. 数学操作符:包括ln(double a)sqrt(double a)
  7. 集合操作符:包括size(Array)sort_array(Array)
  8. 类型转换函数: binary(string|binary)、cast(expr as )
  9. 日期函数:包括from_unixtime(bigint unixtime[, string format])unix_timestamp()10.条件函数:包括if(boolean testCondition, T valueTrue, T valueFalseOrNull)
  10. 字符串函数:包括acat(string|binary A, string|binary B)
  11. 其他:xpath、get_json_objectscii(string str)、con

Definition

编写Hive UDF有两种方式:

  1. extends UDF,重写evaluate方法
  2. extends GenericUDF,重写initialize、getDisplayString、evaluate方法

Extends UDF

如下的用法是将大写转化为小写

    package test.udf;

    import org.apache.hadoop.hive.ql.exec.UDF;
    import org.apache.hadoop.io.Text;

    public class ToLowerCase extends UDF {
        public Text evaluate(final Text s) {
            if (s == null) { return null; }
            return new Text(s.toString().toLowerCase());
        }
    }

Extends GenericUDF

计算array中去重后元素个数

    package test.udf;
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.io.IntWritable;

    /**
     * UDF:
     * Get nubmer of objects with duplicate elements eliminated

     */
    @Description(name = "array_uniq_element_number", value = "_FUNC_(array) - Returns nubmer of objects with duplicate elements eliminated.", extended = "Example:\n"
                    + "  > SELECT _FUNC_(array(1, 2, 2, 3, 3)) FROM src LIMIT 1;\n" + "  3")
    public class UDFArrayUniqElementNumber extends GenericUDF {

            private static final int ARRAY_IDX = 0;
            private static final int ARG_COUNT = 1; // Number of arguments to this UDF
            private static final String FUNC_NAME = "ARRAY_UNIQ_ELEMENT_NUMBER"; // External Name

            private ListObjectInspector arrayOI;
            private ObjectInspector arrayElementOI;
            private final IntWritable result = new IntWritable(-1);

            public ObjectInspector initialize(ObjectInspector[] arguments)
                            throws UDFArgumentException {

                    // Check if two arguments were passed
                    if (arguments.length != ARG_COUNT) {
                            throw new UDFArgumentException("The function " + FUNC_NAME
                                            + " accepts " + ARG_COUNT + " arguments.");
                    }

                    // Check if ARRAY_IDX argument is of category LIST
                    if (!arguments[ARRAY_IDX].getCategory().equals(Category.LIST)) {
                            throw new UDFArgumentTypeException(ARRAY_IDX, "\""
                                            + org.apache.hadoop.hive.serde.Constants.LIST_TYPE_NAME
                                            + "\" " + "expected at function ARRAY_CONTAINS, but "
                                            + "\"" + arguments[ARRAY_IDX].getTypeName() + "\" "
                                            + "is found");
                    }

                    arrayOI = (ListObjectInspector) arguments[ARRAY_IDX];
                    arrayElementOI = arrayOI.getListElementObjectInspector();

                    return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
            }

            public IntWritable evaluate(DeferredObject[] arguments)
                            throws HiveException {

                    result.set(0);

                    Object array = arguments[ARRAY_IDX].get();
                    int arrayLength = arrayOI.getListLength(array);
                    if (arrayLength <= 1) {
                            result.set(arrayLength);
                            return result;
                    }

                    //element compare; Algorithm complexity: O(N^2)
                    int num = 1;
                    int i, j;
                    for(i = 1; i < arrayLength; i++)
                    {
                            Object listElement = arrayOI.getListElement(array, i);
                            for(j = i - 1; j >= 0; j--)
                            {
                                    if (listElement != null) {
                                            Object tmp = arrayOI.getListElement(array, j);
                                            if (ObjectInspectorUtils.compare(tmp, arrayElementOI, listElement,
                                                            arrayElementOI) == 0) {
                                                    break;
                                            }
                                    }
                            }
                            if(-1 == j)
                            {
                                    num++;
                            }
                    }

                    result.set(num);
                    return result;
            }

            public String getDisplayString(String[] children) {
                    assert (children.length == ARG_COUNT);
                    return "array_uniq_element_number(" + children[ARRAY_IDX]+ ")";
            }
    }

Usage

临时添加UDF

hive> select * from test;
OK
Hello
wORLD
ZXM
ljz
Time taken: 13.76 seconds
hive> add jar /home/work/udf.jar;
Added /home/work/udf.jar to class path
Added resource: /home/work/udf.jar
hive> create temporary function mytest as 'test.udf.ToLowerCase';
OK
Time taken: 0.103 seconds
hive> show functions;
......
mytest
......
hive> select mytest(test.name) from test;
......
OK
hello
world
zxm
ljz
Time taken: 38.218 seconds

这种方式在会话结束后,函数自动销毁,因此每次打开新的会话,都需要重新add jar并且create temporary function

进入会话前自动创建

使用hive -i参数在进入hive时自动初始化

    $ cat hive_init
    add jar /home/work/udf.jar;
    create temporary function mytest as 'test.udf.ToLowerCase';
    $ hive -i hive_init
    Logging initialized using configuration in file:/home/work/hive/hive-0.8.1/conf/hive-log4j.properties
    Hive history file=/tmp/work/hive_job_log_work_201209200147_1951517527.txt
    hive> show functions;
    ......
    mytest
    ......
    hive> select mytest(test.name) from test;
    ......
    OK
    hello
    world
    zxm
    ljz

方法2和方法1本质上是相同的,区别在于方法2在会话初始化时自动完成

注册为内部函数

和前两者相比,第三种方式直接将用户的自定义函数作为注册为内置函数,未来使用起来非常简单,但这种方式也非常危险,一旦出错,将是灾难性的,因此,建议如果不是特别通用,并且固化下来的函数,还是使用前两种方式比较靠谱。

UDAF

Hive查询数据时,有些聚类函数在HQL没有自带,需要用户自定义实现 •用户自定义聚合函数: Sum, Average…… n – 1 •UDAF(User- Defined Aggregation Funcation) 用法 •一下两个包是必须的import org.apache.hadoop.hive.ql.exec.UDAForg.apache.hadoop.hive.ql.exec.UDAFEvaluator 开发步骤 •函数类需要继承UDAF类,内部类EvaluatorUDAFEvaluator接口 •Evaluator需要实现init、iterate、terminatePartial、merge、terminate这几个函数 a)init函数实现接口UDAFEvaluatorinit函数。 b)iterate接收传入的参数,并进行内部的轮转。其返回类型为booleanc)terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据,terminatePartial类似于hadoopCombinerd)merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为booleane)terminate返回最终的聚集函数结果。 执行步骤 •执行求平均数函数的步骤 a)java文件编译成Avg_test.jarb)进入hive客户端添加jar包: hive>add jar /run/jar/Avg_test.jar。 c)创建临时函数: hive>create temporary function avg_test ‘hive.udaf.Avg’; d)查询语句: hive>select avg_test(scores.math) from scores; e)销毁临时函数: hive>drop temporary function avg_test;

UDAF代码示例 public class MyAvg extends UDAF {

public static class AvgEvaluator implements UDAFEvaluator { } public void init() {} public boolean iterate(Double o) {} public AvgState terminatePartial() {} public boolean terminatePartial(Double o) { } public Double terminate() {}

}

UDTF

UDTF(User-Defined Table-Generating Functions)用来解决 输入一行输出多行(On-to-many maping)的需求。 开发步骤 •UDTF步骤: •必须继承org.apache.Hadoop.hive.ql.udf.generic.GenericUDTF •实现initialize, process, close三个方法 •UDTF首先会 •调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型) 初始化完成后,会调用process方法,对传入的参数进行处理,可以通过forword()方法把结果返回 •最后close()方法调用,对需要清理的方法进行清理 使用方法 •UDTF有两种使用方法,一种直接放到select后面,一种和lateral view一起使用 •直接select中使用:select explodemap(properties) as (col1,col2) from src; •不可以添加其他字段使用:select a, explode_map(properties) as (col1,col2) from src •不可以嵌套调用:select explode_map(explode_map(properties)) from src •不可以和group by/cluster by/distribute by/sort by一起使用:select explode_map(properties) as (col1,col2) from src group by col1, col2 •和lateral view一起使用:select src.id, mytable.col1, mytable.col2 from src lateral view explode_map(properties) mytable as col1, col2; 此方法更为方便日常使用。执行过程相当于单独执行了两次抽取,然后union到一个表里。 lateral view • Lateral View语法 •lateralView: LATERAL VIEW udtf(expression) tableAlias AS columnAlias (’,’ columnAlias) fromClause: FROM baseTable (lateralView)_

Lateral View用于UDTF(user-defined table generating functions)中将行转成列,例如explode(). •目前Lateral View不支持有上而下的优化。如果使用Where子句,查询可能将不被编译。解决方法见: 此时,在查询之前执行set hive.optimize.ppd=false; • 例子 •pageAds。它有两个列 string pageid Array adid_list " front_page" [1, 2, 3] “contact_page " [ 3, 4, 5] •SELECT pageid, adid FROM pageAds LATERAL VIEW explode(adid_list) adTable AS adid; •将输出如下结果 string pageid int adid “front_page” 1 ……. “contact_page” 3

代码示例 public class MyUDTF extends GenericUDTF{ public StructObjectInspector initialize(ObjectInspector[] args) {} public void process(Object[] args) throws HiveException { } }

上一页