Filter Pushdown

Introduction

本文档说明了我们计划如何在 Hive 的优化器中添加支持,以将过滤器推入物理访问方法。这是一个重要的优化措施,它可以最大程度地减少访问方法(例如,用于索引键查找)扫描和处理的数据量,并减少传递给 Hive 进行进一步查询评估的数据量。

Use Cases

以下是我们定位的主要用例。

Components Involved

总体工作有很多不同的部分。

  • 传播 Hive 现有谓词下推的结果。 Hive 的优化器已经完成了通过查询计划向下推谓词的艰苦工作(通过配置参数 hive.optimize.ppd = true/false 控制)。剩下的“最后一英里”是将表级过滤器发送到相应的 Importing 格式。

  • 选择要传递给 Importing 格式的主过滤器表示形式。此表示形式必须是中立的(独立于将使用它的访问计划),并与 Hive 松散耦合(以便存储处理程序可以选择最小化其对 Hive 内部的依赖性)。

  • 用于解释主要表示形式的 Helper 类。许多访问计划将需要以类似的方式分析过滤器,例如分解连接并检测支持的列比较模式。 Hive 应该为这种情况提供可共享的 Util,这样就不必在每种访问方法的代码中重复它们。

  • 将过滤器转换为特定于访问方法的形式。这部分取决于特定的访问方法。例如对于 HBase,它涉及将过滤条件转换为相应的调用以构建HBase scan对象。

主过滤器表示形式

为了实现尽可能宽松的耦合,我们将使用字符串作为过滤器的主要表示形式。特别是,字符串将采用 Hive 解析ExprNodeDesc时生成的形式,例如

((key >= 100) and (key < 200))

通常,这可能是有效的 SQL,尽管它不一定总是与原始 SQL 完全匹配,例如

cast(x as int)

becomes

UDFToInteger(x)

该字符串中的列名称是对筛选器所操作的表的列的不合格引用,如它们在 Hive 元存储中所知。这些列名称可能与基础存储已知的名称不同。例如,HBase 存储处理程序将 Hive 列名 Map 到 HBase 列名(由列族限定)。从 Hive 列名进行 Map 是解释过滤字符串的代码的责任。

其他过滤器表示

如上所述,我们希望避免解释过滤字符串的代码重复(例如解析)。首先,我们将通过以序列化形式将ExprNodeDesc树作为过滤字符串的可选伴侣传递来提供对ExprNodeDesc树的访问。在后续工作中,我们将为字符串形式提供解析 Util。

我们还将提供一个 IndexPredicateAnalyzer 类,该类能够检测简单的sargable
ExprNodeDesc树中的子表达式。在后续活动中,我们将为区分和组合更复杂的可索引子表达式提供支持。

public class IndexPredicateAnalyzer
{
  public IndexPredicateAnalyzer();

  /**
 * Registers a comparison operator as one which can be satisfied
 * by an index search.  Unless this is called, analyzePredicate
 * will never find any indexable conditions.
   *
 * @param udfName name of comparison operator as returned
 * by either {@link GenericUDFBridge#getUdfName} (for simple UDF's)
 * or udf.getClass().getName() (for generic UDF's).
   */
  public void addComparisonOp(String udfName);

  /**
 * Clears the set of column names allowed in comparisons.  (Initially, all
 * column names are allowed.)
   */
  public void clearAllowedColumnNames();

  /**
 * Adds a column name to the set of column names allowed.
   *
 * @param columnName name of column to be allowed
   */
  public void allowColumnName(String columnName);

  /**
 * Analyzes a predicate.
   *
 * @param predicate predicate to be analyzed
   *
 * @param searchConditions receives conditions produced by analysis
   *
 * @return residual predicate which could not be translated to
 * searchConditions
   */
  public ExprNodeDesc analyzePredicate(
    ExprNodeDesc predicate,
    final List<IndexSearchCondition> searchConditions);

  /**
 * Translates search conditions back to ExprNodeDesc form (as
 * a left-deep conjunction).
   *
 * @param searchConditions (typically produced by analyzePredicate)
   *
 * @return ExprNodeDesc form of search conditions
   */
  public ExprNodeDesc translateSearchConditions(
    List<IndexSearchCondition> searchConditions);
}

public class IndexSearchCondition
{
  /**
 * Constructs a search condition, which takes the form
 * <pre>column-ref comparison-op constant-value</pre>.
   *
 * @param columnDesc column being compared
   *
 * @param comparisonOp comparison operator, e.g. "="
 * (taken from GenericUDFBridge.getUdfName())
   *
 * @param constantDesc constant value to search for
   *
 * @Param comparisonExpr the original comparison expression
   */
  public IndexSearchCondition(
    ExprNodeColumnDesc columnDesc,
    String comparisonOp,
    ExprNodeConstantDesc constantDesc,
    ExprNodeDesc comparisonExpr);
}

Filter Passing

将滤镜向下传递到 Importing 格式的方法将遵循与向下推入列投影相似的模式。

  • org.apache.hadoop.hive.serde2.ColumnProjectionUtils封装下推式通讯

  • 诸如HiveInputFormat之类的类调用ColumnProjectionUtils来在实例化RecordReader之前在 jobConf 上设置投影下推属性(READ_COLUMN_IDS_CONF_STR)

  • RecordReader调用ColumnProjectionUtils以访问此属性的工厂方法

对于过滤器下推:

  • HiveInputFormat在调用 getSplits 以及实例化记录读取器之前,在作业 conf 中设置属性hive.io.filter.text(字符串形式)和hive.io.filter.expr.serialized(ExprNodeDesc 的序列化形式)

  • 存储处理程序的 Importing 格式读取这些属性并处理过滤器表达式

  • 有用于协商过滤器分解的单独的优化程序交互(在后面的部分中描述)

请注意,由于过滤器的选择性可能会修剪掉某些本来可以访问的拆分,因此需要涉及 getSplits。 (从理论上讲,列投影也可能会影响分裂边界,但我们将其留待后续讨论.)

Filter Collection

因此,HiveInputFormat将在何处获取要传递的过滤器表达式?同样,我们可以从列投影的模式开始:

  • 在优化期间,org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcFactory's ColumnPrunerTableScanProc填充TableScanOperator中的下推信息

  • 以后HiveInputFormat.initColumnsNeededTableScanOperator检索此信息

对于过滤器下推,等效值是org.apache.hadoop.hive.ql.ppd.OpProcFactory中的TableScanPPD。当前,它调用createFilter,它将表达式折叠成一个名为 condn 的单个表达式,然后将其粘贴到新的FilterOperator上。我们可以调用 condn.getExprString()并将结果存储在TableScanOperator上。

配置单元配置参数hive.optimize.ppd.storage可用于启用或禁用将过滤器下推到存储处理程序。默认情况下将启用此功能。但是,如果禁用了hive.optimize.ppd,那么这也隐式地阻止了下推到存储处理程序。

我们仅从 nonlocal 表开始;我们将再次讨论该问题,以将过滤器向下推至索引和内置存储格式(如 RCFile)。

Filter Decomposition

考虑一个像

x > 3 AND upper(y) = 'XYZ'

假设存储处理程序能够实现x > 3的范围 scan,但不具有评估{{upper(y)=
'XYZ'}}。在这种情况下,最佳计划将涉及分解过滤器,仅将第一部分向下推入存储处理器,以及
只剩下其余部分供 Hive 通过自己的执行程序进行评估。

为了使之成为可能,存储处理程序需要能够与 Hive 协商分解。这意味着 Hive 给
存储处理程序将整个过滤器传递给存储处理程序,而存储处理程序将传递回“剩余”:Hive 需要评估的部分。残差为零表示存储处理程序能够自行处理整个过滤器(在这种情况下,无需FilterOperator)。

为了支持这种交互,我们将引入一个新的(可选)接口,该接口将由存储处理程序实现:

public interface HiveStoragePredicateHandler {
  public DecomposedPredicate decomposePredicate(
    JobConf jobConf,
    Deserializer deserializer,
    ExprNodeDesc predicate);

  public static class DecomposedPredicate {
    public ExprNodeDesc pushedPredicate;
    public ExprNodeDesc residualPredicate;
  }
}

Hive 的优化程序(在谓词下推期间)调用 decomposePredicate 方法,传入完整表达式并接收分解(或为 null 表示无法进行下推)。 pushedPredicate稍后传递回存储处理程序的 Importing 格式,而residualPredicate附加到FilterOperator

假设足够复杂的存储处理程序可以实现此接口,则该存储处理程序适合与ExprNodeDesc表示形式紧密耦合。

同样,此接口是可选的,即使没有它,下推仍然可能。如果存储处理程序未实现此接口,则 Hive 将始终在FilterOperator中实现整个表达式,但仍将提供该表达式为存储处理程序的 Importing 格式;存储处理程序可以随意实现所需的数量。