On this page
Hive HBase 批量加载
本页说明如何使用 Hive 按HIVE-1295将数据批量加载到新的(空)HBase 表中。 (如果您尚未使用包含此功能的内部版本,则需要从源进行构建,并确保同时应用了此补丁和 HIVE-1321.)
Overview
理想情况下,从 Hive 到 HBase 的批量加载将成为HBaseIntegration的一部分,使其变得如此简单:
CREATE TABLE new_hbase_table(rowkey string, x int, y int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:x,cf:y");
SET hive.hbase.bulk=true;
INSERT OVERWRITE TABLE new_hbase_table
SELECT rowkey_expression, x, y FROM ...any_hive_query...;
但是,事情还没有那么简单。而是需要一个涉及一系列 SQL 命令的过程。与编写您自己的 Map/缩 Servlets 相比,它应该仍然更容易,更灵活,并且随着时间的流逝,我们希望增强 Hive 使其更接近理想。
该过程基于基本的 HBase 建议,包括以下步骤:
确定数据加载到 HBase 后的外观。
确定计划用于并行化排序和 HFile 创建的化简器的数量。这取决于数据的大小以及可用的群集资源。
运行 Hive 采样命令,这将创建一个包含“ splitter”键的文件,该文件将用于在排序过程中对数据进行范围分区。
在 HDFS 中准备将要生成 HFile 的暂存位置。
运行 Hive 命令,将执行排序并生成 HFile。
(可选:如果 HBase 和 Hive 在不同的群集中运行,请将生成的文件从 Hive 群集分配到 HBase 群集.)
运行 HBase 脚本
loadtable.rb
,将文件移动到新的 HBase 表中。(可选:将 HBase 表注册为 Hive 中的外部表,以便您可以从那里访问它.)
该页面的其余部分将更详细地说明每个步骤。
确定目标 HBase 架构
当前,这里有很多限制:
目标表必须是新表(您不能批量加载到现有表中)
目标表只能有一个列族(HBASE-1861)
目标表不能是稀疏的(每行都具有相同的列集);通过允许从 Hive 读取 MAP 值,和/或允许以透视形式从 Hive 读取行(每个 HBase 单元一行),应该很容易解决此问题。
除了处理这些约束之外,这里最重要的工作可能是确定如何为来自 Hive 的每一行分配 HBase 行键。为了避免词法比较器和二进制比较器之间的不一致,最简单的方法是设计一个字符串行键,并始终使用它。如果要将多个列合并到键中,请为此目的使用 Hive 的字符串 concat 表达式。您可以使用 CREATE VIEW 在逻辑上添加行键,而不必更新 Hive 中的任何现有数据。
估计所需资源
TBD:根据 Facebook 实验提供一些示例数字;另请参考Hadoop Terasort
添加必要的 JAR
您将需要在路径中添加几个 jar 文件。首先,将它们放在 DFS 中:
hadoop dfs -put /usr/lib/hive/lib/hbase-VERSION.jar /user/hive/hbase-VERSION.jar
hadoop dfs -put /usr/lib/hive/lib/hive-hbase-handler-VERSION.jar /user/hive/hive-hbase-handler-VERSION.jar
然后将它们添加到您的 hive-site.xml 中:
<property>
<name>hive.aux.jars.path</name>
<value>/user/hive/hbase-VERSION.jar,/user/hive/hive-hbase-handler-VERSION.jar</value>
</property>
准备范围分区
为了对数据执行并行排序,我们需要对其进行范围划分。想法是将行键的空间划分为几乎相等的范围,每个缩减器一个,将在并行排序中使用。详细信息将根据您的源数据而有所不同,并且您可能需要运行一些探索性的 Hive 查询,以提供足够好的一组范围。这是一个例子:
add jar lib/hive-contrib-0.7.0.jar;
set mapred.reduce.tasks=1;
create temporary function row_sequence as
'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';
select transaction_id from
(select transaction_id
from transactions
tablesample(bucket 1 out of 10000 on transaction_id) s
order by transaction_id
limit 10000000) x
where (row_sequence() % 910000)=0
order by transaction_id
limit 11;
通过对表的.01%samples 中的所有行进行排序(使用单个化简器),然后选择每第 n 行(此处 n = 910000),可以进行此操作。通过将 samples 中的行总数除以所需的范围数(例如,n)来选择 n 的值。在这种情况下为 12(比 LIMIT 子句产生的分区键数多一)。这里的假设是 samples 中的分布与表中的整体分布相匹配;如果不是这种情况,则生成的分区键将导致并行排序中的偏斜。
定义了采样查询后,下一步就是将其结果保存到格式正确的文件中,该文件将在后续步骤中使用。为此,请运行以下命令:
create external table hb_range_keys(transaction_id_range_start string)
row format serde
'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
stored as
inputformat
'org.apache.hadoop.mapred.TextInputFormat'
outputformat
'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
location '/tmp/hb_range_keys';
insert overwrite table hb_range_keys
select transaction_id from
(select transaction_id
from transactions
tablesample(bucket 1 out of 10000 on transaction_id) s
order by transaction_id
limit 10000000) x
where (row_sequence() % 910000)=0
order by transaction_id
limit 11;
第一条命令创建一个外部表,该表定义了要创建的文件的格式;确保完全按照指定设置 serde 和 inputformat/outputformat。
第二个命令填充它(使用先前定义的采样查询)。使用 ORDER BY 可以确保在目录/tmp/hb_range_keys
中生成单个文件。文件名是未知的,但是稍后需要按名称引用该文件,因此请运行以下命令将其复制为特定名称:
dfs -cp /tmp/hb_range_keys/* /tmp/hb_range_key_list;
准备登台位置
这种排序将产生大量数据,因此请确保您的 HDFS 群集中有足够的空间,然后选择文件暂存的位置。在此示例中,我们将使用/tmp/hbsort
。
该目录实际上不需要存在(它将在下一步中自动创建),但是如果存在,则应为空。
dfs -rmr /tmp/hbsort;
dfs -mkdir /tmp/hbsort;
Sort Data
现在迈出了一大步:对要批量加载的所有数据进行排序。确保您的 Hive 实例在其 auxpath 上具有可用的 HBase jar。
set hive.execution.engine=mr;
set mapred.reduce.tasks=12;
set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
set total.order.partitioner.path=/tmp/hb_range_key_list;
set hfile.compression=gz;
create table hbsort(transaction_id string, user_name string, amount double, ...)
stored as
INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/cf');
insert overwrite table hbsort
select transaction_id, user_name, amount, ...
from transactions
cluster by transaction_id;
CREATE TABLE 创建一个虚拟表,该表控制如何写入排序输出。请注意,它使用HiveHFileOutputFormat
来执行此操作,而表属性hfile.family.path
用于控制输出的目标目录。同样,请确保完全按照指定设置 Importing 格式/输出格式。在上面的示例中,我们为结果文件选择 gzip(gz)压缩;如果未设置hfile.compression
参数,则不会执行压缩。 (另一种可用的方法是 lzo,它压缩程度较小,但不需要太多的 CPU 能力.)
请注意,reduce 任务的数量比分区的数量多一个-这 必须 为真,否则您将收到“键集中的分区数量错误”错误。
有一个参数hbase.hregion.max.filesize
(默认 256MB)会影响 HFiles 的生成方式。如果减速器产生的数据量(预压缩)超过此限制,则将为该减速器生成多个 HFile。这将导致区域文件不平衡。这不会引起任何正确性问题,但是,如果要获取平衡的区域文件,请使用更多的 reducer 或将此参数设置为更大的值。请注意,启用压缩后,您可能会看到生成了多个文件,这些文件的大小都远远低于限制。这是因为溢出检查是在压缩前完成的。
路径中的cf
指定将在 HBase 中创建的列族的名称,因此您在此处选择的目录名称很重要。 (请注意,这里我们实际上并没有使用 HBase 表; HiveHFileOutputFormat
直接写入文件.)
CLUSTER BY 子句提供分区程序要使用的键;确保它与您在上一步中提出的范围分区相匹配。
SELECT 列表中的第一列被解释为行键;随后的列将成为单元格值(都在单个列族中,因此它们的列名很重要)。
运行 HBase 脚本
排序作业成功完成后,需要最后一步将结果文件导入 HBase。同样,我们不知道文件名,因此将其复制到:
dfs -copyToLocal /tmp/hbsort/cf/* /tmp/hbout
如果 Hive 和 HBase 在不同的群集中运行,请使用distcp将文件从一个复制到另一个。
如果您使用的是 HBase 0.90.2 或更高版本,则可以使用completebulkloadUtil 将数据加载到 HBase 中
hadoop jar hbase-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /tmp/hbout transactions
在旧版本的 HBase 中,使用bin/loadtable.rb
脚本导入它们:
hbase org.jruby.Main loadtable.rb transactions /tmp/hbout
第一个参数(transactions
)指定新的 HBase 表的名称。对于第二个参数,请传递登台目录名称,而不是列族子目录。
该脚本完成后,您可能需要 await 一两分钟,以便 HBase 元扫描程序提取新表。使用 hbase shell 来验证是否正确创建了新表,并进行了一些合理性查询来查找单个单元格并确保可以找到它们。
将新表 Map 回 Hive
最后,如果您想访问通过 Hive 创建的 HBase 表:
CREATE EXTERNAL TABLE hbase_transactions(transaction_id string, user_name string, amount double, ...)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:user_name,cf:amount,...")
TBLPROPERTIES("hbase.table.name" = "transactions");
Followups Needed
支持稀疏表
修复 HIVE-1245 后,支持加载二进制数据表示
支持时间戳分配
提供对文件参数(例如压缩)的控制
一旦实施 HBASE-1861,便支持多个列系列
实施 HBASE-1923 后,支持将其加载到现有表中
将所有内容包装到理想的具有自动采样功能的单插入中...