Hive HBase 批量加载

本页说明如何使用 Hive 按HIVE-1295将数据批量加载到新的(空)HBase 表中。 (如果您尚未使用包含此功能的内部版本,则需要从源进行构建,并确保同时应用了此补丁和 HIVE-1321.)

Overview

理想情况下,从 Hive 到 HBase 的批量加载将成为HBaseIntegration的一部分,使其变得如此简单:

CREATE TABLE new_hbase_table(rowkey string, x int, y int) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:x,cf:y");

SET hive.hbase.bulk=true;

INSERT OVERWRITE TABLE new_hbase_table
SELECT rowkey_expression, x, y FROM ...any_hive_query...;

但是,事情还没有那么简单。而是需要一个涉及一系列 SQL 命令的过程。与编写您自己的 Map/缩 Servlets 相比,它应该仍然更容易,更灵活,并且随着时间的流逝,我们希望增强 Hive 使其更接近理想。

该过程基于基本的 HBase 建议,包括以下步骤:

  • 确定数据加载到 HBase 后的外观。

  • 确定计划用于并行化排序和 HFile 创建的化简器的数量。这取决于数据的大小以及可用的群集资源。

  • 运行 Hive 采样命令,这将创建一个包含“ splitter”键的文件,该文件将用于在排序过程中对数据进行范围分区。

  • 在 HDFS 中准备将要生成 HFile 的暂存位置。

  • 运行 Hive 命令,将执行排序并生成 HFile。

  • (可选:如果 HBase 和 Hive 在不同的群集中运行,请将生成的文件从 Hive 群集分配到 HBase 群集.)

  • 运行 HBase 脚本loadtable.rb,将文件移动到新的 HBase 表中。

  • (可选:将 HBase 表注册为 Hive 中的外部表,以便您可以从那里访问它.)

该页面的其余部分将更详细地说明每个步骤。

确定目标 HBase 架构

当前,这里有很多限制:

  • 目标表必须是新表(您不能批量加载到现有表中)

  • 目标表只能有一个列族(HBASE-1861)

  • 目标表不能是稀疏的(每行都具有相同的列集);通过允许从 Hive 读取 MAP 值,和/或允许以透视形式从 Hive 读取行(每个 HBase 单元一行),应该很容易解决此问题。

除了处理这些约束之外,这里最重要的工作可能是确定如何为来自 Hive 的每一行分配 HBase 行键。为了避免词法比较器和二进制比较器之间的不一致,最简单的方法是设计一个字符串行键,并始终使用它。如果要将多个列合并到键中,请为此目的使用 Hive 的字符串 concat 表达式。您可以使用 CREATE VIEW 在逻辑上添加行键,而不必更新 Hive 中的任何现有数据。

估计所需资源

TBD:根据 Facebook 实验提供一些示例数字;另请参考Hadoop Terasort

添加必要的 JAR

您将需要在路径中添加几个 jar 文件。首先,将它们放在 DFS 中:

hadoop dfs -put /usr/lib/hive/lib/hbase-VERSION.jar /user/hive/hbase-VERSION.jar
hadoop dfs -put /usr/lib/hive/lib/hive-hbase-handler-VERSION.jar /user/hive/hive-hbase-handler-VERSION.jar

然后将它们添加到您的 hive-site.xml 中:

<property>
  <name>hive.aux.jars.path</name>
  <value>/user/hive/hbase-VERSION.jar,/user/hive/hive-hbase-handler-VERSION.jar</value>
</property>

准备范围分区

为了对数据执行并行排序,我们需要对其进行范围划分。想法是将行键的空间划分为几乎相等的范围,每个缩减器一个,将在并行排序中使用。详细信息将根据您的源数据而有所不同,并且您可能需要运行一些探索性的 Hive 查询,以提供足够好的一组范围。这是一个例子:

add jar lib/hive-contrib-0.7.0.jar;
set mapred.reduce.tasks=1;
create temporary function row_sequence as 
'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';
select transaction_id from
(select transaction_id
from transactions
tablesample(bucket 1 out of 10000 on transaction_id) s 
order by transaction_id 
limit 10000000) x
where (row_sequence() % 910000)=0
order by transaction_id
limit 11;

通过对表的.01%samples 中的所有行进行排序(使用单个化简器),然后选择每第 n 行(此处 n = 910000),可以进行此操作。通过将 samples 中的行总数除以所需的范围数(例如,n)来选择 n 的值。在这种情况下为 12(比 LIMIT 子句产生的分区键数多一)。这里的假设是 samples 中的分布与表中的整体分布相匹配;如果不是这种情况,则生成的分区键将导致并行排序中的偏斜。

定义了采样查询后,下一步就是将其结果保存到格式正确的文件中,该文件将在后续步骤中使用。为此,请运行以下命令:

create external table hb_range_keys(transaction_id_range_start string)
row format serde 
'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
stored as 
inputformat 
'org.apache.hadoop.mapred.TextInputFormat'
outputformat 
'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
location '/tmp/hb_range_keys';

insert overwrite table hb_range_keys
select transaction_id from
(select transaction_id
from transactions
tablesample(bucket 1 out of 10000 on transaction_id) s 
order by transaction_id 
limit 10000000) x
where (row_sequence() % 910000)=0
order by transaction_id
limit 11;

第一条命令创建一个外部表,该表定义了要创建的文件的格式;确保完全按照指定设置 serde 和 inputformat/outputformat。

第二个命令填充它(使用先前定义的采样查询)。使用 ORDER BY 可以确保在目录/tmp/hb_range_keys中生成单个文件。文件名是未知的,但是稍后需要按名称引用该文件,因此请运行以下命令将其复制为特定名称:

dfs -cp /tmp/hb_range_keys/* /tmp/hb_range_key_list;

准备登台位置

这种排序将产生大量数据,因此请确保您的 HDFS 群集中有足够的空间,然后选择文件暂存的位置。在此示例中,我们将使用/tmp/hbsort

该目录实际上不需要存在(它将在下一步中自动创建),但是如果存在,则应为空。

dfs -rmr /tmp/hbsort;
dfs -mkdir /tmp/hbsort;

Sort Data

现在迈出了一大步:对要批量加载的所有数据进行排序。确保您的 Hive 实例在其 auxpath 上具有可用的 HBase jar。

set hive.execution.engine=mr;
set mapred.reduce.tasks=12;
set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
set total.order.partitioner.path=/tmp/hb_range_key_list;
set hfile.compression=gz;

create table hbsort(transaction_id string, user_name string, amount double, ...)
stored as
INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/cf');

insert overwrite table hbsort
select transaction_id, user_name, amount, ...
from transactions
cluster by transaction_id;

CREATE TABLE 创建一个虚拟表,该表控制如何写入排序输出。请注意,它使用HiveHFileOutputFormat来执行此操作,而表属性hfile.family.path用于控制输出的目标目录。同样,请确保完全按照指定设置 Importing 格式/输出格式。在上面的示例中,我们为结果文件选择 gzip(gz)压缩;如果未设置hfile.compression参数,则不会执行压缩。 (另一种可用的方法是 lzo,它压缩程度较小,但不需要太多的 CPU 能力.)

请注意,reduce 任务的数量比分区的数量多一个-这 必须 为真,否则您将收到“键集中的分区数量错误”错误。

有一个参数hbase.hregion.max.filesize(默认 256MB)会影响 HFiles 的生成方式。如果减速器产生的数据量(预压缩)超过此限制,则将为该减速器生成多个 HFile。这将导致区域文件不平衡。这不会引起任何正确性问题,但是,如果要获取平衡的区域文件,请使用更多的 reducer 或将此参数设置为更大的值。请注意,启用压缩后,您可能会看到生成了多个文件,这些文件的大小都远远低于限制。这是因为溢出检查是在压缩前完成的。

路径中的cf指定将在 HBase 中创建的列族的名称,因此您在此处选择的目录名称很重要。 (请注意,这里我们实际上并没有使用 HBase 表; HiveHFileOutputFormat直接写入文件.)

CLUSTER BY 子句提供分区程序要使用的键;确保它与您在上一步中提出的范围分区相匹配。

SELECT 列表中的第一列被解释为行键;随后的列将成为单元格值(都在单个列族中,因此它们的列名很重要)。

运行 HBase 脚本

排序作业成功完成后,需要最后一步将结果文件导入 HBase。同样,我们不知道文件名,因此将其复制到:

dfs -copyToLocal /tmp/hbsort/cf/* /tmp/hbout

如果 Hive 和 HBase 在不同的群集中运行,请使用distcp将文件从一个复制到另一个。

如果您使用的是 HBase 0.90.2 或更高版本,则可以使用completebulkloadUtil 将数据加载到 HBase 中

hadoop jar hbase-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /tmp/hbout transactions

在旧版本的 HBase 中,使用bin/loadtable.rb脚本导入它们:

hbase org.jruby.Main loadtable.rb transactions /tmp/hbout

第一个参数(transactions)指定新的 HBase 表的名称。对于第二个参数,请传递登台目录名称,而不是列族子目录。

该脚本完成后,您可能需要 await 一两分钟,以便 HBase 元扫描程序提取新表。使用 hbase shell 来验证是否正确创建了新表,并进行了一些合理性查询来查找单个单元格并确保可以找到它们。

将新表 Map 回 Hive

最后,如果您想访问通过 Hive 创建的 HBase 表:

CREATE EXTERNAL TABLE hbase_transactions(transaction_id string, user_name string, amount double, ...) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:user_name,cf:amount,...")
TBLPROPERTIES("hbase.table.name" = "transactions");

Followups Needed

  • 支持稀疏表

  • 修复 HIVE-1245 后,支持加载二进制数据表示

  • 支持时间戳分配

  • 提供对文件参数(例如压缩)的控制

  • 一旦实施 HBASE-1861,便支持多个列系列

  • 实施 HBASE-1923 后,支持将其加载到现有表中

  • 将所有内容包装到理想的具有自动采样功能的单插入中...