编写 GenericUDAF:教程

用户定义的聚合功能(UDAF)是将高级数据处理集成到 Hive 中的绝佳方法。 Hive 允许使用两种类型的 UDAF:简单的和通用的。顾名思义,简单的 UDAF 很容易编写,但是由于使用Java Reflection而导致性能下降,并且不允许使用可变长度参数列表之类的功能。通用 UDAF 具有所有这些功能,但编写起来可能不像简单 UDAF 那样直观。

本教程介绍了histogram() UDAF 的开发过程,该模型使用恒定的内存量和 Importing 大小的线性时间来计算具有固定的,用户指定数量的 bin 的直方图。它演示了通用 UDAF 的许多功能,例如复杂的返回类型(结构数组)以及对 Importing 的类型检查。假设 Reader 希望编写 UDAF 以便最终提交到 Hive 开源项目,因此还包括诸如在 Hive 中修改功能注册表和编写.q测试之类的步骤。如果您只想编写 UDAF,请在本地调试和部署,请参见this page

注意: 在本教程中,我们将逐步创建histogram()函数。从 Hive 的 0.6.0 版本开始,它显示为内置函数histogram_numeric()

Preliminaries

通过在 Hive 目录中运行svn up来确保拥有最新的 Hive 中继。有关下载和设置 Hive 的更多详细说明,请参见Getting Started。您的 Hive 本地副本应通过从 Hive 根目录运行build/dist/bin/hive来工作,并且应该将一些数据表加载到本地实例中以测试您考虑的 UDAF。对于此示例,假设存在一个名为normal的表,该表具有一个名为val的单个double列,其中包含从标准正态分布中提取的大量随机数。

相对于 Hive 根目录,我们将要编辑或创建的文件如下:

ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFHistogram.java主源文件,由您创建。
ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java功能注册表源文件,您可以对其进行编辑,以将我们新的histogram() UDAF 注册到 Hive 的内置功能列表中。
ql/src/test/queries/clientpositive/udaf_histogram.q由您创建的 samples 查询文件,用于对 samples 数据进行histogram()测试。
ql/src/test/results/clientpositive/udaf_histogram.q.out您的示例查询的预期输出,将由ant在后续步骤中创建。
ql/src/test/results/clientpositive/show_functions.q.outSHOW FUNCTIONS Hive 查询的预期输出。由于我们要添加新的histogram()函数,因此此预期输出将更改以反映新函数。此文件将在以后的步骤中被ant修改。

编写源代码

本节概述了如何实现自己的通用 UDAF。作为一个具体示例,请查看ql/src/java/org/apache/hadoop/hive/ql/udf/generic/目录中存在的任何现有 UDAF 源。

Overview

从高层次上讲,实现通用 UDAF 有两个部分。第一个是编写一个* resolver 类,第二个是创建一个 evaluator *类。解析器处理类型检查和运算符重载(如果需要),并帮助 Hive 为给定的一组参数类型找到正确的评估器类。然后,评估程序类实际上实现了 UDAF 逻辑。通常,顶级 UDAF 类扩展了抽象 Base Classorg.apache.hadoop.hive.ql.udf.GenericUDAFResolver2 **,并且评估程序类被编写为静态内部类。

编写解析器

解析程序处理 UDAF 查询的类型检查和运算符重载。类型检查可确保用户不会传递例如期望使用 integerdouble 表达式,并且运算符重载使您可以对不同类型的参数使用不同的 UDAF 逻辑。

解析程序类必须扩展 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2 **(有关向后兼容性的信息,请参阅#Resolver 接口演变)。我们建议您扩展 AbstractGenericUDAFResolverBase Class,以使 UDAF 与将来 Hive 中的接口更改隔离。

查看现有的 UDAF 中所需的* import * s。

#!Java
public class GenericUDAFHistogramNumeric extends AbstractGenericUDAFResolver {
  static final Log LOG = LogFactory.getLog(GenericUDAFHistogramNumeric.class.getName());

  @Override
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
    // Type-checking goes here!

    return new GenericUDAFHistogramNumericEvaluator();
  }

  public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator {
    // UDAF logic goes here!
  }
}

上面的代码显示了 UDAF 的基本框架。第一行设置了一个 Log 对象,您可以使用该对象编写警告和错误,以将它们馈送到 Hive 日志中。 GenericUDAFResolver 类具有一个重写的方法: getEvaluator ,该方法接收有关如何调用 UDAF 的信息。最有趣的是 info.getParameters(),它提供了一个与调用参数的 SQL 类型相对应的类型信息对象数组。对于 UDAF 直方图,我们需要两个参数:要在其上计算直方图的数字列,以及所请求的直方图箱数。首先要做的是检查我们是否确实有两个参数(下面的 3-6 行)。然后,我们检查第一个参数是否具有原始类型,而不是例如数组或 Map(第 9-13 行)。但是,我们不仅希望它是原始类型的列,而且还希望它是数字的,这意味着如果给定了 STRING 类型,则需要抛出异常(第 14-28 行)。排除 BOOLEAN 是因为可以通过简单的 COUNT()查询来解决“直方图”估计问题。第 30-41 行说明了对 histogram()UDAF 的第二个参数(直方图块数)的类似类型检查。在这种情况下,我们坚持认为直方图箱数是一个整数。

#!Java
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
    TypeInfo [] parameters = info.getParameters();
    if (parameters.length != 2) {
      throw new UDFArgumentTypeException(parameters.length - 1,
          "Please specify exactly two arguments.");
    }
    
    // validate the first parameter, which is the expression to compute over
    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(0,
          "Only primitive type arguments are accepted but "
          + parameters[0].getTypeName() + " was passed as parameter 1.");
    }
    switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
    case BYTE:
    case SHORT:
    case INT:
    case LONG:
    case FLOAT:
    case DOUBLE:
      break;
    case STRING:
    case BOOLEAN:
    default:
      throw new UDFArgumentTypeException(0,
          "Only numeric type arguments are accepted but "
          + parameters[0].getTypeName() + " was passed as parameter 1.");
    }

    // validate the second parameter, which is the number of histogram bins
    if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(1,
          "Only primitive type arguments are accepted but "
          + parameters[1].getTypeName() + " was passed as parameter 2.");
    }
    if( ((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory()
        != PrimitiveObjectInspector.PrimitiveCategory.INT) {
      throw new UDFArgumentTypeException(1,
          "Only an integer argument is accepted as parameter 2, but "
          + parameters[1].getTypeName() + " was passed instead.");
    }
    return new GenericUDAFHistogramNumericEvaluator();
  }

可以在此处实现某种形式的操作员重载。假设您有两种完全不同的直方图构造算法-一种设计为仅处理整数,另一种设计为使用双精度数据类型。然后,您将创建两个单独的 Evaluator 内部类,并且根据 Importing 表达式的类型,将返回正确的一个作为 Resolver 类的返回值。

  • CAVEAT *:直方图功能应类似于以下形式使用:SELECT histogram_numeric(age, 30) FROM employees;,这意味着使用 30 个直方图箱估算员工年龄的分布。但是,在 resolver 类中,无法判断第二个参数是常量还是具有许多不同值的整数列。因此,假设年龄是整数列,病态扭曲的用户可能会写类似SELECT histogram_numeric(age, age) FROM employees;的内容。当然,这毫无意义,但是可以在上面的解析器类型检查代码中正确验证。

我们将在评估器中处理此问题。

撰写评估者

所有评估程序都必须从抽象 Base Classorg.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator 扩展。此类提供了一些必须由扩展类实现的抽象方法。这些方法构建了 UDAF 遵循的处理语义。以下是 Evaluator 类的框架。

#!Java
  public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator {

    // For PARTIAL1 and COMPLETE: ObjectInspectors for original data
    private PrimitiveObjectInspector inputOI;
    private PrimitiveObjectInspector nbinsOI;

    // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list of doubles)
    private StandardListObjectInspector loi;

    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
      super.init(m, parameters);
      // return type goes here
    }

    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
      // return value goes here
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
      // final return value goes here
    }

    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
    }

    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
    }

    // Aggregation buffer definition and manipulation methods 
    static class StdAgg implements AggregationBuffer {
    };

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
    }    
  }

所有这些功能做什么?以下是每个函数的简要摘要,按(按时间 Sequences)被调用。记住非常重要的一点是,聚合的计算必须可以在数据上任意分割。可以将其想象为编写分治算法,其中数据分区完全不受您的控制,由 Hive 处理。更正式地说,给定 Importing 行的任何子集,您应该能够计算部分结果,并且还能够将任何一对部分结果合并到另一个部分结果中。这自然使移植许多现有算法变得困难,但是应该保证研究人员工作相当长的时间。

FunctionPurpose
init由 Hive 调用以初始化 UDAF 评估程序类的实例。
getNewAggregationBuffer返回一个将用于存储临时聚合结果的对象。
iterate将新数据行处理到聚合缓冲区中
terminatePartial以可持久的方式返回当前聚合的内容。这里的持久性意味着返回值只能根据 JavaPrimitives,数组,Primitives 包装器(例如 Double),Hadoop Writables,List 和 Maps 构建。不要使用您自己的类(即使它们实现 java.io.Serializable),否则您可能会遇到奇怪的错误或(可能更糟)错误的结果。
mergeterminatePartial 返回的部分聚合合并到当前聚合中
terminate将聚合的最终结果返回给 Hive

在编写histogram()函数时,采用了以下策略。

getNewAggregationBuffer

直方图的聚合缓冲区是(x,y)对的列表,它们代表直方图的 bin 中心和高度。此外,聚合缓冲区还存储两个整数,这些整数具有最大 bin 数(用户指定的参数)和当前使用的 bin 数。聚合缓冲区被初始化为“未就绪”状态,其中 bin 的数量设置为 0.这是因为 Hive 在提供给 UDAF 的常量参数与表中的列之间没有区别。因此,在第一次调用iterate()之前,我们无法知道用户想要在直方图中有多少个 bin。

iterate

我们在iterate()中要做的第一件事是检查聚合缓冲区中的直方图对象是否已初始化。如果不是,我们将第二个参数解析为iterate(),这是用户请求的直方图箱数。我们只执行一次并初始化直方图对象。请注意,此处执行错误检查-如果用户为直方图箱数提供了负数或零,则此时将抛出HiveException并终止计算。

接下来,我们解析出实际的 Importing 数据项(一个数字),并将其添加到聚合缓冲区中的直方图估计中。有关用于构造直方图的启发式方法的详细信息,请参见GenericUDAFHistogramNumeric.java文件。

terminatePartial

当前的直方图近似值被序列化为DoubleWritable个对象的列表。列表中的前两个双精度表示用户指定的直方图箱的最大数量和当前使用的箱数量。其余条目是当前直方图近似值中的(x,y)对。

merge

在这一点上,我们有一个(可能是未初始化的)直方图估计值,并且已被要求将其与对行的单独子集执行的另一个估计值合并。如果 N 是用户指定的直方图 bin 的数量,则当前启发式方法会首先使用两个估计的所有 2N 个 bin 来构建直方图,然后迭代合并最接近的 bin 对,直到仅 N 垃圾桶仍然存在。

terminate

histogram()函数的最终返回类型是(x,y)对的数组,它们代表直方图 bin 中心和高度。可以将它们\ {{explode()}}放入单独的表中,或者使用脚本进行解析,然后传递给 Gnuplot(例如)以直观显示直方图。

修改功能注册表

一旦编写了 UDAF 的代码并将源文件放置在ql/src/java/org/apache/hadoop/hive/ql/udf/generic中,就可以修改功能注册表并将新功能合并到 Hive 的功能列表中。这仅涉及编辑ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java以导入您的 UDAF 类并注册它的名称。

请注意,您将必须运行以下命令来更新show functions Hive 调用的输出:

ant test -Dtestcase=TestCliDriver -Dqfile=show_functions.q -Doverwrite=true

编译并运行

ant package
build/dist/bin/hive

创建测试

系统级测试包括编写一些对 samples 数据进行操作的 samples 查询,从查询生成预期的输出,并确保将来的预期输出不会中断。请注意,预期的输出将通过diff与 Hive 的实际输出一起传递,因此非确定性算法将必须计算某种统计量,然后仅保留最高有效数字(例如)。

这些是为新的 UDAF/UDF 创建测试用例所需的简单步骤:

  • ql/src/test/queries/clientpositive/udaf_XXXXX.q中创建一个文件,其中XXXXX是您的 UDAF 的名称。
    2.在.q文件中添加一些查询-希望足以涵盖所有功能和特殊情况。
    3.对于示例数据,请将您自己的文件放在hive/data/files并使用LOAD DATA LOCAL INPATH...进行加载,或重用已有的文件之一(查询目录中的 LOAD 的 grep,以查看表名)。
    1. touch ql/src/test/results/clientpositive/udaf_XXXX.q.out
      5.运行以下命令以将输出生成到.q.out结果文件中。
ant test -Dtestcase=TestCliDriver -Dqfile=udaf_XXXXX.q -Doverwrite=true

6.运行以下命令以确保测试运行正常。

ant test -Dtestcase=TestCliDriver -Dqfile=udaf_XXXXX.q

开源提交清单

  • 在 Hive JIRA(https:-issues.apache.org-jira-browse-HIVE)上创建一个帐户,在Query Processor组件下为新补丁程序创建一个问题。征求讨论,纳入反馈。

  • 创建您的 UDAF,将其集成到本地 Hive 副本中。

  • 从 Hive 根目录运行ant package以编译 Hive 和新的 UDAF。

  • 创建.q个测试及其相应的.q.out输出。

  • 如果添加新功能,请修改功能注册表。

  • 运行ant checkstyle并检查build/checkstyle/checkstyle-errors.html,确保源文件符合 Sun Java 编码约定(100 个字符的行长 exception)。

  • 运行ant test,确保测试通过。

  • 运行svn up,确保与主存储库没有冲突。

  • 对您创建的任何新文件运行svn add

  • 确保您已添加.q.q.out测试。

  • 确保已针对所有新功能运行.q测试。

  • 如果添加新的 UDAF,请确保show_functions.q.out已更新。运行ant test -Dtestcase=TestCliDriver -Dqfile=show_functions.q -Doverwrite=true以执行此操作。

  • 从 Hive 根目录运行svn diff > HIVE-NNNN.1.patch,其中 NNNN 是 JIRA 分配给您的发行号。

  • 将文件附加到 JIRA 问题,在 Comment 部分描述您的补丁。

  • 在 Comment 中要求进行代码审查。

  • 完成上述步骤后,点击“提交补丁” **。

  • 还建议您“观察”问题以监视新 Comment。

技巧,窍门和最佳做法

  • Hive 有时会出现意外行为。最好先运行ant clean,如果您看到奇怪的内容,从无法解释的异常到错误地用双引号引起来的字符串。

  • terminatePartial()调用中序列化聚合缓冲区时,如果您的 UDAF 仅使用几个变量表示缓冲区(例如平均值),请考虑将其序列化为双精度列表,例如,而不是复杂的命名结构。

  • 尽可能强地转换泛型。

  • 从多个 UDAF 到其自己的类的抽象核心功能。示例是histogram_numeric()percentile_approx(),它们都使用相同的核心直方图估计功能。

  • 如果您一直在寻找一种适合 terminatePartial/merge 范例的算法,那么分治法和并行算法无疑是上手的好地方。

  • 请记住,测试对预期和实际输出执行diff,如果完全不同,则失败。可能会严重失败的一个示例是ngrams()之类的 UDAF,其中输出是已排序(单词,计数)对的列表。在某些情况下,不同的排序实现可能会将具有相同计数的单词放在输出中的不同位置。即使输出正确,测试也将失败。在这些情况下,最好只输出(例如)计数,或输出一些适当的统计数据,例如总和。

解析器界面的演变

从 0.6.0 版本开始,旧接口 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver 被弃用。 GenericUDAFResolver 和 GenericUDAFResolver2 接口之间的主要区别在于,后者允许评估程序实现访问有关函数调用的额外信息,例如 DISTINCT 限定符的存在或使用通配符语法(例如 FUNCTION())的调用。实现了不赞成使用的 GenericUDAFResolver 接口的 UDAF 将无法区分调用之间的区别,例如 FUNCTION()或 FUNCTION(),因为有关通配符规范的信息不可用。同样,这些实现也将无法分辨 FUNCTION(EXPR)与 FUNCTION(DISTINCT EXPR)之间的区别,因为有关 DISTINCT 限定符存在的信息也不可用。

请注意,尽管为实现 GenericUDAFResolver2 接口的解析器提供了有关使用通配符语法调用 DISTINCT 限定符的额外信息,但如果对它们没有意义,他们可以选择完全忽略它。计算 DISTINCT 值的基础数据过滤实际上是由 Hive 的核心查询处理器完成的,而不是由评估程序或解析程序完成的;信息仅提供给解析器用于验证目的。 AbstractGenericUDAFResolverBase Class 提供了一种简便的方法,可以将以前编写的 UDAF 实现过渡到新的 resolver 接口,而不必重新编写实现,因为从实现 GenericUDAFResolver 接口到扩展 AbstractGenericUDAFResolver 类的更改非常小。 (实现可能会成为继承层次结构的一部分,因为更改 Base Class 可能并不容易.)