Hive Accumulo 集成

Overview

Apache Accumulo是基于 Google BigTable 文件的排序的分布式键值存储。 Accumulo 提供的 API 方法是在键和值方面,它们在读写数据方面具有最高的灵 Active。但是,更高级别的查询抽象通常是留给用户的练习。利用 Apache Hive 作为 Accumulo 的 SQL 接口,可以补充其现有的高吞吐量批处理访问和低延迟随机查找。

Implementation

最初的实现已添加到HIVE-7068的 Hive 0.14 中,旨在与 Accumulo 1.6.x 一起使用。该实现由两个主要组件组成:AccumuloStorageHandler 和 AccumuloPredicateHandler。 AccumuloStorageHandler 是StorageHandler实现。此类的主要作用是 ManagementHive 表到 Accumulo 表的 Map 并配置 Hive 查询。 AccumuloPredicateHandler 用于将过滤器操作下推到 Accumulo,以更有效地减少数据。

Accumulo Configuration

唯一必需的其他 Accumulo 配置是包含作为 Hive 发行版一部分提供的 hiveaccumulo-handler.jar,将其包含在 Accumulo 服务器 Classpath 中。这可以通过多种方式实现:将 jar 复制/符号链接到$ACCUMULO_HOME/lib$ACCUMULO_HOME/lib/ext或在 jar_4._up 中将 jar 的路径包括在general.classpaths中。如果以非动态方式将 jar 添加到 Classpath 中,请确保重新启动 Accumulo tablet 服务器(在 accumulo-site.xml 中使用$ACCUMULO_HOME/libgeneral.classpaths)。

Usage

要使用 Hive 对 Accumulo 发出查询,Hive 配置必须提供四个参数:

Connection Parameters
accumulo.instance.name
accumulo.zookeepers
accumulo.user.name
accumulo.user.pass

对于熟悉 Accumulo 的人来说,这四个配置是连接到 Accumulo 所需的常规配置值:Accumulo 实例名称,ZooKeeper 仲裁(主机的逗号分隔列表)以及 Accumulo 用户名和密码。提供这些值的最简单方法是使用hive命令的-hiveconf选项。期望提供的 Accumulo 用户具有创建新表的能力,或者 Hive 查询将仅访问现有的 Accumulo 表。

hive -hiveconf accumulo.instance.name=accumulo -hiveconf accumulo.zookeepers=localhost -hiveconf accumulo.user.name=hive -hiveconf accumulo.user.pass=hive

若要访问 Accumulo 表,必须使用带有STORED BY子句的CREATE命令来创建 Hive 表。如果CREATE调用中省略了EXTERNAL关键字,则 Accumulo 表的生命周期与 Hive 表的生存期相关:如果删除 Hive 表,则 Accumulo 表也将删除。这是默认情况。提供EXTERNAL关键字将创建一个引用 Accumulo 表的 Hive 表,但是如果该 Hive 表被删除,则不会删除基础的 Accumulo 表。

每个 Hive 行都 Map 到一组具有相同行 ID 的 Accumulo 键。配置单元行中的一列被指定为“特殊”列,用作“累积行” ID。该行中的所有其他 Hive 列都有一些与 Accumulo 列(列族和限定符)的 Map,其中 Hive 列的值位于 Accumulo 值中。

CREATE TABLE accumulo_table(rowid STRING, name STRING, age INT, weight DOUBLE, height INT)
STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
WITH SERDEPROPERTIES('accumulo.columns.mapping' = ':rowid,person:name,person:age,person:weight,person:height');

在上面的语句中,提供了常规的 Hive 列名和类型对,与常规的 create table 语句一样。提供了完整的 AccumuloStorageHandler 类名来通知 Hive Accumulo 将支持此 Hive 表。可通过 SERDEPROPERTIES 或 TBLPROPERTIES 提供许多属性来配置 AccumuloStorageHandler。最重要的属性是“ accumulo.columns.mapping”,它控制 Hive 列如何 Map 到 Accumulo 列。在这种情况下,“行” Hive 列用于填充 Accumulo 键的 Accumulo 行 ID 组件,而其他 Hive 列(名称,年龄,体重和身高)都是 Accumulo 行中的所有列。

对于“ accumulo_table”中的上述架构,我们可以在表中预想一行:

hive> select * from accumulo_table;
row1	Steve	32	200	72

在声明了 accumulo.columns.mapping 的情况下,上述记录将按照以下方式序列化为 Accumulo 键值对:

user@accumulo accumulo_table> scan
row1	person:age []	32
row1	person:height []	72
row1	person:name []	Steve
row1	person:weight []	200

列 Map 的功能在于,具有不同列 Map 的多个 Hive 表可以与同一 Accumulo 表进行交互并产生不同的结果。如果排除列,则可以通过使用 Accumulo 本地性组在服务器端过滤掉不需要的数据来提高 Hive 查询的性能。

Column Mapping

列 Map 字符串是逗号分隔的编码值列表,其偏移量对应于表的 Hive 模式。只要列 Map 中的元素与预期的 Hive 列对齐,Hive 模式中列的 Sequences 就可以是任意的。对于熟悉 Accumulo 的人来说,列 Map 字符串中的每个元素都类似于 column_family:column_qualifier;但是,有几种不同的变体可以实现不同的控制。

  • 单列

  • 这会将 Hive 列的值放入具有给定列族和列限定符的 Accumulo 值中。

  • 列限定符 Map

  • 提供了一个列族,并且允许任何长度的列限定符前缀,后跟一个星号。

    • Hive 列类型应为 Map,HiveMap 的键将附加到列限定符前缀

    • Hive 贴图的值放置在 Accumulo 值中。

  • The rowid

  • 控制将哪个 Hive 列用作 Accumulo rowid。

    • 每个列 Map 中必须存在一个“:rowid”元素

    • “:rowid”不区分大小写(:rowID 等效于:rowId)

此外,可以为列 Map 中的每个元素提供序列化选项,该选项将控制如何序列化值。当前,这些选项是:

  • 'binary'或'b'

  • '字符串'或's'

通过在带有长或短序列化值的列 Map 元素之后添加井号('#')来设置这些值。默认序列化为“字符串”。例如,对于值 10,“ person:age#s”与“ person:age”同义,并且会将值序列化为 Literals 字符串“ 10”。如果改用“ person:age#b”,则该值将序列化为四个字节:\ x00\x00\x00\xA0.

Indexing

从 Hive 3.0.0(带有HIVE-15795)开始,索引支持已添加到 Accumulo 支持的 Hive 表中。通过使用另一个 Accumulo 表将字段值 Map 存储到数据表的 rowId 来进行索引。通过 Hive 插入记录时会自动填充索引表。

使用索引表可以消除全表扫描,从而大大提高了非 rowId 谓词查询的性能。使用 Tez 或 Map Reduce 查询引擎,索引可用于内部和外部 Management 的表。以下选项控制索引编制行为。

Option NameDescription
accumulo.indextable.name(必填)Accumulo 中索引表的名称.
accumulo.indexed.columns(可选)以逗号分隔的要索引的配置单元列列表,或用于索引所有列的索引(默认值:)
accumulo.index.rows.max(可选)每个搜索谓词从索引中扫描的谓词值的最大数目(默认值:20000)

有关此值,请参阅此 Comments
accumulo.index.scanner(可选)索引扫描器实现。 (默认值:org.apache.hadoop.hive.accumulo.AccumuloDefaultIndexScanner)

索引使用以下格式存储在索引表中:

rowId = [field value in data table]

column_family = [field column family in data table] + '_' + [field column quantifier in data table]

column_quantifier = [field rowId in data table]

visibility = [field visibility in data table]

value = [empty byte array]

当使用字符串编码表时,使用 Accumulo Lexicoder 方法对数字类型的索引字段值进行编码。否则,将使用本地二进制编码对值进行编码。此信息将允许应用程序在 Hive 之外将数据和索引值插入 Accumulo,但仍需要 Hive 内部的高性能查询。

重要的是要注意在 Hive 外部插入数据和索引时,更新同一工作单元中的两个表很重要。如果 Hive 查询未找到与任何查询谓词匹配的索引,则该查询将短路并返回空结果,而不搜索数据表。

如果搜索谓词匹配的条目多于选项accumulo.index.rows.max定义的条目(默认值为 20000),则将放弃索引搜索结果,并且查询将返回到使用谓词过滤的数据表完整扫描。请记住,为此选项使用较大的值或具有非常大的数据表的 rowId 值可能需要增加配置单元内存以防止内存错误。

Other options

以下选项也可以与 SERDEPROPERTIES 或 TABLEPROPERTIES 一起使用,以进一步控制 AccumuloStorageHandler 的操作:

Option NameDescription
accumulo.iterator.pushdown使用迭代器在 Accumulo 中是否应满足过滤谓词(默认值:true)
accumulo.default.storage值的默认存储序列化方法(默认:字符串)
accumulo.visibility.label将任何记录写入 Accumulo 时使用的静态 ColumnVisibility 字符串(默认值:空字符串)
accumulo.authorizations以逗号分隔的授权列表,用于扫描 Accumulo(默认值:无授权)。


请注意,提供给 Accumulo 的用于连接 Accumulo 的用户必须具有所有授权。
| accumulo.composite.rowid.factory |扩展点,该扩展点允许从 rowid 构造 LazyObject 时提供自定义类,而无需进行更改
rowid 列的 ObjectInspector。
|| accumulo.composite.rowid |扩展点,该扩展点允许将 rowid 列自定义解析为 LazyObject。
|| accumulo.table.name |控制使用什么 Accumulo 表名(默认值:Hive 表名)|
| accumulo.mock.instance |使用 MockAccumulo 实例代替连接到真实实例(默认值:false)。对测试有用。

Examples

覆盖 Accumulo 表名称

创建一个用户表,该表由用户的一些唯一键,用户 ID 和用户名组成。 Accumulo 行 ID 来自 Hive 列,用户 ID 列写入“ f”列族和“ userid”列限定符,用户名列写入“ f”列族和“昵称”列限定符。代替使用“用户” Accumulo 表,它在 TBLPROPERTIES 中被覆盖,改为使用 Accumulo 表“ hive_users”。

CREATE TABLE users(key int, userid int, username string) 
STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
WITH SERDEPROPERTIES ("accumulo.columns.mapping" = ":rowID,f:userid,f:nickname")
WITH TBLPROPERTIES ("accumulo.table.name" = "hive_users");

使用二进制序列化存储 HiveMap

在列 Map 字符串中使用星号,即可将 HiveMap 从单个 Accumulo 键值对扩展为多个键值对。 Hive Map 是参数化类型:在以下情况下,键是字符串,值是整数。默认序列化从'string'覆盖为'binary',这意味着 HiveMap 值中的整数将被存储为一系列字节,而不是 UTF-8 字符串表示形式。

CREATE TABLE hive_map(key int, value map<string,int>) 
STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
WITH SERDEPROPERTIES (
  "accumulo.columns.mapping" = ":rowID,cf:*",
  "accumulo.default.storage" = "binary"
);

注册外部表

使用 external 关键字创建 Hive 表可以使 Accumulo 表的生命周期与 Hive 表的生命周期脱钩。创建该表的前提是 Accumulo 表“ countries”已经存在。这是使用 Hive 来 Management 由某些外部工具(例如 MapReduce 作业)创建和填充的表的一种非常有用的方法。删除 Hive 表国家/地区后,将不会删除 Accumulo 表。此外,在创建具有不同选项的多个 Hive 表时,在同一基础 Accumulo 表上运行时,external 关键字也很有用。

CREATE EXTERNAL TABLE countries(key string, name string, country string, country_id int)
STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
WITH SERDEPROPERTIES ("accumulo.columns.mapping" = ":rowID,info:name,info:country,info:country_id");

创建索引表

为了利用索引的优势,Hive 使用另一个 Accumulo 表来为每个字段创建按字典 Sequences 分类的搜索词索引,从而实现非常有效的精确匹配和边界范围搜索。

CREATE TABLE company_stats (
   rowid string,
   active_entry boolean,
   num_offices tinyint,
   num_personel smallint,
   total_manhours int,
   num_shareholders bigint,
   eff_rating float,
   err_rating double,
   yearly_production decimal,
   start_date date,
   address varchar(100),
   phone char(13),
   last_update timestamp )
ROW FORMAT SERDE 'org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe'
STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
WITH SERDEPROPERTIES (
   "accumulo.columns.mapping" = ":rowID,a:act,a:off,a:per,a:mhs,a:shs,a:eff,a:err,a:yp,a:sd,a:addr,a:ph,a:lu",
   "accumulo.table.name"="company_stats",
   "accumulo.indextable.name"="company_stats_idx"
 );

Acknowledgements

我不会忘记不说 Brian Femiano 所做的努力是该存储处理程序的基础。他最初的 Accumulo-Hive 集成原型是这项工作的基础。