apache-hive / 3.1.1 / reference / MapJoin_and_Partition_Pruning.html

Overview

在 Hive 中,Map-Join 是一种技术,可为联接中涉及的所有表(最大的表除外)物化数据,然后将大表流化为来自小表的物化数据。 Map-Join 通常是星型模式联接的一种很好的联接方法,在该联接中,事实表将流化在物化维度表上。

Problem

Map-Join 假定大表(流表)中的联接列是分区列,而小表中的对应列未分区,联接不会修剪大表中不必要的分区。由于所有小表的数据都在大表流化之前实现,因此从理论上讲,可以从大表中删除不必要的分区。

已创建 HIVE-5119 以跟踪此功能的改进。

Proposed Solution

为大表中的每个联接列找出所有小表中的值集(即分区键)。使用这些值集可以从大表中找出应该使用元数据进行扫描的分区。在 Map-Join 开始流式传输大表之前,更改要扫描大表的分区。仅通过显式配置(该配置的名称为 TBD)才能打开此功能。

Possible Extensions

•如果对连接谓词的 LHS 和 RHS 进行了分区,则对于内部表,可以在编译时静态确定分区。
•即使未对“大表”列进行分区,也可以将由小表生成的值集作为大表上的谓词下推。可以处理谓词下推的存储处理程序(如 ORC)可以利用此优势。

Optimization Details

此优化具有编译时间和运行/执行时间。编译时优化将作为物理优化器的一部分进行,作为最后的优化之一(在推断存储桶排序之前)。运行/执行时间优化将作为 MRLocalTask 执行的一部分并在启动 MapRedTask for Map-Join 之前进行。

Compile Time

1.确定可以参与分区修剪的 Map Join 运算符。
2.对于任务中的每个 Map-Join 运算符,从大表中标识可以参与分区修剪的列。

从大表中识别出的列具有以下 Feature:
•它们是加入条件的一部分
•大表位于联接的内侧
•联接条件中的任何函数均不涉及列
•在值达到表扫描的联接条件之前,列值未更改(无功能)。
•列是分区列。

3.从小型表中识别可以参与分区修剪的小型表和列。
小表中标识的列具有以下 Feature:
•列是连接条件中谓词的另一侧,并且“大表”列被标识为分区修剪的目标。
•列不是连接谓词上任何函数的一部分。
•列是联接的一部分,其中大表位于外侧。

4.修改 MapRedLocalTask,以从参与分区修剪的小表中为每一列组装值集,并为大表生成 PartitionDesc。

NOTE:
•这需要在 MapRedLocalTask 中将终端运算符添加到运算符 DAG。
•请注意,新的终端运算符将从所有感兴趣的小表中获取 Tuples(就像 HashTableSink 运算符一样)。
•级联的 Map-Join 运算符(使用同一大表在同一任务中的不同键上联接)仍将在 MapRedLocalTask 中使用同一终端运算符。

Runtime

1.当 Tuples 流入 MapRedLocal 任务中的新终端运算符时,它将提取感兴趣的列,并将其添加到该列的一组值中。

2.当在新的终端操作员上调用 close 时,它将通过咨询 Meta Store(使用在#1 处生成的值)来生成大表的分区。
NOTE:
•Meta Store 需要使用 in 子句回答查询。例如:给我表 R 的所有分区,其中(1,2,3)中的 x 列和(5,6,7)中的 y 列。

•如果级联 MapJoinOperators,则将基于多个键修剪大表(因此需要生成集来处理它)。

3.使用#2 中的列表修改 MapRedTask 中 BigTable 的 PartitionDesc。
NOTE:
•应该通过查找相交将#2 的 PartitionDesc 与大表的现有 PartitionDesc 合并。
•分区 Descriptors 的这种修改被设计为每个任务的预启动活动。反过来,任务将要求对相关工作进行预启动。工作可能会保留需要调用预启动程序的操作员的有序列表。

Assumptions :
•在当前的 HIVE 中,连接谓词只能包含连接词。
•Hive 仅支持 Equijoin

Pseudo Code

1.遍历任务 DAG,查找 MapredTask。对每个这样的 MapRedTask 执行#2-#6.
2.如果任务包含备份加入计划(即不是 MAPJOIN_ONLY_NOBACKUP 或 backupTask 不为 null),则跳过任务。
NOTE:
这是侵略性的。在对 Hive 代码的有限接触中,似乎目前仅为联接设置了条件任务。

3.在任务中使用查找模式“ TS.* MAPJOIN”。对每个 MAPJOIN 运算符执行#4-#6.

4.将 Map-Join 运算符标记为分区修剪的候选对象

4 .1 收集可能参与大表修剪的小表
一个。遵守加入条件。如果联接类型为“外部”,则检查大表是否在外侧。如果是这样,则进行救助。
b。如果大桌子在内侧,则将小桌子的位置添加到集合中。

4 .2 如果#4.1 中的设置为空,则进行救助。否则,从没有包装在函数中的大表中收集联接键
a)从“ MapJoinDesc.getKeys()。get(MapJoinDesc .getPosBigTable)”获取联接密钥
b)浏览“ ExpressionNodeDesc”列表;如果“ ExprNodeDesc”的类型为“ ExprNodeGenericFuncDesc”,则检查其中是否包含任何分区修剪器候选键(“ ExprNodeDescUtils.containsPredicate”)。如果函数中包含任何候选键,则将其从 partition-pruner-bigtable-candidate 列表中删除。

c)在“ #b 中的列表中”和“ ExprNodecolumnDesc”中创建一对“ ExprNodeColumnDesc 位置整数”,并添加到 partition-pruner-bigtable-candidate 列表中。

4 .3 如果 partition-pruner-bigtable-candidate 列表为空,则进行纾困。否则,使用分区修剪器候选集从#4.1 中找到未包装在函数中的联接键。
a)从 4.1 开始
b)从 4.1 获取每个元素的连接键
c)从#b 遍历联接键列表,检查其中是否有一个函数
d)如果#c 中的任何元素是函数,则检查它是否包含 partition-pruner-bigtable-candidate 列表中的任何元素。如果是,则从 partition-pruner-bigtable-candidate 列表和 set-generation-key-map 中删除该元素。
e)创建一对表位置并从#d 连接键元素。
f)将元素添加到 set-generation-key-map 中,其中 key 是元素在 partition-pruner-bigtable-candidate 列表中的位置,值是#e 中的元素。

4 .4 如果 partition-pruner-bigtable-candidate 设置为空,则进行纾困。否则,从已分区的 partition-pruner-bigtable-candidate 集中找到 BigTable 列。
a)从#4.2 的集合构造“ ExprNodeDesc”的列表
b)找出#a 的根表列 Descriptors(“ ExprNodeDescUtils.backtrack”)
c)从 Hive 获取大表的表元数据
d)从#b 遍历列表,并检查 Table 元数据,以查看其中的任何列是否已分区(“ Table.isPartitionKey”)。如果 column 不是分区键,则将其从分区修剪器候选列表中删除。

4 .5 如果 partition-pruner-bigtable-candidate 设置为空,则进行纾困。否则,在达到连接条件之前,请检查是否有任何分区修剪器元素可能会改变该值。我们将不得不向“ ExprNodeDescUtil”引入一种类似于回溯的新方法,但是要检查值是否可以突变(ex 函数)。

4 .6 如果#4.5 中的 partition-pruner-bigtable-候选列表为空,则纾困。否则,将#4.5 的 partition-pruner-bigtable-candidate 列表和 set-generation-key-map 添加到 PhysicalCtx 中的现有值列表。

a)创建一对 partition-pruner-bigtable-候选列表和 set-generation-key-map。
b)将其添加到物理上下文中的现有列表中(这是为了处理同一 MapRedTask 中的级联 mapjoin 运算符)

5.如果 partition-pruner-bigtable-candidate set 和 set-generation-keys 不为空,则修改相应的 LocalMRTask 以引入新的 PartitionPrunerSink 运算符(如果尚未创建)。
a)将 MapJoinOperator – HashTableSink 运算符的 Map 添加到物理上下文。这需要在 HashTableSink 生成期间发生。
b)从物理上下文中获取与 MapJoinOperator 对应的 HashTableSinkOperator。
c)从 MapJoin Operator 的所有父级中,确定代表 set-generation-key-map 中的小表的那些。
d)创建一个新的 PartitionDescGenSinkOp(带有 set-generation-key-map)
e)将其添加为#c 中元素的子级。

Assumption :

两个不同的 MapRedTask(包含 MapJoin 运算符)将导致两个不同的 MapRedLocalTask,即使它们共享同一组小表。

PartitionDescGenSink 的实现
a)在 BigTable 列和 HashSet 之间维护一个 Map。
b)使用 set-generation-key 从每个 Tuples 中提取与每个列对应的值。
c)将它们添加到 HashSet
d)在 PartitionDescGenSink 关闭时,请查阅 Metadata 以获取对应的键列的分区。这需要对 Hive 元数据处理进行潜在的增强,以提供一个 api“获取 column1 的值为 set1 或 column2 的值为 set2 的所有分区。
e)将分区信息写入文件。文件名和位置需要最终确定。

6.在与 MapJoin 对应的 MapRedTask 中,将 bigTable 的 TableScan 添加到启动前的运算符列表中。将对应的 PartitionDescGenSink 输出的位置添加到 TS。

7.在执行时,对每个任务调用预启动。任务将要求启动工作。工作将按 Sequences 调用列表中的操作员。对于 TableScan,预启动将导致读取 PartitionDescriptor 信息,并将找到现有 PartitionDesc 和 PartitionDescGenSink 生成的新列表的交集。保存在 MapWork 中的分区状态信息将使用新的分区(“ MapWork.pathToAliases”,“ MapWork.aliasToPartnInfo”,“ MapWork.pathToPartitionInfo”)进行更新。然后将由“ ExecDriver.execute”将其拾取,以设置 InputFormat 的 Importing 路径。

NOTE:
•在 Mapwork 中,我们可能需要维护 Table 别名到 List 的 Map。一种选择是引入新的“ addPathToPartitionInfo”方法,并切换当前的调用者以使用新的便捷方法。然后,此方法可以维护表别名到 PartitionDesc 列表的 Map。
•当前设计假定 Local Task 生成的分区 Descriptors 信息将通过文件传达给 MapRed Task。这显然是次优的。作为增强,可以引入不同的机制来传递此信息。