apache-hive / 3.1.1 / reference / Hive_on_Spark__Join_Design_Master.html

Hive on Spark:加入设计大师

目的和先决条件

本文档的目的是总结对不同联接的所有研究的发现,并描述一种统一的设计以解决 Spark 中的问题。它将确定优化处理器将涉及及其职责。

并不是为了深入设计 Spark 中的各种联接实现而设计的目的,例如公共联接(HIVE-7384)或优化的联接变量,例如 mapjoin(HIVE-7613),skew-join(HIVE-8406)或 SMB mapjoin(HIVE-8202)。在阅读本文档之前,请参考 JIRA 上随附的设计文档以获取这些详细信息,因为它们还将包含在 MapReduce 和比较中如何实现它们的一些背景知识。最后,在阅读本文档之前,请阅读整个Hive on Spark设计文档也将有所帮助。

MapReduce Summary

本节总结了 Mapiveuce 上 Hive 的不同联接的计划生成,它将作为 Spark 的模型。我们旨在支持大多数此类连接优化。优先级将用于自动优化的联接,其次是需要用户 Importing 的联接,例如提示和元数据。

多年来,除了通过处理器(操作员树或工作树的部分转换)的普通联接之外,Hive 还引入了许多联接优化。下图(图 1)显示了不同处理器的关系,每个处理器在将运算符树从 common-join 转换为优化的 join 工作树之一(mapjoin,bucket mapjoin,SMB mapjoin 或 skewjoin)。

下图中的框表示处理器:将处理器分为三种类型:

  • 逻辑优化处理器(绿色):处理纯运算符树(无物理工作)。

  • 工作生成处理器(蓝色):处理操作员树并参与形成物理工作树。

  • 物理优化处理器(红色):处理格式完整的物理工作树。

每个处理器框均显示触发条件,Hive 配置属性或树中是否存在某个运算符。因此,您可以通过遵循特定的联接通过处理器的路径并确保触发所有配置并且给定的运算符已由以前的处理器创建来查看如何触发特定联接。还有其他条件可以进行顶部列出的转换(即表的大小等),这些条件在本文档中没有解释,可以从各个联接的文档中引用。

图 1.在 MapReduce 上加入 Hive 的处理器

任何处理器之前顶部的 Importing 箭头始终是 common-join 的操作员计划,如下所示。换句话说,如果没有激活任何优化程序处理器,这就是原始的加入计划。

图 1 中所示的联接路径的“退出”箭头是各种优化的联接变体。当然,还有其他 Export 路径,在这些 Export 路径中,一旦退出任何处理器的先决条件检查,该计划便会作为公共联接保持不变,但是未显示它们以简化该图。

一些重要处理器的说明如下。同样,没有足够的空间来描述它们,因此很少选择关键的。

  • SortedBucketMapJoinProc:此逻辑优化处理器处理将公共联接运算符树自动转换为 SMB 联接运算符树,以进行进一步处理。

to

  • MapJoinProcessor:当用户给出了标识小表的提示时,此逻辑优化处理器处理从公共联接运算符树到 mapjoin 运算符树的初始转换,以进行进一步处理。

to

  • MapJoinFactory:这是 MapJoinProcessor 之后的工作生成阶段的一部分。它处理从中创建的工作树,并添加本地工作以标识小型表。在 SMB MapJoin 路径中的 SortedBucketMapJoinProc 之后,它也以类似的方式跟随其工作生成阶段的一部分,还添加了本地工作以标识 SMB MapJoin 的小表。

MapJoinFactory MapJoin 处理:

to

MapJoinFactory SMB MapJoin 处理:

to

  • CommonJoinResolver:与基于提示的 mapjoin 转换相比,此方法可处理联接到 mapjoins 的自动转换,并具有单独的路径。这将使用已经从 common-join 运算符树生成的工作 common-join 工作树,并创建备用工作树。新的工作树由植根于大桌子的 Map 组成。指向小表的指针通过 LocalWork 数据结构保留在新工作中。

to:

  • MapJoinResolver:在此,两个 mapjoin 路径(提示和非提示 mapjoins)再次合并。这两个结果的最后一步是使其准备好在 MR 群集上进行物理执行,这将在下面的“ Spark MapJoin”部分中详细介绍。 MapJoinResolver 将单个作品分为两个作品。首先是处理小表的本地 MapRedWork,最后是 HashTableSink 编写哈希表文件。然后是处理大表的 MapRedWork,并通过 HashTableDummyOp 从小表哈希表文件加载。

to

图 1 所示为可能的联接计划的所有处理器路径的简要摘要:

  • Skewjoin (compile-time)

  • SkewJoinOptimizer:从公共联接运算符树中,创建由联合运算符连接的两个联接运算符树。这些将表示带有斜键的联接,以及不带斜键的联接。

    • 减少端 Connecting 的一个或两个都可以通过 CommonJoinResolver 转换为 mapjoin,有关更多详细信息,请参见 auto-mapjoin。
  • Skewjoin (runtime)

  • SkewJoinResolver:在原始的普通联接工作之后创建条件工作,该工作是 mapjoin 工作的列表。这些将处理倾斜键。

    • MapJoinResolver:mapjoin 作品的最终准备工作如所述。
  • Auto-mapjoin

  • CommonJoinResolver:将公共联接运算符树转换为 mapjoin 运算符树,并在 Mapjoin 运算符上标识大/小表,如所述。

    • MapJoinResolver:mapjoin 作品的最终准备工作如所述。
  • 带有提示的 Map 联接查询

  • MapJoinProcessor:将公共联接运算符树转换为 mapjoin 运算符树,并在 Mapjoin 运算符上标识大/小表。

    • MapJoinFactory:如上所述,添加指向 mapjoin 工作中的小表的 localWork。

    • MapJoinResolver:mapjoin 作品的最终准备工作如所述。

  • 带有提示的桶图联接查询。

  • MapJoinProcessor:将公共联接运算符树转换为 mapjoin 运算符树,并在 Mapjoin 运算符上标识大/小表。

    • BucketMapJoinProcessor:将存储桶信息添加到 MapJoin op。

    • MapJoinFactory:如上所述,添加指向 mapjoin 工作中的小表的 localWork。

    • MapJoinResolver:mapjoin 作品的最终准备工作如所述。

  • 带有提示的 SMB 加入查询

  • MapJoinProcessor:将公共联接运算符树转换为 mapjoin 运算符树,并在 Mapjoin 运算符上标识大/小表。

    • SortedBucketMapJoinProc:将 mapjoin 运算符树转换为 SMBMapJoin 运算符树。将 DummyOp 添加到小型表中。

    • MapJoinFactory:如上所述,在 SMBMapjoin 工作中添加指向小表的 localWork。

    • 可以转换回 MapJoin(有关详细信息,请参见#8)。

  • Auto-SMB join

  • SortedMergeBucketMapJoinProc:将 mapjoin 运算符树转换为 SMBMapJoin 运算符树。将 DummyOp 添加到小型表中。

    • MapJoinFactory:如上所述,在 SMBMapjoin 工作中添加指向小表的 localWork。

    • 可以转换为 MapJoin(有关详细信息,请参见#8)。

  • 转换为 mapjoin 的 SMB 联接

  • SMBJoin 运算符树的构造如上文#6,#7 所述。

    • SortedMergeJoinResolver:对于每个可能的大表候选者,创建一个 mapjoin 工作。这些将具有 LocalWork 数据结构以跟踪小型表。使用所有这些 mapjoin 作品创建 ConditionalWork(原始 SMBJoin 作品作为每个作品的备份任务),而原始 SMBJoin 作品作为最后一个选项。

    • MapJoinResolver:对于创建的每个 mapjoin 作品,均按所述进行最终准备。

Tez Comparison

Hive on Tez 仍在 Developing。他们当前禁用所有逻辑优化器处理器,并使用位于工作生成阶段的名为“ ConvertJoinMapJoin”的处理器。它利用在操作员树上 Comments 的统计信息来决定要采用的联接。它将直接为以下联接创建计划:

  • MapJoin

  • SMBJoin

这些看起来与 MapReduce 计划不同,并且基于 Tez 的物理功能“Broadcast 边缘”。有关更多详细信息,请参见这些连接的 JIRA。

Spark MapJoin

对于 Hive on Spark 的大多数连接,整体执行将与第一次切割的 MR 相似。因此,尽管封装在 SparkWork 中而不是 MapRedWork 中,但是将生成与 MR 中类似的工作树。

区别之一是 mapjoin 的实现,值得花一些时间讨论。回顾 MapReduce 中的 mapjoin 工作树:

  • 运行包含小表操作符树的 MapredLocalWork,并以转储到文件的 HashTableSink op 结尾。这被制成分布式缓存。

  • 为大表运行 MapWork,这将使用 HashTableDummy 的加载器从分布式缓存文件中填充小表哈希图。

Spark mapjoin 可以选择利用更快的 Spark 功能(例如 Broadcast 变量),或使用类似于分布式缓存的功能。 HIVE-7613的“小型 Broadcast”文档中提供了有关选择 MR 样式的分布式缓存的讨论,不过将来可能会添加 Broadcast 变量支持。这是我们想要的计划。

  • 在 Spark 集群上运行小型表 SparkWorks,该集群转储到哈希表文件中(这是 MR 的主要区别,因为小型表工作是分布式的)。

  • 在 Spark 集群上为大表运行 SparkWork。Map 器将使用 HashTableDummy 的加载器从文件中查找小表哈希图。

对于存储桶 Map 联接,每个小表的每个存储桶都进入一个单独的文件,并且大表的每个 Map 器都为每个小表加载对应存储桶的特定存储桶文件。

Spark Join 设计

让我们重新绘制 Hive on Spark 的处理器图。本节还需要注意其他几点:

  • 逻辑优化器大部分是直接或略加修改后从 Mapive 的 Hive 中重复使用的。

  • GenSparkWork 是第一个工作生成器处理器,它创建了 SparkWorks。

与原始 MapReduce 相比,Spark MapJoin 部分中提到的内容还存在一些细微的差异(改进)。

  • Hive on Spark 支持自动存储桶 mapjoin,而 MapReduce 不支持。这通过 SparkMapJoinOptimizer 和 SparkMapJoinResolver 以额外的逻辑完成。

  • 通过将 Sive 的 Hive 转换为 MapJoin,可以简化 Hive,如果符合条件,可以直接转换为 MapJoin。

图 2:在 Spark 上加入 Hive 的处理器

再次,我们首先探索一些有趣的处理器:

  • SparkSortMergeJoinOptimizer:像 SortrtedBucketMapJoinProc 一样,此逻辑优化处理器可处理将公共连接自动转换为 SMB 连接操作符树,以进行进一步处理。

to

  • SparkSortMergeMapJoinFactory:这将使用带有已经带有 SMBMapJoin 运算符和已识别的大/小表的操作员树的 MapWork,并创建一个指向小表的 LocalWork。

to

  • SparkMapJoinProcessor:与 MapJoinProcessor 一样,此逻辑优化处理器会在用户提供提示以识别小表时处理从 common-join 到 mapjoin 的初始转换,以进行进一步处理。最终的运算符树与 MapReduce 略有不同,仍然附加了小表分支的 ReduceSinks。

to

  • SparkMapJoinOptimizer:类似于 MapReduce 的 MapJoinProcessor 和 Tez 的 ConvertJoinMapJoin,这将通过统计信息识别大表和小表,从而将常见的 join 运算符树转换为 mapjoin 运算符树。像 Tez 的处理器一样,它从大表的分支中删除了 redsink,但将其保留给小表。

to

  • GenSparkWork/SparkReduceSinkMapJoinProc:在工作生成阶段,这些处理器的组合将在 mapjoin 运算符树周围绘制适当的工作边界。它还将 ReduceSinks 转换为 HashTableSinks。

to

  • SparkMapJoinResolver:同样,各种 mapjoin 路径(提示和自动)在此最终处理器上会聚,然后在 Spark 集群上执行。这将使用具有 MapJoin 运算符和已识别的大/小表的单个 SparkWork,并将其拆分为相关的 SparkWorks,其中小表 SparkWork 是大表 SparkWork 的父级。这将被发送到 Spark 集群以运行 mapjoin,如“ Spark MapJoin”部分中所述。在依赖的(大表)SparkWork 中创建 LocalWork 数据结构,以包含 HashTableDummy,该 HashTableDummy 加载小表哈希图文件。

to:

以及每个连接计划的处理器路径的摘要,如图 2 所示。

  • 不使用 mapjoin 的编译时偏移连接:逻辑优化器已完全从 MapReduce 中重新使用。

  • SkewJoinOptimizer:此逻辑优化器处理器可重复使用,以通过联合连接在一个 Connecting 创建两个连接计划。

    • 遵循自动转换为 MapJoin 路径。
  • SMB MapJoin(带有提示):同样,逻辑优化器与 MapReduce 中的优化器最相似。

  • SparkMapJoinProcessor/BucketMapJoinOptimizer/SparkSMBJoinHintOptimizer:与 MapReduce 版本几乎相同,它们将运算符树转换为 SMBMapJoinOp,其中包含已识别的大/小表。

    • GenSparkWork:生成 SparkWork,它是植根于大表 TS 的 SMBMapJoin 运算符树。

    • SparkSortMergeJoinFactory:如上所述,在 SMBMapJoin 工作中附加指向小表的 Localwork 数据结构。

  • SMB MapJoin(无提示):同样,逻辑优化器与 MapReduce 中的优化器最相似

  • SparkSortMergeJoinOptimizer:与 MapReduce 版本几乎相同,它将公共联接运算符树转换为 SMB mapjoin 运算符树,并在 SMBMapJoin 运算符上标识了大/小表,如所述。

    • GenSparkWork:生成 SparkWork,它是植根于大表 TS 的 SMBMapJoin 运算符树。

    • SparkSortMergeJoinFactory:如上所述,在 SMBMapJoin 工作中附加指向小表的 Localwork 数据结构。

  • 自动 Mapjoin:主要是重写,无法重用 MapReduce 处理器。

  • SparkMapJoinOptimizer:基于统计信息,将公共联接运算符树转换为 mapjoin 运算符树,并在 MapJoinOp 中标识出大/小表,如所述。

    • GenSparkWork:生成 SparkWork,其中的 MapJoin 运算符树根植于各种表 TS。

    • SparkMapJoinResolver:创建两个 SparkWorks 以实现 mapjoin,如上所述。

  • 通过提示进行 Mapjoin:同样,逻辑优化器与 MapReduce 中的优化器最相似

  • SparkMapJoinProcessor:与 MapReduce 版本几乎相同,这将公共联接运算符树转换为 mapjoin 运算符树,具有在 MapJoinOp 中标识的大/小表,如所述。

    • GenSparkWork:生成 SparkWork,其中的 MapJoin 运算符树根植于各种表 TS。

    • SparkMapJoinResolver:创建两个 SparkWorks 以实现 mapjoin,如上所述。

  • SMB 联接转换为 mapjoin:

  • 与 MapReduce 中不同,避免了此 Route。如果满足条件,则将联接直接发送到 SparkMapJoinOptimizer 和 SparkMapJoinResolver,就像普通的自动 Map 联接一样。

  • 倾斜连接(运行时):

  • SparkSkewJoinResolver:接受具有普通联接的 SparkWork,并将其转换为有条件的工作。然后在条件工作中添加带有 mapjoin 运算符树的其他 SparkWork 作为备份。这些将处理倾斜键。

    • SparkMapJoinResolver:对于具有 mapjoin 的每个备份 SparkWork,创建两个 SparkWorks 以实现 mapjoin,如所述。
  • Auto-bucket join

  • SparkMapJoinOptimizer:除了自动 mapjoin 转换之外,这里还有其他逻辑来支持自动存储桶 mapjoin 转换。

    • SparkMapJoinResolver:除了自动 mapjoin 转换之外,这里还有其他逻辑来支持自动存储桶 mapjoin 转换。