Hive 流 API

从 Hive 3.0.0 版本开始,不推荐使用流式数据提取,而由较新的 V2 API(HIVE-19205)代替。

传统上,向 Hive 中添加新数据需要将大量数据收集到 HDFS 中,然后定期添加新分区。这本质上是“批量插入”。

Hive Streaming API 允许将数据连续抽取到 Hive 中。传入数据可以以小批记录的形式连续提交到现有的 Hive 分区或表中。提交数据后,该数据立即对随后启动的所有 Hive 查询可见。

该 API 适用于连续生成数据的流 Client 端(例如NiFiFlumeStorm)。流支持构建在 Hive 中基于 ACID 的插入/更新支持之上(请参见Hive Transactions)。

Hive 流 API 的类和接口部分大致分为两类。第一组提供对连接和事务 Management 的支持,而第二组提供 I/O 支持。事务由 MetastoreManagement。直接对表定义的目标文件系统(HDFS,S3A 等)执行写操作。

流式传输到未分区表,具有静态分区的分区表和具有动态分区的分区表均受支持。该 API 支持 Kerberos 身份验证和基于存储的授权。调用 API 之前,必须使用 kerberos 登录 Client 端用户。登录的用户应具有适当的存储权限,才能写入目标分区或表位置。建议使用“ hive”用户,以使 hive 查询能够读回数据(由流 API 编写),并且 doAs 设置为 false(以 hive 用户身份运行查询)。

关于包装的注意事项 :API 在 Java 包 org.apache.hive.streaming 中定义,并且是 Hive 中* hive-streaming * Maven 模块的一部分。

Streaming Mutation API 的弃用和删除

从版本 3.0.0 开始,Hive 从 hive-hcatalog-streaming 模块弃用了 Hive,并且不再支持该版本的 Streaming Mutation API。新的配置单元流模块不再支持变异 API。

Streaming Requirements

使用流媒体需要做一些事情。

  • hive-site.xml 中需要以下设置才能启用对流的 ACID 支持:

  • hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager

    • hive.compactor.initiator.on = true (请参阅更重要的细节here)

    • hive.compactor.worker.threads > 0

  • *必须在table creation期间指定“存储为 orc” *。目前仅支持ORC 存储格式

  • 创建期间必须在表上设置 tblproperties(“ transactional” =“ true”)。

  • Client 端流处理过程的用户必须具有必要的权限才能写入表或分区并在表中创建分区。

Limitations

当前,流式 API 开箱即用,仅支持流式分隔的 Importing 数据(例如 CSV,制表符分隔等),JSON 和 Regex 格式的数据。 Hive 3.0.0 发行版中支持的记录作者是

StrictDelimitedInputWriter
StrictJsonWriter
StrictRegexWriter

所有这些记录编写者都希望严格的架构匹配,这意味着记录的架构应与表架构完全匹配(注意:编写者不执行架构检查,由 Client 端确保记录架构与表架构匹配) 。

  • RecordWriter *接口的其他实现可以提供对其他 Importing 格式的支持。

当前,目标表的格式仅支持 ORC。

API Usage

事务和连接 Management

HiveStreamingConnection

HiveStreamingConnection 类描述与流连接有关的信息。它描述了数据库,表,分区名称,要连接到的 metastore URI 以及记录写入器,用于将记录流式传输到目标分区或表中。可以通过 Builder API 来指定所有这些信息,然后通过 API 构建与 Hive MetaStore 的连接以进行流传输。在 Builder API 上调用 connect 将返回 StreamingConnection 对象。然后,可以使用 StreamingConnection 启动新事务以执行 I/O。

* CONCURRENCY NOTE: The streaming connection APIs and record writer APIs are not thread-safe. Streaming connection creation, * begin/commit/abort transactions, write and close has to be called in the same thread. If close() or * abortTransaction() has to be triggered from a separate thread it has to be co-ordinated via external variables or * synchronization mechanism

HiveStreamingConnection API 还支持 2 种分区模式(静态与动态)。在静态分区方式下,可以通过 builder API 预先指定分区列的值。如果静态分区存在(由用户预先创建或表中已经存在的分区),则流连接将使用它,如果不存在,则流连接将使用指定的值创建一个新的静态分区。如果对表进行了分区并且未在 builder API 中指定静态分区值,则配置单元流连接将使用动态分区模式,在该模式下配置单元流连接期望分区值是 Logging 的最后一列(类似于配置单元动态分区的工作方式) )。例如,如果表按 2 列(年和月)进行分区,则配置单元流连接将从 ImportingLogging 提取最后 2 列,使用该列将在 metastore 中动态创建分区。

事务的实现与传统数据库系统略有不同。每个 Transaction 都有一个 ID,并且多个 Transaction 被分组为“Transaction 批”。这有助于将来自多个事务的记录分组为更少的文件(而不是每个事务 1 个文件)。在配置单元流连接期间,可以通过构建器 API 指定事务批处理大小。事务 Management 完全隐藏在 API 的后面,在大多数情况下,用户不必担心调整事务批处理大小(这是 maven 级别的设置,将来的发行版中可能不予采用)。如果当前事务批处理已用完,API 也会在 beginTransaction()调用时自动翻转到下一个事务批处理。建议将事务批大小保留为默认值 1,并在每个事务下将几千条记录分组在一起。由于每个事务都对应于文件系统中的增量目录,因此过于频繁地提交事务最终可能会创建太多的小目录。

如果在hive.txn.timeout秒后未提交或异常中止,则 Metastore 最终将使 TransactionBatch 中的事务过期。为了使事务保持活动状态,HiveStreamingConnection 具有一个心跳线线程,默认情况下,该线程在所有打开的事务间隔(hive.txn.timeout/2)之后发送心跳。

有关更多信息,请参见HiveStreamingConnection 的 Javadoc

Usage Guidelines

通常,每个事务中包含的记录越多,可以实现的吞吐量就越高。通常在一定数量的记录之后或一定的时间间隔(以先到者为准)之后提交。后者确保事件流率可变时,事务不会保持打开时间太长。单个事务中可以包含多少数据没有实际限制。唯一的问题是如果事务失败,将需要重播的数据量。 TransactionBatch 的概念用于减少文件系统中由 HiveStreamingConnection API 创建的文件(和增量目录)的数量。由于给定事务批处理中的所有事务都写入同一物理文件(每个存储桶),因此分区只能压缩到包含未清事务的任何批处理中最早事务的级别。因此,TransactionBatches 不应过大。包括一个计时器以在一段时间后关闭 TransactionBatch(即使它有未使用的事务)是有意义的。

HiveStreamingConnection 已针对写吞吐量(Delta 流优化)进行了高度优化,因此,Hive 流摄取生成的增量文件已禁用了许多 ORC 功能(字典编码,索引,压缩等),以促进高吞吐量写。当压缩程序启动时,这些增量文件将被重写为读取和存储优化的 ORC 格式(启用字典编码,索引和压缩)。因此,建议更积极/更频繁地配置压缩器(请参阅Compactor)以生成压缩和优化的 ORC 文件。

有关 HiveConf 对象的 Comments

HiveStreamingConnect 构建器 API 接受 HiveConf 参数。可以将其设置为 null,也可以提供预先创建的 HiveConf 对象。如果为 null,则将在内部创建一个 HiveConf 对象并将其用于连接。实例化 HiveConf 对象时,如果包含 hive-site.xml 的目录是 javaClasspath 的一部分,则将使用其中的值初始化 HiveConf 对象。如果未找到 hive-site.xml,则将使用默认值初始化该对象。如果频繁打开连接(例如每秒几次),则预先创建该对象并在多个 Connecting 重用它可能会对性能产生显着影响。安全连接依赖于在 HiveConf 对象中正确设置的“ metastore.kerberos.principal”。

无论在 hive-site.xml 或自定义 HiveConf 中设置了什么值,API 都会在内部覆盖其中的某些设置以确保正确的流传输行为。以下是被覆盖的设置列表:

  • hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager

  • hive.support.concurrency = true

  • hive.metastore.execute.setugi = true

  • hive.exec.dynamic.partition.mode =非严格

  • hive.exec.orc.delta.streaming.optimizations.enabled = true

  • hive.metastore.client.cache.enabled =否

I/O –写入数据

这些类和接口提供了在事务内将数据写入 Hive 的支持。

RecordWriter

RecordWriter 是所有 Writer 实现的基本接口。 Writer 负责以 byte 的形式获取记录,其中包含已知格式(例如 CSV)的数据,并以 Hive 流支持的格式写出。具有严格实现的 RecordWriter 期望记录架构与表架构完全匹配。在动态分区模式下写的 RecordWriter 期望分区列是每个 Logging 的最后一列。如果分区列的值为空或为空,则记录将进入__HIVE_DEFAULT_PARTITION__。流 Client 端将实例化适当的 RecordWriter 类型,并将其传递给 HiveStreamingConnection 构建器 API。此后,流 Client 端不直接与 RecordWriter 交互。之后,StreamingConnection 对象将使用和 ManagementRecordWriter 实例来执行 I/O。有关详情,请参见Javadoc

RecordWriter 的主要功能是:

  • 修改 Importing 记录:如果它们没有对应的表列,则可能会从 Importing 数据中删除字段;如果某些列缺少字段,则添加空值;如果分区列的值为空或为空,则添加__HIVE_DEFAULT_PARTITION__。动态创建分区需要了解传入的数据格式以提取最后一列以提取分区值。

  • 对修改后的记录进行编码:编码涉及使用适当的Hive SerDe进行序列化。

  • 对于存储桶的表,从 Logging 提取存储桶列的值以标识记录所属的存储桶。

  • 对于分区表,在动态分区模式下,从记录的最后 N 列(其中 N 为分区数)中提取分区列的值,以标识记录所属的分区。

  • 使用AcidOutputFormat的记录更新程序将编码的记录写入 Hive,以用于适当的存储桶。

StrictDelimitedInputWriter

StrictDelimitedInputWriter 类实现 RecordWriter 接口。它接受定界格式(例如 CSV)的 Importing 记录,并将其写入 Hive。它期望记录模式与表模式匹配,并期望最后一个分区值。使用 LazySimpleSerde 将 Importing 记录转换为 Object,以提取存储桶和分区列,然后将其传递到相应存储桶的基础 AcidOutputFormat 的记录更新器。参见Javadoc

StrictJsonWriter

StrictJsonWriter 类实现 RecordWriter 接口。它接受严格的 JSON 格式的 Importing 记录,并将其写入 Hive。它将使用 JsonSerde 将 JSON 记录直接转换为 Object,然后将其传递到相应的存储桶和分区的基础 AcidOutputFormat 的记录更新器。参见Javadoc

StrictRegexWriter

StrictRegexWriter 类实现 RecordWriter 接口。它接受 Importing 记录,文本格式的正则表达式并将其写入 Hive。它使用适当的正则表达式将文本记录直接转换为使用 RegexSerDe 的对象,然后将其传递给相应存储桶的基础 AcidOutputFormat 的记录更新器。参见Javadoc

AbstractRecordWriter

这是一个 Base Class,其中包含 RecordWriter 对象所需的一些通用代码,例如架构查找以及计算记录应属于的存储桶和分区。

Error Handling

为了使系统正常运行,此 API 的 Client 端必须正确处理错误。该 API 仅返回 StreamingException,但是在不同情况下会抛出 StreamingException 的多个子类。

ConnectionError-无法构建与 Metastore 的连接或使用 HiveStreamingConnection connect API 的方式不正确时

InvalidTable-当目标表不存在或不是 ACID 事务表时抛出

InvalidTransactionState-当事务批处理变为无效状态时抛出

SerializationError-当 SerDe 在序列化/反序列化/记录写入过程中引发任何异常时抛出

StreamingIOFailure-如果无法创建目标分区,记录更新程序在写入或刷新记录期间抛出任何 IO 错误,则抛出该异常。

TransactionError-无法提交或中止内部事务时抛出。

由 Client 端决定可以重试(通常有一些退避),忽略或重新抛出哪些异常。通常,可以使用指数退避重试与连接相关的异常。可以引发或忽略与序列化相关的错误(如果某些传入记录不正确/损坏并且可以删除)。

Example

///// Stream five records in two transactions /////
 
// Assumed HIVE table Schema:
create table alerts ( id int , msg string )
     partitioned by (continent string, country string)
     clustered by (id) into 5 buckets
     stored as orc tblproperties("transactional"="true"); // currently ORC is required for streaming
 
 
//-------   MAIN THREAD  ------- //
String dbName = "testing";
String tblName = "alerts";

.. spin up thread 1 ..
// static partition values
ArrayList<String> partitionVals = new ArrayList<String>(2);
partitionVals.add("Asia");
partitionVals.add("India");

// create delimited record writer whose schema exactly matches table schema
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
                                      .withFieldDelimiter(',')
                                      .build();
// create and open streaming connection (default.src table has to exist already)
StreamingConnection connection = HiveStreamingConnection.newBuilder()
                                    .withDatabase(dbName)
                                    .withTable(tblName)
                                    .withStaticPartitionValues(partitionVals)
                                    .withAgentInfo("example-agent-1")
                                    .withRecordWriter(writer)
                                    .withHiveConf(hiveConf)
                                    .connect();
// begin a transaction, write records and commit 1st transaction
connection.beginTransaction();
connection.write("1,val1".getBytes());
connection.write("2,val2".getBytes());
connection.commitTransaction();
// begin another transaction, write more records and commit 2nd transaction
connection.beginTransaction();
connection.write("3,val3".getBytes());
connection.write("4,val4".getBytes());
connection.commitTransaction();
// close the streaming connection
connection.close();

.. spin up thread 2 ..
// dynamic partitioning
// create delimited record writer whose schema exactly matches table schema
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
                                      .withFieldDelimiter(',')
                                      .build();
// create and open streaming connection (default.src table has to exist already)
StreamingConnection connection = HiveStreamingConnection.newBuilder()
                                    .withDatabase(dbName)
                                    .withTable(tblName)
                                    .withAgentInfo("example-agent-1")
                                    .withRecordWriter(writer)
                                    .withHiveConf(hiveConf)
                                    .connect();
// begin a transaction, write records and commit 1st transaction
connection.beginTransaction();
// dynamic partition mode where last 2 columns are partition values
connection.write("11,val11,Asia,China".getBytes());
connection.write("12,val12,Asia,India".getBytes());
connection.commitTransaction();
// begin another transaction, write more records and commit 2nd transaction
connection.beginTransaction();
connection.write("13,val13,Europe,Germany".getBytes());
connection.write("14,val14,Asia,India".getBytes());
connection.commitTransaction();
// close the streaming connection
connection.close();