On this page
Filter Pushdown
Introduction
本文档说明了我们计划如何在 Hive 的优化器中添加支持,以将过滤器推入物理访问方法。这是一个重要的优化措施,它可以最大程度地减少访问方法(例如,用于索引键查找)扫描和处理的数据量,并减少传递给 Hive 进行进一步查询评估的数据量。
Use Cases
以下是我们定位的主要用例。
将过滤器推入 Hive 的内置存储格式,例如 RCFile
将过滤器下推到诸如HBase handler(http://issues.apache.org/jira/browse/HIVE-1226)之类的存储处理程序中
将索引框架添加到 Hive(http://issues.apache.org/jira/browse/HIVE-417)后,将过滤器推入索引访问计划
Components Involved
总体工作有很多不同的部分。
传播 Hive 现有谓词下推的结果。 Hive 的优化器已经完成了通过查询计划向下推谓词的艰苦工作(通过配置参数 hive.optimize.ppd = true/false 控制)。剩下的“最后一英里”是将表级过滤器发送到相应的 Importing 格式。
选择要传递给 Importing 格式的主过滤器表示形式。此表示形式必须是中立的(独立于将使用它的访问计划),并与 Hive 松散耦合(以便存储处理程序可以选择最小化其对 Hive 内部的依赖性)。
用于解释主要表示形式的 Helper 类。许多访问计划将需要以类似的方式分析过滤器,例如分解连接并检测支持的列比较模式。 Hive 应该为这种情况提供可共享的 Util,这样就不必在每种访问方法的代码中重复它们。
将过滤器转换为特定于访问方法的形式。这部分取决于特定的访问方法。例如对于 HBase,它涉及将过滤条件转换为相应的调用以构建HBase scan对象。
主过滤器表示形式
为了实现尽可能宽松的耦合,我们将使用字符串作为过滤器的主要表示形式。特别是,字符串将采用 Hive 解析ExprNodeDesc
时生成的形式,例如
((key >= 100) and (key < 200))
通常,这可能是有效的 SQL,尽管它不一定总是与原始 SQL 完全匹配,例如
cast(x as int)
becomes
UDFToInteger(x)
该字符串中的列名称是对筛选器所操作的表的列的不合格引用,如它们在 Hive 元存储中所知。这些列名称可能与基础存储已知的名称不同。例如,HBase 存储处理程序将 Hive 列名 Map 到 HBase 列名(由列族限定)。从 Hive 列名进行 Map 是解释过滤字符串的代码的责任。
其他过滤器表示
如上所述,我们希望避免解释过滤字符串的代码重复(例如解析)。首先,我们将通过以序列化形式将ExprNodeDesc
树作为过滤字符串的可选伴侣传递来提供对ExprNodeDesc
树的访问。在后续工作中,我们将为字符串形式提供解析 Util。
我们还将提供一个 IndexPredicateAnalyzer 类,该类能够检测简单的sargableExprNodeDesc
树中的子表达式。在后续活动中,我们将为区分和组合更复杂的可索引子表达式提供支持。
public class IndexPredicateAnalyzer
{
public IndexPredicateAnalyzer();
/**
* Registers a comparison operator as one which can be satisfied
* by an index search. Unless this is called, analyzePredicate
* will never find any indexable conditions.
*
* @param udfName name of comparison operator as returned
* by either {@link GenericUDFBridge#getUdfName} (for simple UDF's)
* or udf.getClass().getName() (for generic UDF's).
*/
public void addComparisonOp(String udfName);
/**
* Clears the set of column names allowed in comparisons. (Initially, all
* column names are allowed.)
*/
public void clearAllowedColumnNames();
/**
* Adds a column name to the set of column names allowed.
*
* @param columnName name of column to be allowed
*/
public void allowColumnName(String columnName);
/**
* Analyzes a predicate.
*
* @param predicate predicate to be analyzed
*
* @param searchConditions receives conditions produced by analysis
*
* @return residual predicate which could not be translated to
* searchConditions
*/
public ExprNodeDesc analyzePredicate(
ExprNodeDesc predicate,
final List<IndexSearchCondition> searchConditions);
/**
* Translates search conditions back to ExprNodeDesc form (as
* a left-deep conjunction).
*
* @param searchConditions (typically produced by analyzePredicate)
*
* @return ExprNodeDesc form of search conditions
*/
public ExprNodeDesc translateSearchConditions(
List<IndexSearchCondition> searchConditions);
}
public class IndexSearchCondition
{
/**
* Constructs a search condition, which takes the form
* <pre>column-ref comparison-op constant-value</pre>.
*
* @param columnDesc column being compared
*
* @param comparisonOp comparison operator, e.g. "="
* (taken from GenericUDFBridge.getUdfName())
*
* @param constantDesc constant value to search for
*
* @Param comparisonExpr the original comparison expression
*/
public IndexSearchCondition(
ExprNodeColumnDesc columnDesc,
String comparisonOp,
ExprNodeConstantDesc constantDesc,
ExprNodeDesc comparisonExpr);
}
Filter Passing
将滤镜向下传递到 Importing 格式的方法将遵循与向下推入列投影相似的模式。
org.apache.hadoop.hive.serde2.ColumnProjectionUtils
封装下推式通讯诸如
HiveInputFormat
之类的类调用ColumnProjectionUtils
来在实例化RecordReader
之前在 jobConf 上设置投影下推属性(READ_COLUMN_IDS_CONF_STR)RecordReader
调用ColumnProjectionUtils
以访问此属性的工厂方法
对于过滤器下推:
HiveInputFormat
在调用 getSplits 以及实例化记录读取器之前,在作业 conf 中设置属性hive.io.filter.text
(字符串形式)和hive.io.filter.expr.serialized
(ExprNodeDesc 的序列化形式)存储处理程序的 Importing 格式读取这些属性并处理过滤器表达式
有用于协商过滤器分解的单独的优化程序交互(在后面的部分中描述)
请注意,由于过滤器的选择性可能会修剪掉某些本来可以访问的拆分,因此需要涉及 getSplits。 (从理论上讲,列投影也可能会影响分裂边界,但我们将其留待后续讨论.)
Filter Collection
因此,HiveInputFormat
将在何处获取要传递的过滤器表达式?同样,我们可以从列投影的模式开始:
在优化期间,
org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcFactory's
ColumnPrunerTableScanProc
填充TableScanOperator
中的下推信息以后
HiveInputFormat.initColumnsNeeded
从TableScanOperator
检索此信息
对于过滤器下推,等效值是org.apache.hadoop.hive.ql.ppd.OpProcFactory
中的TableScanPPD
。当前,它调用createFilter
,它将表达式折叠成一个名为 condn 的单个表达式,然后将其粘贴到新的FilterOperator
上。我们可以调用 condn.getExprString()并将结果存储在TableScanOperator
上。
配置单元配置参数hive.optimize.ppd.storage
可用于启用或禁用将过滤器下推到存储处理程序。默认情况下将启用此功能。但是,如果禁用了hive.optimize.ppd
,那么这也隐式地阻止了下推到存储处理程序。
我们仅从 nonlocal 表开始;我们将再次讨论该问题,以将过滤器向下推至索引和内置存储格式(如 RCFile)。
Filter Decomposition
考虑一个像
x > 3 AND upper(y) = 'XYZ'
假设存储处理程序能够实现x > 3
的范围 scan,但不具有评估{{upper(y)=
'XYZ'}}。在这种情况下,最佳计划将涉及分解过滤器,仅将第一部分向下推入存储处理器,以及
只剩下其余部分供 Hive 通过自己的执行程序进行评估。
为了使之成为可能,存储处理程序需要能够与 Hive 协商分解。这意味着 Hive 给
存储处理程序将整个过滤器传递给存储处理程序,而存储处理程序将传递回“剩余”:Hive 需要评估的部分。残差为零表示存储处理程序能够自行处理整个过滤器(在这种情况下,无需FilterOperator
)。
为了支持这种交互,我们将引入一个新的(可选)接口,该接口将由存储处理程序实现:
public interface HiveStoragePredicateHandler {
public DecomposedPredicate decomposePredicate(
JobConf jobConf,
Deserializer deserializer,
ExprNodeDesc predicate);
public static class DecomposedPredicate {
public ExprNodeDesc pushedPredicate;
public ExprNodeDesc residualPredicate;
}
}
Hive 的优化程序(在谓词下推期间)调用 decomposePredicate 方法,传入完整表达式并接收分解(或为 null 表示无法进行下推)。 pushedPredicate
稍后传递回存储处理程序的 Importing 格式,而residualPredicate
附加到FilterOperator
。
假设足够复杂的存储处理程序可以实现此接口,则该存储处理程序适合与ExprNodeDesc
表示形式紧密耦合。
同样,此接口是可选的,即使没有它,下推仍然可能。如果存储处理程序未实现此接口,则 Hive 将始终在FilterOperator
中实现整个表达式,但仍将提供该表达式为存储处理程序的 Importing 格式;存储处理程序可以随意实现所需的数量。