On this page
Hive 3 Streaming API
Hive 3 Streaming API 文档-Hive 3 中提供了新的 API
Hive HCatalog 流 API
传统上,向 Hive 中添加新数据需要将大量数据收集到 HDFS 中,然后定期添加新分区。这本质上是“批量插入”。不允许将新数据插入现有分区。 Hive Streaming API 允许将数据连续抽取到 Hive 中。传入数据可以以小批记录的形式连续提交到现有的 Hive 分区或表中。提交数据后,该数据立即对随后启动的所有 Hive 查询可见。
该 API 适用于持续生成数据的流 Client 端(例如Flume和Storm)。流支持构建在 Hive 中基于 ACID 的插入/更新支持之上(请参阅Hive Transactions)。
Hive 流 API 的类和接口部分大致分为两类。第一组提供对连接和事务 Management 的支持,而第二组提供 I/O 支持。事务由 MetastoreManagement。写操作直接执行到 HDFS。
还支持流式传输到“未分区”表。该 API 支持从Hive 0.14开始的 Kerberos 身份验证。
关于包装的说明 :这些 API 在 Java 包 org.apache.hive.hcatalog.streaming 中定义,并且是 Hive 中的* hive-hcatalog-streaming * Maven 模块的一部分。
流变异 API
从 2.0.0 版本开始,Hive 提供了另一个 API,可以使用 Hive 的 ACID 功能将记录突变(插入/更新/删除)到事务表中。有关详细信息以及与本文档中描述的流数据摄取 API 的比较,请参见HCatalog 流式突变 API。
Streaming Requirements
使用流媒体需要做一些事情。
hive-site.xml 中需要以下设置才能启用对流的 ACID 支持:
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.compactor.initiator.on = true (请参阅更重要的细节here)
hive.compactor.worker.threads > 0
*必须在table creation期间指定“存储为 orc” *。目前仅支持ORC 存储格式。
创建期间必须在表上设置 tblproperties(“ transactional” =“ true”)。
Hive 表必须为bucketed,但不能排序。因此,在创建表期间必须指定诸如“由(colName)群集为* 10 *个存储桶”之类的内容。理想情况下,存储桶的数量与流式写入器的数量相同。
Client 端流处理过程的用户必须具有必要的权限才能写入表或分区并在表中创建分区。
(临时要求) 在流表上发出查询时 ,Client 端需要设置
hive.vectorized.execution.enabled 为 false (对于 Hive 版本<0.14.0)
- hive.input.format 到 org.apache.hadoop.hive.ql.io.HiveInputFormat
Limitations
当前,流式 API 开箱即用,仅支持流式分隔的 Importing 数据(例如 CSV,制表符分隔等)和 JSON(严格语法)格式的数据。 * RecordWriter *接口的其他实现可以提供对其他 Importing 格式的支持。
当前,目标表的格式仅支持 ORC。
API Usage
事务和连接 Management
HiveEndPoint
HiveEndPoint 类描述了要连接的 Hive 端点。这描述了数据库,表和分区的名称。对其调用 newConnection 方法可构建与 Hive MetaStore 的连接,以进行流传输。它返回一个 StreamingConnection 对象。可以在同一端点上构建多个连接。然后,可以使用 StreamingConnection 启动新事务以执行 I/O。
在数据连续流传输的设置中,很有可能会定期将数据添加到新分区中。 HiveManagement 员可以预先创建必要的分区,或者流 Client 端可以根据需要创建它们。 HiveEndPoint.newConnection()接受一个布尔参数,以指示是否应自动创建分区。分区创建是一个原子动作,多个 Client 端可以争用创建分区,但是只有一个成功,因此流 Client 端在创建分区时不必同步。
事务的实现与传统数据库系统略有不同。每个 Transaction 都有一个 ID,并且多个 Transaction 被分组为“Transaction 批”。这有助于将来自多个事务的记录分组为更少的文件(而不是每个事务 1 个文件)。连接后,流 Client 端首先请求新一批事务。作为响应,它接收到一组作为 Transaction 批处理的一部分的 TransactionID。随后,Client 通过发起新 Transaction 来一次消耗一个 TransactionID。Client 端将为每个事务写入()一个或多个记录,并在切换到下一个之前提交或中止当前事务。每次 TransactionBatch.write()调用都会自动将 I/O 尝试与当前 Txn ID 相关联。流 Client 端进程的用户需要对分区或表具有写权限。需要基于 Kerberos 的身份验证才能以特定用户身份获取连接。请参阅下面的安全流传输示例。
并发注意: I/O 可以同时在多个 TransactionBatch 上执行。但是,必须按 Sequences 使用事务批中的事务。
有关更多信息,请参见HiveEndPoint 的 Javadoc。通常,用户将使用 HiveEndPoint 对象构建目标信息,然后调用* newConnection *构建连接并获取 StreamingConnection 对象。
StreamingConnection
StreamingConnection 类用于获取批量 Transaction。一旦 HiveEndPoint 提供了连接,应用程序通常将进入一个循环,在该循环中它将调用* fetchTransactionBatch 并编写一系列事务。关闭时,应用程序应调用 close *。有关更多信息,请参见Javadoc。
TransactionBatch
TransactionBatch 用于编写一系列事务。每个存储区中的每个 TxnBatch 都在 HDFS 上创建了一个文件。 API 会检查每条记录,以确定其属于哪个存储桶,并将其写入相应的存储桶。如果表有 5 个存储桶,则 TxnBatch 将有 5 个文件(其中一些可能为空)(在开始压缩之前)。在Hive 1.3.0之前,API 的存储桶计算逻辑中的错误导致错误地将记录分配到存储桶中,这可能导致使用存储桶联接算法从查询返回的数据不正确。
对于 TxnBatch 中的每个事务,应用程序将分别调用* beginNextTransaction , write 和 commit 或 abort *。有关详情,请参见Javadoc。事务不能包含来自多个分区的数据。
如果在hive.txn.timeout秒后未提交或异常中止,则 Metastore 最终将使 TransactionBatch 中的事务过期。 TrasnactionBatch 类提供 heartbeat() 方法以延长批次中未使用事务的寿命。一个好的经验法则是在创建 TransactionBatch 之后以(hive.txn.timeout/2)的间隔发送调用 heartbeat()。这足以使非活动事务保持活动状态,但又不会不必要地加载元存储。
Usage Guidelines
通常,每个事务中包含的事件越多,可以实现的吞吐量就越高。它是在一定数量的事件之后或一定的时间间隔之后(以先到者为准)的常见提交。后者确保事件流率可变时,事务不会保持打开时间太长。单个事务中可以包含多少数据没有实际限制。唯一的问题是如果事务失败,将需要重播的数据量。 TransactionBatch 的概念用于减少 HDFS 中 SteramingAPI 创建的文件数。由于给定批处理中的所有事务都写入同一物理文件(每个存储桶),因此分区只能压缩到包含开放事务的任何批处理中最早事务的级别。因此,不应将 TranactionBatches 设置得过大。包括一个计时器以在一段时间后关闭 TransactionBatch(即使它有未使用的事务)是有意义的。
注意: Hive 1.3.0起,调用 TxnBatch.close()将导致当前 TxnBatch 中所有未使用的事务中止。
有关 HiveConf 对象的 Comments
HiveEndPoint.newConnection()接受一个 HiveConf 参数。可以将其设置为 null,也可以提供预先创建的 HiveConf 对象。如果为 null,则将在内部创建一个 HiveConf 对象并将其用于连接。实例化 HiveConf 对象时,如果包含 hive-site.xml 的目录是 javaClasspath 的一部分,则将使用其中的值初始化 HiveConf 对象。如果未找到 hive-site.xml,则将使用默认值初始化该对象。如果频繁打开连接(例如每秒几次),则预先创建该对象并在多个 Connecting 重用它可能会对性能产生显着影响。安全连接依赖于在 HiveConf 对象中正确设置的“ hive.metastore.kerberos.principal”。
无论在 hive-site.xml 或自定义 HiveConf 中设置了什么值,API 都会在内部覆盖其中的某些设置以确保正确的流传输行为。以下是被覆盖的设置列表:
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.support.concurrency = true
hive.metastore.execute.setugi = true
hive.execution.engine = Mr
I/O –写入数据
这些类和接口提供了在事务内将数据写入 Hive 的支持。
RecordWriter
RecordWriter 是所有 Writer 实现的基本接口。 Writer 负责以 byte []的形式获取记录,其中包含已知格式(例如 CSV)的数据,并以 Hive 流支持的格式将其写出。如果有必要,RecordWriter 可以将传入 Logging 的字段重新排序或删除,以将它们 Map 到 Hive 表中的相应列。流 Client 端将实例化适当的 RecordWriter 类型,并将其传递给 TransactionBatch。流 Client 端不会直接与 RecordWriter 交互。此后,TransactionBatch 将使用和 ManagementRecordWriter 实例来执行 I/O。有关详情,请参见Javadoc。
RecordWriter 的主要功能是:
修改 Importing 记录:如果 Importing 字段没有对应的表列,则可能会删除它们;如果某些列的字段缺失,则添加空值;更改 Importing 字段的 Sequences 以匹配表中字段的 Sequences。此任务需要了解传入的数据格式。并非所有格式(例如 JSON,其数据中都包含字段名称)都需要此步骤。
对修改后的记录进行编码:编码涉及使用适当的Hive SerDe进行序列化。
标识记录所属的存储桶
使用AcidOutputFormat的记录更新程序将编码的记录写入 Hive,以用于适当的存储桶。
DelimitedInputWriter
类 DelimitedInputWriter 实现了 RecordWriter 接口。它接受定界格式(例如 CSV)的 Importing 记录,并将其写入 Hive。如果需要,它将对字段进行重新排序,并使用 LazySimpleSerde 将记录转换为 Object,然后将其传递到相应存储桶的基础 AcidOutputFormat 的记录更新器。参见Javadoc。
StrictJsonWriter
StrictJsonWriter 类实现 RecordWriter 接口。它接受严格的 JSON 格式的 Importing 记录,并将其写入 Hive。它将使用 JsonSerde 将 JSON 记录直接转换为 Object,然后将其传递给相应的存储桶的基础 AcidOutputFormat 的记录更新器。参见Javadoc。
StrictRegexWriter
StrictRegexWriter 类实现 RecordWriter 接口。它接受 Importing 记录,文本格式的正则表达式并将其写入 Hive。它使用适当的正则表达式将文本记录直接转换为使用 RegexSerDe 的对象,然后将其传递给相应存储桶的基础 AcidOutputFormat 的记录更新器。参见Javadoc。在配置单元 1.2.2 和 2.3.0 .中可用
AbstractRecordWriter
这是一个 Base Class,其中包含 RecordWriter 对象所需的一些通用代码,例如架构查找和计算记录应属于的存储桶。
Error Handling
为了使系统正常运行,此 API 的 Client 端必须正确处理错误。一旦获得TransactionBatch
,如果从TransactionBatch
抛出任何异常(SerializationError
除外),则应导致 Client 端调用TransactionBatch.abort()
终止当前事务,然后TransactionBatch.close()
并开始新的批处理以写入更多数据和/或重做上一个事务的工作。在此期间发生故障。在极少数情况下,不遵循此步骤可能会导致文件损坏。此外,理想情况下,StreamingException
应该使 Client 端在开始新批处理之前执行指数回退。这将有助于群集稳定,因为这些失败的最可能原因是 HDFS 过载。
SerializationError
表示无法解析给定的 Tuples。Client 可以选择扔掉这样的 Tuples 或将它们发送到死信队列。看到此异常后,可以将更多数据写入当前事务,并在同一TransactionBatch
中写入更多事务。
示例–非安全模式
///// Stream five records in two transactions /////
// Assumed HIVE table Schema:
create table alerts ( id int , msg string )
partitioned by (continent string, country string)
clustered by (id) into 5 buckets
stored as orc tblproperties("transactional"="true"); // currently ORC is required for streaming
//------- MAIN THREAD ------- //
String dbName = "testing";
String tblName = "alerts";
ArrayList<String> partitionVals = new ArrayList<String>(2);
partitionVals.add("Asia");
partitionVals.add("India");
String serdeClass = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
HiveEndPoint hiveEP = new HiveEndPoint("thrift://x.y.com:9083", dbName, tblName, partitionVals);
.. spin up threads ..
//------- Thread 1 -------//
StreamingConnection connection = hiveEP.newConnection(true);
DelimitedInputWriter writer =
new DelimitedInputWriter(fieldNames,",", hiveEP);
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
///// Batch 1 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
if(txnBatch.remainingTransactions() > 0) {
///// Batch 1 - Second TXN
txnBatch.beginNextTransaction();
txnBatch.write("3,Roshan Naik".getBytes());
txnBatch.write("4,Alan Gates".getBytes());
txnBatch.write("5,Owen O'Malley".getBytes());
txnBatch.commit();
txnBatch.close();
connection.close();
}
txnBatch = connection.fetchTransactionBatch(10, writer);
///// Batch 2 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("6,David Schorow".getBytes());
txnBatch.write("7,Sushant Sowmyan".getBytes());
txnBatch.commit();
if(txnBatch.remainingTransactions() > 0) {
///// Batch 2 - Second TXN
txnBatch.beginNextTransaction();
txnBatch.write("8,Ashutosh Chauhan".getBytes());
txnBatch.write("9,Thejas Nair" getBytes());
txnBatch.commit();
txnBatch.close();
}
connection.close();
//------- Thread 2 -------//
StreamingConnection connection2 = hiveEP.newConnection(true);
DelimitedInputWriter writer2 =
new DelimitedInputWriter(fieldNames,",", hiveEP);
TransactionBatch txnBatch2= connection.fetchTransactionBatch(10, writer2);
///// Batch 1 - First TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("21,Venkat Ranganathan".getBytes());
txnBatch2.write("22,Bowen Zhang".getBytes());
txnBatch2.commit();
///// Batch 1 - Second TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("23,Venkatesh Seetaram".getBytes());
txnBatch2.write("24,Deepesh Khandelwal".getBytes());
txnBatch2.commit();
txnBatch2.close();
connection.close();
txnBatch = connection.fetchTransactionBatch(10, writer);
///// Batch 2 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("26,David Schorow".getBytes());
txnBatch.write("27,Sushant Sowmyan".getBytes());
txnBatch.commit();
txnBatch2.close();
connection2.close();
示例–安全流
若要通过 Kerberos 连接到安全的 Hive 元存储,需要一个 UserGroupInformation(UGI)对象。必须从外部获取此 UGI 对象,并将其作为参数传递给 EndPoint.newConnection。使用该连接对象进行的所有后续内部操作(例如,获取事务批处理,写入和提交)将根据需要自动内部包装在 ugi.doAs 块中。
重要提示: 要使用 Kerberos 连接,应使用 EndPoint.newConnection()的'authenticatedUser'参数进行 Kerberos 登录。此外,应在 hive-site.xml 或'conf'参数(如果不为 null)中正确设置'hive.metastore.kerberos.principal'设置。如果使用 hive-site.xml,则其目录应包含在 Classpath 中。
import org.apache.hadoop.security.UserGroupInformation;
HiveEndPoint hiveEP2 = ... ;
UserGroupInformation ugi = .. authenticateWithKerberos(principal,keytab);
StreamingConnection secureConn = hiveEP2.newConnection(true, null, ugi);
DelimitedInputWriter writer3 = new DelimitedInputWriter(fieldNames, ",", hiveEP2);
TransactionBatch txnBatch3= secureConn.fetchTransactionBatch(10, writer3);
///// Batch 1 - First TXN – over secure connection
txnBatch3.beginNextTransaction();
txnBatch3.write("28,Eric Baldeschwieler".getBytes());
txnBatch3.write("29,Ari Zilka".getBytes());
txnBatch3.commit();
txnBatch3.close();
secureConn.close();