apache-hive / 3.1.1 / reference / Hive_on_Spark.html

1. Introduction

我们建议修改 Hive 以将 Spark 添加为与 MapReduce 和 TezParallel 的第三个执行后端(HIVE-7292)。

Spark 是一个开源数据分析集群计算框架,它构建在 Hadoop 的两阶段 MapReduce 范例之外,但位于 HDFS 之上。 Spark 的主要抽象是称为“弹性分布式数据集(RDD)”的项目的分布式集合。可以从 Hadoop InputFormats(例如 HDFS 文件)或通过转换其他 RDD 创建 RDD。通过应用诸如 groupBy 和 filter 之类的一系列转换,或 Spark 提供的诸如 count 和 save 之类的操作,可以对 RDD 进行处理和分析,以实现 MapReduce 作业无需中间阶段即可完成的工作。

如 Shark 和 Spark SQL 所示,SQL 查询可以轻松转换为 Spark 转换和操作。实际上,许多原始转换和操作都是面向 SQL 的,例如连接和计数。

有关 Spark 的更多信息可以在这里找到:

1.1 Motivation

以下是使 Hive 在 Spark 上运行的主要动机:

  • Spark 用户的好处:对于已经将 Spark 用于其他数据处理和机器学习需求的用户而言,此功能非常有价值。在一个执行后端上进行标准化便于操作 Management,并使开发 maven 知识以调试问题和进行增强变得更加容易。

  • 更好地采用 Hive:遵循上一点,这将 Hive 作为 SQL on Hadoop 选项带入 Spark 用户群,进一步提高了 Hive 的采用率。

  • 性能:Hive 查询(尤其是涉及多个 reducer 阶段的查询)将运行得更快,从而像 Tez 一样改善用户体验。

Spark 执行后端取代 Tez 或 MapReduce 不是目标。对于 Hive 项目而言,多个后端共存是健康的。用户可以选择使用 Tez,Spark 还是 MapReduce。根据使用情况,每种都有不同的优势。 Hive 的成功并不完全取决于 Tez 或 Spark 的成功。

1.2 设计原则

主要的设计原理是对 Hive 的现有代码路径没有影响或只有有限的影响,因此对功能或性能没有影响。也就是说,选择在 MapReduce 或 Tez 上运行 Hive 的用户将拥有与现在一样的现有功能和代码路径。另外,在执行层插入 Spark 可以最大程度地共享代码并包含维护成本,因此 Hive 社区无需为 Spark 进行专门的投资。

同时,选择 Spark 作为执行引擎的用户将自动具有 Hive 提供的所有丰富功能。那些添加到 Hive 的将来功能(例如新数据类型,UDF,逻辑优化等)应该自动提供给那些用户,而无需在 Hive 的 Spark 执行引擎中完成任何自定义工作。

1.3 与 Shark 和 Spark SQL 的比较

Spark 生态系统中有两个相关项目可在 Spark 上提供 Hive QL 支持:Shark 和 Spark SQL。

  • Shark 项目将 Hive 生成的查询计划转换为其自己的表示形式,并通过 Spark 执行它们。

  • Spark SQL 是 Spark 中的功能。它使用 Hive 的解析器作为前端,以提供 Hive QL 支持。 Spark 应用程序开发人员可以轻松地用 SQL 以及其他 Spark 运算符在其代码中表达其数据处理逻辑。 Spark SQL 支持与 Hive 不同的用例。

与 Shark 和 Spark SQL 相比,我们的设计方法支持所有现有 Hive 功能,包括 Hive QL(以及将来的任何扩展),以及 Hive 与授权,监视,审计和其他操作工具的集成。

1.4 其他注意事项

我们知道,一个新的执行后端是一项重大任务。即使设计避免接触现有的代码路径,它也不可避免地增加了复杂性和维护成本。 Hive 现在将针对 MapReduce,Tez 和 Spark 运行单元测试。我们认为收益大于成本。从基础结构的角度来看,我们可以获得更多硬件的赞助,以进行持续集成。

最后,Hive on Tez 奠定了重要的基础,这将对支持新的执行引擎(例如 Spark)非常有帮助。这里的这个项目一定会从中受益。另一方面,Spark 是一个与 MapReduce 或 Tez 完全不同的框架。因此,在集成过程中很可能会发现差距和障碍。期望 Hive 社区将与 Spark 社区紧密合作,以确保集成成功。

2.高级功能

2.1 新的执行引擎

除了现有的 MapReduce 和 Tez,我们还将引入一个新的执行方式 Spark。要将 Spark 用作 Hive 中的执行引擎,请进行以下设置:

set hive.execution.engine=spark;

此配置的默认值仍为“ mr”。 Hivecontinue 在 MapReduce 和 Tez 上工作,就像在没有火花的群集上一样。

新的执行引擎应支持所有 Hive 查询,而无需对查询进行任何修改。查询结果在功能上应与 MapReduce 或 Tez 中的结果相同。

2.2 Spark 配置

当将 Spark 配置为 Hive 的执行时,将引入一些配置变量,例如 Spark 集群的主 URL。但是,如果未将 Spark 配置为执行引擎,则可以完全忽略它们。

2.3 其他功能

  • Hive 将显示一个任务执行计划,该计划类似于 MapReduce 和 Tez 的“解释”命令中显示的计划。

  • 当在 Spark 上运行查询时,Hive 将向用户提供有关查询进度和完成状态的适当反馈。

  • 用户将能够像以前一样获得统计信息和诊断信息(控制台上的计数器,日志和调试信息)。

3.配置单元设计

如引言中所述,该项目采用与 Shark 或 Spark SQL 不同的方法,因为我们不会使用 Spark 的 Primitives 来实现 SQL 语义。相反,我们将使用 MapReducePrimitives 实现它。这里唯一的新事物是这些 MapReducePrimitives 将在 Spark 中执行。实际上,在此设计中将仅使用 Spark 的一些 Primitives。

与 Shark 或 Spark SQL 所执行的操作不同,在 Spark 上执行 Hive 的 MapReducePrimitives 的方法具有以下直接优点:

  • Spark 用户将自动获得 Hive 的全部丰富功能,包括 Hive 将来可能会引入的任何新功能。

  • 这种方法避免或减少了 Hive Spark 执行引擎中任何自定义工作的必要性。

  • 通过使 Hive-on-Spark 与 Hive MapReduce 和 Tez 保持一致,这也将限制项目的范围并减少长期维护。

为 Hive 实施 Spark 执行引擎的主要工作有两个方面:查询计划,其中语义分析器中的 Hive 运算符计划进一步翻译成 Spark 可以执行的任务计划;查询执行,其中生成的 Spark 计划实际在其中执行。 Spark 集群。当然,还有其他功能部件,这些部件杂乱而必不可少,例如监视,计数器,统计信息等。因此,下面还概述了一些重要的设计细节。

值得注意的是,尽管 Spark 主要是用 Scala 编写的,但它提供了多种语言(包括 Java)的 Client 端 API。当然,我们选择 Spark Java API 进行集成,并且该项目不需要 Scala 知识。

3.1 查询计划

当前,Hive 语义分析器会针对给定的用户查询生成由逻辑运算符图(如 TableScanOperator,ReduceSink,FileSink,GroupByOperator 等)组成的运算符计划。合理的运营计划。 Tez 的行为类似,但是会生成一个 TezTask,它将其他多个 MapReduce 任务组合为一个 Tez 任务。

对于 Spark,我们将引入与 MapReduceCompiler 和 TezCompiler 并行的 SparkCompiler。它的主要职责是从 Hive 逻辑操作员计划中编译可以在 Spark 上执行的计划。因此,我们将使用 SparkTask 来描述将在 Spark 集群中执行的作业,并使用 SparkWork 来描述 Spark 任务的计划。因此,SparkCompiler 将 Hive 的操作员计划转换为 SparkWork 实例。

在任务计划生成期间,SparkCompiler 可能会执行适合 Spark 的物理优化。但是,在实施的第一阶段,除非简单易懂,否则我们将重点放在此方面。随着我们获得越来越多的 Spark 知识和经验,可以逐步进行进一步的优化。

如何从 Hive 的运算符计划生成 SparkWork 留待实施。但是,在 Tez 和 Spark 之间以及 MapReduce 和 Spark 之间似乎有很多共同的逻辑。如果可行,我们将提取通用逻辑并将其打包为可共享的形式,将特定的实现留给每个任务编译器使用,而不会破坏 MapReduce 或 Tez 的稳定性。

3.2 作业执行

Hive 的任务执行框架可以像执行其他任务一样执行 SparkTask 实例。在内部,SparkTask.execute()方法将从 SparkWork 实例中生成 RDD 和函数,然后通过 SparkClient 端将执行提交给 Spark 集群。

将 Spark 工作提交到 Spark 集群后,SparkClient 端将 continue 监视作业执行并报告进度。可以通过 SparkListener API 监视 Spark 作业。目前在 Spark Java API 中尚不可用,我们希望在 Spark 社区的帮助下很快可以使用它们。

使用 SparkListener API,我们将添加一个 SparkJobMonitor 类,该类处理状态的打印以及报告最终结果。此类提供的功能类似于用于 MapReduce 处理的 HadoopJobExecHelper 或用于 Tez 作业处理的 TezJobMonitor,并且在作业失败的情况下,还将检索并打印在执行时抛出的顶级异常。

Spark 作业提交是通过 SparkContext 对象完成的,该对象使用用户的配置实例化。当 Hive 执行 SparkTask 时,将在当前用户会话中创建此类上下文对象。使用上下文对象,创建了与 Hive 表相对应的 RDD,并从 Hive 的 SparkWork 构建并应用于 RDD 的 MapFunction 和 ReduceFunction(在下文中有更多详细信息)。通过使用伪函数在 RDD 上应用 foreach()转换来触发作业执行。

每个用户会话一个 SparkContext 是正确的做法,但是由于某些线程安全问题,Spark 似乎为每个应用程序假定一个 SparkContext。我们希望 Spark 社区能够及时解决此问题。

3.3 设计注意事项

本节涵盖了许多重要组件的主要设计注意事项,这些组件将要推出,也可能需要进行特殊处理。对于未列出的其他现有组件,例如 UDF 和自定义 Serdes,我们希望不需要或无需考虑特殊的因素。

表为 RDD

Hive 表不过是 HDFS 上的一堆文件和文件夹。 Spark 基元应用于 RDD。因此,自然的 Hive 表在 Spark 执行引擎中将被视为 RDD。但是,Hive 表比 HDFS 文件更复杂。它可以具有分区和存储桶,以处理异构的 Importing 格式和模式演变。结果,治疗可能不是那么简单,可能会有并发症,这是我们需要注意的。

我们可能需要扩展 Spark 的 Hadoop RDD 并实现特定于 Hive 的 RDD。尽管在 Scala 中 RDD 扩展似乎很容易,但是由于 Spark 的 Java API 缺乏这种功能,这可能是一个挑战。我们将找出是否需要 RDD 扩展,如果需要,我们将需要 Spark 社区的 Java API 帮助。

SparkWork

如上所述,SparkTask 将使用 SparkWork,它描述了 Spark 作业将要执行的任务计划。 SparkWork 将与 TezWork 非常相似,后者基本上由叶子处的 MapWork 和所有其他节点中的 ReduceWork(有时是 UnionWork)组成。

通过 MapWork 和 ReduceWork 定义 SparkWork 可以使新概念更容易理解。 “解释”命令将显示 Hive 用户熟悉的模式。

SparkTask

要执行 SparkWork 实例描述的工作,需要进一步转换,因为 MapWork 和 ReduceWork 是面向 MapReduce 的概念,而使用 Spark 实施它们需要遍历计划并生成 Spark 构造(RDD,函数)。如何遍历和转换计划留给实现,但这是 Spark 特有的,因此不会暴露或影响其他组件。

上面提到的 MapFunction 将由 MapWork 生成,特别是从 ExecMapper.map()方法开始的运算符链。 ExecMapper 类实现 MapReduce Mapper 接口,但 Hive 中的实现包含一些可重复用于 Spark 的代码。因此,我们可能会将通用代码提取到一个单独的类 MapperDriver 中,以供 MapReduce 和 Spark 共享。请注意,这只是重构而不是重新设计的问题。

(Tez 可能有相同的情况.但是,Tez 选择创建一个单独的类 RecordProcessor 来执行类似的操作.)

同样,ReduceFunction 将由 SparkWork 中的 ReduceWork 实例组成。对 Spark 而言,ReduceFunction 与 MapFunction 没有什么区别,但是该函数的实现将有所不同,由从 ExecReducer.reduce()开始的运算符链组成。同样,由于 ExecReducer 中的某些代码将被重用,因此我们很可能会将通用代码提取到一个单独的类 ReducerDriver 中,以便由 MapReduce 和 Spark 共享。

所有功能(包括 MapFunction 和 ReduceFunction)都需要可序列化,因为 Spark 需要将它们运送到集群。这可能很棘手,因为如何打包功能会影响功能的序列化,而 Spark 对此是隐式的。

请注意,Spark 的内置 Map 和归约转换运算符对每个记录都起作用。例如,Hive 的运算符需要在调用以处理行之前进行初始化,并在完成处理后关闭。 MapFunction 和 ReduceFunction 将必须在单个 call()方法中执行所有这些操作。为了将 Spark 用作 Hive 的备用执行后端,我们将在 RDD 上使用 mapPartitions 转换运算符,该运算符在整个数据分区上提供迭代器。通过控制迭代器,Hive 可以在处理第一行之前初始化运算符链,并在使用完所有 Importing 后取消初始化它。

值得注意的是,在原型制作期间,Spark 缓存在某些情况下会全局起作用,从而保持该函数的陈旧状态。这样的罪魁祸首很难被发现,希望 Spark 在记录功能方面会更加具体。

随机,分组和排序

虽然这对于 MapReduce 和 Tez 是“免费”的,但我们将需要为 Spark 提供等效的功能。幸运的是,Spark 提供了一些适合替代 MapReduce 随机播放功能的转换,例如 partitionBy,groupByKey 和 sortByKey。转换 partitionBy 进行纯混洗(不进行分组或排序),groupByKey 进行混洗和分组,sortByKey()进行混洗和排序。因此,对于 SparkWork 中的每个 ReduceSinkOperator,我们将需要注入其中一种转换。

具有选择性地选择准确的混洗行为的能力为优化提供了机会。例如,Hive 的 groupBy 不需要对键进行排序,但是 MapReduce 仍然可以对其进行排序。在 Spark 中,仅当必要的键 Sequences 很重要时(例如,对于 SQL order by),我们才可以选择 sortByKey。

虽然 sortByKey 不提供分组功能,但是将键分组很容易,因为具有相同键的行将连续出现。另一方面,groupByKey 将键聚集在一个集合中,这自然适合 MapReduce 的 reducer 接口。

由于 Hive 在使用 MapReduce 键来实现无法直接使用的操作(例如连接)方面更为复杂,因此上述转换可能无法完全满足 Hive 的需求。因此,在前进的过程中,我们需要勤于发现潜在的问题。

最后,Spark 社区似乎正在改进/更改与改组相关的 API。因此,这部分设计可能会发生变化。有关 Spark 随机播放相关改进的详细信息,请参阅https://issues.apache.org/jira/browse/SPARK-2044

Join

正如在 Hive 中所体现的那样,在 MapReduce 世界中实现连接相当复杂。 Hive 具有减少侧连接以及 Map 侧连接(包括 Map 侧哈希查找和 Map 侧排序合并)。我们将保留 Hive 的 join 实现。但是,由于 Hive 在实现 reduce-side join 中广泛使用 MapReduce 的改组,因此需要特别注意改组行为(键生成,分区,排序等)。如上一节(随机,分组和排序)所指出的,Spark 有望或将能够提供对改组的灵活控制。

有关详细设计,请参见:Hive on Spark:加入设计大师

任务数

如上所述,Spark 转换(例如 partitionBy)将用于将 mapper 端的操作连接到 reducer 端的操作。可以选择为这些转换指定分区的数量,这基本上决定了 reducer 的数量。

减速器数量的确定与 MapReduce 和 Tez 的确定相同。

本地 MapReduce 任务

虽然我们可以看到在 Spark 上运行本地作业的好处,例如避免将数据下沉到文件中,然后再将其从文件中读取到内存中,但在短期内,这些任务仍将以今天的方式执行。这意味着 Hive 在本地执行时始终必须提交 MapReduce 作业。但是,可以对此进行进一步的调查和评估。

向用户呈现查询结果也是如此。当前,在 Client 端使用提取操作符从临时文件(由查询计划中的 FileSink 生成)中提取行。可以让 FileSink 生成内存中的 RDD,而 fetch 运算符可以直接从 RDD 中读取行。同样,这可以作为 Future 的工作进行研究和实施。

语义分析和逻辑优化

语义分析器和任何逻辑优化都不会改变。物理优化和 MapReduce 计划生成已作为 Hive on Tez 工作的一部分移至单独的类中。

Job Diagnostics

基本的“作业成功/失败”以及进度将在“作业监视”中讨论。 Hive 当前尝试获取有关失败作业的其他信息的方法可能无法立即获得,但这是另一个需要更多研究的领域。

Spark 在运行时为每个 SparkContext 提供 WebUI。请注意,默认情况下,此信息仅在应用程序期间有效。要在事后查看 Web UI,请在启动应用程序之前将 spark.eventLog.enabled 设置为 true。这会将 Spark 配置为记录 Spark 事件,这些事件将 UI 中显示的信息编码到持久存储中。

Spark 的独立模式群集 Management 器还具有自己的 Web UI。如果应用程序在其整个生命周期中都记录了事件,则独立主服务器的 Web UI 将在应用程序完成后自动重新呈现应用程序的 UI。

如果 Spark 在 Mesos 或 YARN 上运行,则只要存在应用程序的事件日志,仍然可以通过 Spark 的历史记录服务器重构完成的应用程序的 UI。

有关 Spark 监视的更多信息,请访问http://spark.apache.org/docs/latest/monitoring.html

计数器和 Metrics

Spark 具有累加器,这些累加器只是通过关联操作“添加”到的变量,因此可以有效地并行支持。它们可用于实现计数器(如在 MapReduce 中)或总和。 Spark 本身支持数值类型和标准可变集合的累加器,程序员可以添加对新类型的支持。在 Hive 中,我们可以使用 Spark 累加器来实现 Hadoop 计数器,但是这样做的方式可能不正确。

Spark 为正在运行的作业发布运行时 Metrics。但是,度量很可能不同于 MapReduce 或 Tez,更不用说提取度量的方式了。围绕此的主题值得单独的文档,但可以肯定地可以逐步改进。

Explain Statements

说明语句将与 TezWork 相似。

Hive Variables

配置单元变量将 continue 保持今天的状态。变量将像以前一样传递给执行引擎。但是,某些与执行引擎相关的变量可能不适用于 Spark,在这种情况下,它们将被忽略。

Union

尽管上面提到我们将使用 MapReducePrimitives 在 Spark 执行引擎中实现 SQL 语义,但联合是一个 exception。虽然可以使用 MapReducePrimitives 实现它,但最多需要三个 MapReduce 作业才能合并两个数据集。使用 Spark 的并集转换应大大减少执行时间并促进交互性。

实际上,Tez 在 union 方面已经偏离了 MapReduce 的实践。现有一个 UnionWork,其中将 unions 操作员转换为工作单位。

并发和线程安全

Spark 启动 Map 器和精简器与 MapReduce 的不同之处在于,工作者可以在单个 JVM 中处理多个 HDFS 拆分。但是,Hive 的 map-side 运算符树或 reduce-side 运算符树在专用 JVM 中的单个线程中运行。重用运算符树并将它们彼此放置在共享的 JVM 中将很可能引起并发和线程安全问题。这样的问题,例如静态变量,已在初始原型制作中浮出水面。例如,变量 ExecMapper.done 用于确定 Map 器是否已完成工作。如果单个 JVM 中存在两个 ExecMapper 实例,则较早完成的一个 Map 器也会过早终止另一个 Map 器。我们希望将有大量工作使这些操作员树成为线程安全和无争用的。但是,这项工作不会对其他执行引擎产生任何影响。

Build Infrastructure

对 Spark 会有一个新的“ ql”依赖项。当前,SparkClient 端库位于单个 jar 中。 Spark jar 的处理方式与 Hadoop jar 的处理方式相同:它们将在编译期间使用,但不包括在最终发行版中。相反,我们将依赖于它们分别安装。只需运行 Spark 罐即可运行 Spark 作业,而 MapReduce 或 Tez 执行则不需要它们。

另一方面,要在 Spark 上运行 Hive 代码,需要通过调用 SparkContext.addJar()方法将某些 Hive 库及其依赖项分发到 Spark 集群。由于 Spark 还依赖于 Hadoop 和其他库,它们可能会出现在 Hive 的依存关系中,但版本不同,因此在识别和解决库冲突方面可能会遇到一些挑战。JettyLibrary 在原型制作过程中提出了这样的挑战。

Mini Spark 群集

通过将“ local”作为主 URL,可以在本地运行 Spark 作业。大多数测试将在此模式下执行。同时,Spark 提供了一种在本地集群中运行作业的方法,本地集群是由本地计算机中给定数量的进程组成的集群。我们将进一步确定这是否是运行 Hive 与 Spark 相关的测试的好方法。

Testing

测试(包括预提交测试)与 Tez 相同。当前,Hive 存在覆盖问题,因为有一些变量需要完整的回归套件运行,例如 Tez 与 MapReduce,矢量化与关闭等。我们建议在预提交测试运行中轮换这些变量,以便有足够的覆盖范围测试时间不会延长。

3.4 Spark 可能需要进行的工作

在整个原型设计过程中,已经确定了一些关于 Spark 的问题。可能更多,但以下是 Spark 社区需要对该项目进行的改进的摘要:

  • Java 中的作业监视 API。

  • SparkContext 线程安全问题。

  • 改善随机播放功能和 API。

  • 潜在地,用于扩展 RDD 的 Java API。

4. Summary

从上面的分析可以看出,Spark on Hive 的项目在功能和设计方面既简单又干净,而又复杂且涉及实施,可能会花费大量时间和资源。因此,我们将采用分阶段的方法,并期望优化和改进的工作将在相对较长的时间内进行,而所有基本功能都将在第一阶段存在。

其次,我们期望 Hive 和 Spark 之间的整合不会一直很顺利。可能会发现功能上的差距,并且可能会出现问题。我们期望 Hive 社区和 Spark 社区将紧密合作以解决可能遇到的任何障碍。

尽管如此,我们认为对现有代码路径的影响很小。虽然 Spark 执行引擎可能需要一些时间才能稳定下来,但 MapReduce 和 Tez 应该 continue 按原样工作。