Introduction

Version information

Hive 2.2.0(HIVE-14217)中引入了 Druid 集成。最初它与 Druid 0.9.1.1 兼容,Druid 到那时是最新的稳定版本。

本页记录了在HIVE-14217中开始的 Druid 和 Hive 集成的工作。

Objectives

我们的主要目标是能够将 Hive 中的数据索引到 Druid 中,并能够查询 Hive 中的 Druid 数据源。完成这项工作将为 Druid 和 Hive 系统带来好处:

  • 在 Hive 中高效执行 OLAP 查询. Druid 是专门针对事件数据执行 OLAP 查询而专门设计的系统。 Hive 将能够利用其效率来执行此类查询。

  • 在 Druid 之上引入 SQL 接口. Druid 查询以 JSON 表示,并且 Druid 通过 HTTP 上的 REST API 查询。用户声明存储在 Druid 中的 Hive 表后,我们将能够从 Importing 的 Hive SQL 查询中透明地生成 Druid JSON 查询。

  • 能够对 Druid 数据执行复杂的操作. Druid 本身还不支持多种操作,例如加入。将 Hive 放在 Druid 之上将使您能够在 Druid 数据源上执行更复杂的查询。

  • *使用 Hive 在 Druid 中对复杂查询进行索引.*当前,在 Druid 中的索引通常是通过 MapReduce 作业完成的。我们将使 Hive 能够将给定查询的结果直接索引到 Druid 中,例如作为新表或实例化视图(HIVE-10459),并立即开始查询和使用该数据集。

初始实现始于HIVE-14217,着重于 1)使得能够从 Hive 中发现已经存储在 Druid 中的数据,以及 2)能够查询该数据,从而尝试使用 Druid 的高级查询功能。例如,我们特别强调将尽可能多的计算推给德鲁伊,并能够识别德鲁伊特别有效的查询类型,例如* timeseries groupBy *查询。

第一步完成后的将来工作会在HIVE-14473中列出。如果您想在此方面进行合作,可以在本文档的末尾找到其余问题的列表。

Preliminaries

在进一步详细介绍之前,我们介绍一些 Reader 需要了解的背景知识才能理解本文档。

Druid

Druid 是一个开放源代码分析数据存储,旨在用于事件数据的商业智能(OLAP)查询。 Druid 提供了低延迟(实时)数据摄取,灵活的数据浏览和快速的数据聚合。现有的 Druid 部署已扩展到数万亿个事件和 PB 级数据。 Druid 最常用于驱动面向用户的分析应用程序。您可以找到有关 Druid here的更多信息。

Storage Handlers

您可以找到 Hive 存储处理程序here的概述; Druid 与 Hive 的集成取决于该框架。

Usage

对于正在运行的示例,我们使用 Druid 快速入门教程中包含的* wikiticker *数据集。

从 Hive 发现和 ManagementDruid 数据源

首先,我们专注于 Hive 的 Druid 数据源的发现和 Management。

创建链接到现有 Druid 数据源的表

假设我们已经将前面提到的* wikiticker 数据集存储在 Druid 中,并且 Druid 代理的地址为 10.5.0.10:8082 *。

首先,您需要在配置中设置 Hive 属性hive.druid.broker.address.default以指向代理地址:

SET hive.druid.broker.address.default=10.5.0.10:8082;

然后,要创建一个可以从 Hive 查询的表,我们在 Hive 中执行以下语句:

CREATE EXTERNAL TABLE druid_table_1
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "wikiticker");

请注意,您需要使用druid.datasource属性将* datasource 指定为 TBLPROPERTIES。此外,注意将表创建为 EXTERNAL ,因为数据存储在 Druid 中。该表只是一个逻辑实体,我们将使用它来表达查询,但是创建表时没有数据移动。实际上,执行该语句时,实际情况是 Hive 向 Druid 发送 segment metadata *查询以发现数据源的架构(列及其类型)。检索可能有用的其他信息,例如统计信息行数已包含在我们的 Route 图中,但尚不支持。最后,请注意,如果我们更改默认代理地址的 Hive 属性值,则该表上的查询将针对新代理地址自动运行,因为该地址未与表存储在一起。

如果执行* DESCRIBE *语句,我们实际上可以看到有关表的信息:

hive> DESCRIBE FORMATTED druid_table_1;
OK
# col_name            	data_type           	comment
__time              	timestamp           	from deserializer
added               	bigint              	from deserializer
channel             	string              	from deserializer
cityname            	string              	from deserializer
comment             	string              	from deserializer
count               	bigint              	from deserializer
countryisocode      	string              	from deserializer
countryname         	string              	from deserializer
deleted             	bigint              	from deserializer
delta               	bigint              	from deserializer
isanonymous         	string              	from deserializer
isminor             	string              	from deserializer
isnew               	string              	from deserializer
isrobot             	string              	from deserializer
isunpatrolled       	string              	from deserializer
metrocode           	string              	from deserializer
namespace           	string              	from deserializer
page                	string              	from deserializer
regionisocode       	string              	from deserializer
regionname          	string              	from deserializer
user                	string              	from deserializer
user_unique         	string              	from deserializer
# Detailed Table Information
Database:           	druid
Owner:              	user1
CreateTime:         	Thu Aug 18 19:09:10 BST 2016
LastAccessTime:     	UNKNOWN
Retention:          	0
Location:           	hdfs:/tmp/user1/hive/warehouse/druid.db/druid_table_1
Table Type:         	EXTERNAL_TABLE
Table Parameters:
	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\"}
	EXTERNAL            	TRUE
	druid.datasource    	wikiticker
	numFiles            	0
	numRows             	0
	rawDataSize         	0
	storage_handler     	org.apache.hadoop.hive.druid.DruidStorageHandler
	totalSize           	0
	transient_lastDdlTime	1471543750
# Storage Information
SerDe Library:      	org.apache.hadoop.hive.druid.serde.DruidSerDe
InputFormat:        	null
OutputFormat:       	null
Compressed:         	No
Num Buckets:        	-1
Bucket Columns:     	[]
Sort Columns:       	[]
Storage Desc Params:
	serialization.format	1
Time taken: 0.111 seconds, Fetched: 55 row(s)

我们可以看到与 Druid 类别相对应的三组不同的列:Druid 中必需的 timestamp 列(__time), dimension 列(其类型为 STRING)和** metrics * *列(其余所有列)。

从 Hive 创建 Druid 数据源

如果我们要 ManagementHive 的 Druid 数据源中的数据,则有多种可能的情况。

例如,我们可能想使用* CREATE TABLE 语句创建一个由 Druid 支持的空表,然后分别使用 INSERT INSERT OVERWRITE * Hive 语句添加和覆盖数据。

CREATE EXTERNAL TABLE druid_table_1
(`__time` TIMESTAMP, `dimension1` STRING, `dimension2` STRING, `metric1` INT, `metric2` FLOAT)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler';

另一个可能的情况是我们的数据存储在 Hive 表中,我们希望对其进行预处理并从 Hive 创建 Druid 数据源,以加快 SQL 查询工作量。我们可以通过执行* Create Table As Select *(CTAS)语句来做到这一点。例如:

CREATE EXTERNAL TABLE druid_table_1
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
AS
<select `timecolumn` as `__time`, `dimension1`, `dimension2`, `metric1`, `metric2`....>;

观察到我们仍然创建了与 Druid 类别相对应的三组不同的列组:Druid 中必需的 timestamp 列(__time), dimension 列(其类型为 STRING)和** metrics * *列(其余所有列)。

在这两个语句中,列类型(为* CREATE TABLE 语句静态指定或从 CTAS *语句的查询结果中推断)都用于推断相应的 Druid 列类别。

此外,请注意,如果我们没有为druid.datasource属性指定值,则 Hive 会自动使用表的标准名称来创建具有相同名称的相应数据源。

Version Info

版本 2.2.0:通过配置单元 Management 数据时创建表语法.

CREATE TABLE druid_table_1
(`__time` TIMESTAMP, `dimension1` STRING, `dimension2` STRING, `metric1` INT, `metric2` FLOAT)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler';

注意-在 Hive 3.0.0 之前,我们不使用* EXTERNAL *表,也未指定druid.datasource属性的值。

对于 3.0.0 版本,所有 Druid 表均为 EXTERNAL(HIVE-20085).

从 Hive 摄取 Druid kafka

Version Info

Hive 3.0.0(HIVE-18976)中引入了与 Druid Kafka 索引服务的集成。

德鲁伊·Kafka 索引服务通过 ManagementKafka 索引编制任务的创建和生存期,支持从 Kafka 主题一次提取数据。我们可以使用 Hive * CREATE TABLE *语句来 ManagementDruid Kafka 摄入,如下所示。

德鲁伊·Kafka 摄入

CREATE EXTERNAL TABLE druid_kafka_table_1(`__time` timestamp,`dimension1` string, `dimension1` string, `metric1` int, `metric2 double ....)
        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
        TBLPROPERTIES (
        "kafka.bootstrap.servers" = "localhost:9092",
        "kafka.topic" = "topic1",
        "druid.kafka.ingestion.useEarliestOffset" = "true",
        "druid.kafka.ingestion.maxRowsInMemory" = "5",
        "druid.kafka.ingestion.startDelay" = "PT1S",
        "druid.kafka.ingestion.period" = "PT1S",
        "druid.kafka.ingestion.consumer.retries" = "2"
        );

观察到我们在表属性中指定了 kafka 主题名称和 kafka 引导服务器。 德鲁伊·Kafka 索引服务的其他调音也可以通过在其前面加上*'druid.kafka.ingestion'来指定。例如,配置 druid 摄取任务的持续时间,我们可以添加* *“ druid.kafka.ingestion.taskDuration” =“ PT60S” *作为表格属性。

开始/停止/重置 Druid Kafka 摄入

我们可以使用如下所示的 sql 语句开始/停止/重置 druid kafka 摄取。

ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'STOP');
ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET');

注意:重置摄取会将德鲁伊维护的 kafkaConsumer 偏移量重置为下一个偏移量。根据* druid.kafka.ingestion.useEarliestOffset *,由德鲁伊维护的使用者偏移量将重置为最早或最新的偏移量。

表属性。这可能导致重复/丢失事件。通常,仅当 Kafka 中的当前使用方偏移量的消息不再可用于消耗并且因此不会被吸入德鲁伊时,才需要重置 Kafka 使用。

INSERT,INSERT OVERWRITE 和 DROP 语句

Version Info

版本 2.2.0:这些语句受 Druid 支持的 Hive 托管表(非外部)支持.

对于 3.0.0 版本,所有 Druid 表均为 EXTERNAL(HIVE-20085),并且任何表均支持这些语句.

从 Hive 查询 Druid

一旦使用DruidStorageHandler创建了存储在 Druid 中的第一个表,就可以对 Druid 执行查询了。

当我们在 Druid 表上表达查询时,Hive 尝试通过将尽可能多的计算推入 Druid 来“重写”该查询以有效地执行。该任务由基于Apache Calcitecost optimizer完成,该Apache Calcite标识计划中的模式并应用规则将 Importing 查询重写为新的等效查询,并希望在 Druid 中执行更多操作。

特别是,我们在HIVE-14217中实现了对优化器的扩展,它以CALCITE-1121中启动的工作为基础,并扩展了其逻辑以识别更复杂的查询模式(* timeseries 查询),将 time *维度上的过滤器转换为 Druid 间隔,将限制推送到 Druid * select *查询中,等等。

当前,我们支持识别* timeseries groupBy *和 s elect 查询。

完成优化后,需要由 Druid 执行的运算符的(子)计划将转换为有效的 Druid JSON 查询,并作为属性传递给 Hive 物理 TableScan 运算符。 Druid 查询将在 TableScan 运算符中执行,该操作将根据 Druid 查询结果生成记录。

我们使用* timeseries groupBy 的相应 Druid 查询生成单个 Hive 拆分,从中生成记录。因此,在这些情况下,并行度为 1.但是,对于没有限制的简单 select 查询(尽管它们可能仍然包含过滤器或投影),我们将原始查询划分为 x 个查询,并为每个查询生成一个拆分,从而提高了这些查询的并行度,通常会将大量结果返回到 x *。

考虑到根据查询,可能无法将任何计算推送到 Druid。但是,我们的 Contract 是查询应始终执行。因此,在这些情况下,Hive 将向 Druid 发送一个* select *查询,该查询基本上将从 Druid 中读取所有段,生成记录,然后对这些记录执行其余的 Hive 操作。如果禁用了成本优化器( *不推荐 *),这也是遵循的方法。

在 Druid 中完全执行查询

我们首先关注可以完全推入 Druid 的查询。在这些情况下,我们最终得到一个简单的计划,其中包括一个 TableScan 和一个 Fetch 运算符。因此,与启动执行容器无关的开销。

Select queries

我们从最简单的 Druid 查询类型开始:* select 查询。基本上, select *查询将等效于对数据源的扫描操作,尽管诸如投影,过滤或限制之类的操作仍可以推入此类查询中。

考虑以下查询,这是一个对表的所有列组成的 10 行的简单选择查询:

SELECT * FROM druid_table_1 LIMIT 10;

查询的 Hive 计划如下:

hive> EXPLAIN
    > SELECT * FROM druid_table_1 LIMIT 10;
OK
Plan optimized by CBO.
Stage-0
  Fetch Operator
    limit:-1
    Select Operator [SEL_1]
      Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17","_col18","_col19","_col20","_col21"]
      TableScan [TS_0]
        Output:["__time","added","channel","cityname","comment","count","countryisocode","countryname","deleted","delta","isanonymous","isminor","isnew","isrobot","isunpatrolled","metrocode","namespace","page","regionisocode","regionname","user","user_unique"],properties:{"druid.query.json":"{\"queryType\":\"select\",\"dataSource\":\"wikiticker\",\"descending\":\"false\",\"intervals\":[\"-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00\"],\"dimensions\":[\"channel\",\"cityname\",\"comment\",\"countryisocode\",\"countryname\",\"isanonymous\",\"isminor\",\"isnew\",\"isrobot\",\"isunpatrolled\",\"metrocode\",\"namespace\",\"page\",\"regionisocode\",\"regionname\",\"user\",\"user_unique\"],\"metrics\":[\"added\",\"count\",\"deleted\",\"delta\"],\"pagingSpec\":{\"threshold\":10},\"context\":{\"druid.query.fetch\":true}}","druid.query.type":"select"}
Time taken: 0.141 seconds, Fetched: 10 row(s)

观察到 Druid 查询在 TableScan 附带的属性中。为了提高可读性,我们将其正确格式化:

{
  "queryType":"select",
  "dataSource":"wikiticker",
  "descending":"false",
  "intervals":["-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00"],
  "dimensions": 
    ["channel","cityname","comment","countryisocode",
     "countryname","isanonymous","isminor","isnew",
     "isrobot","isunpatrolled","metrocode","namespace",
     "page","regionisocode","regionname","user","user_unique"
    ],
  "metrics":["added","count","deleted","delta"],
  "pagingSpec":{"threshold":10}
}

观察到我们可以将限制推入 Druid 查询(threshold)。还要注意,由于我们没有在数据源的时间戳维度上指定过滤器,因此我们生成了一个覆盖范围(-∞,∞)的间隔。

在 Druid 中,时间戳列起着核心作用。实际上,对于所有这些查询,Druid 允许使用intervals属性对时间维度进行过滤。这一点非常重要,因为时间间隔决定了存储 Druid 数据的节点。因此,指定一个精确范围可以最大程度地减少针对某个查询的中断命中的节点数量。受德鲁伊PR-2880的启发,我们在查询的逻辑计划中实现了从过滤条件中提取间隔。例如,考虑以下查询:

SELECT `__time`
FROM druid_table_1
WHERE `__time` >= '2010-01-01 00:00:00' AND `__time` <= '2011-01-01 00:00:00'
LIMIT 10;

下面是为上面的 SQL 查询生成的 Druid 查询(我们省略了该计划,因为它是一个简单的 TableScan 运算符)。

{
  "queryType":"select",
  "dataSource":"wikiticker",
  "descending":"false",
  "intervals":["2010-01-01T00:00:00.000Z/2011-01-01T00:00:00.001Z"],
  "dimensions":[],
  "metrics":[],
  "pagingSpec":{"threshold":10}
}

观察到我们正确推断了指定日期2010-01-01T00:00:00.000Z/2011-01-01T00:00:00.001Z的间隔,因为在 Druid 中包括了间隔的开始日期,但没有包括结束日期。我们还支持多个间隔范围的识别,例如在以下 SQL 查询中:

SELECT `__time`
FROM druid_table_1
WHERE (`__time` BETWEEN '2010-01-01 00:00:00' AND '2011-01-01 00:00:00')
    OR (`__time` BETWEEN '2012-01-01 00:00:00' AND '2013-01-01 00:00:00')
LIMIT 10;

此外,我们也可以推断出重叠的间隔。最后,未在时间维度上指定的过滤器将转换为有效的 Druid 过滤器,并使用filter属性包含在查询中。

Timeseries queries

  • Timeseries *是 Druid 可以非常有效地执行的查询类型之一。以下 SQL 查询直接转换为 Druid * timeseries *查询:
-- GRANULARITY: MONTH
SELECT `floor_month`(`__time`), max(delta), sum(added)
FROM druid_table_1
GROUP BY `floor_month`(`__time`);

基本上,我们将给定的时间粒度分组,然后计算每个结果组的汇总结果。特别是,时间戳维度__ time上的floor_month函数表示 Druid 月粒度格式。当前,我们支持floor_yearfloor_quarterfloor_monthfloor_weekfloor_dayfloor_hourfloor_minutefloor_second粒度。此外,我们支持两种特殊类型的粒度allnone,我们将在下面进行介绍。我们计划扩展集成工作,以支持其他重要的 Druid 自定义粒度构造,例如持续时间和期间粒度

查询的 Hive 计划如下:

hive> EXPLAIN
    > SELECT `floor_month`(`__time`), max(delta), sum(added)
    > FROM druid_table_1
    > GROUP BY `floor_month`(`__time`);
OK
Plan optimized by CBO.
Stage-0
  Fetch Operator
    limit:-1
    Select Operator [SEL_1]
      Output:["_col0","_col1","_col2"]
      TableScan [TS_0]
        Output:["__time","$f1","$f2"],
        properties:{"druid.query.json":"{\"queryType\":\"timeseries\",\"dataSource\":\"wikiticker\",\"descending\":\"false\",\"granularity\":\"MONTH\",\"aggregations\":[{\"type\":\"longMax\",\"name\":\"$f1\",\"fieldName\":\"delta\"},{\"type\":\"longSum\",\"name\":\"$f2\",\"fieldName\":\"added\"}],\"intervals\":[\"-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00\"]}","druid.query.type":"timeseries"}
Time taken: 0.116 seconds, Fetched: 10 row(s)

观察到 Druid 查询在 TableScan 附带的属性中。为了提高可读性,我们将其正确格式化:

{
  "queryType":"timeseries",
  "dataSource":"wikiticker",
  "descending":"false",
  "granularity":"MONTH",
  "aggregations":[
    {"type":"longMax", "name":"$f1", "fieldName":"delta"},
    {"type":"longSum", "name":"$f2", "fieldName":"added"}
  ],
  "intervals":["-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00"]
}

请注意,Druid 查询的粒度为MONTH

一种相当特殊的情况是all粒度,我们将在下面的示例中介绍。考虑以下查询:

-- GRANULARITY: ALL
SELECT max(delta), sum(added)
FROM druid_table_1;

由于它将对整个数据集进行汇总,因此它将转换为粒度为all的* timeseries *查询。特别是,附加到 TableScan 运算符的等效 Druid 查询如下:

{
  "queryType":"timeseries",
  "dataSource":"wikiticker",
  "descending":"false",
  "granularity":"ALL",
  "aggregations":[
    {"type":"longMax", "name":"$f1", "fieldName":"delta"},
    {"type":"longSum", "name":"$f2", "fieldName":"added"}
  ],
  "intervals":["-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00"]
}

GroupBy queries

我们当前支持的最终查询类型为* groupBy 。这种查询比 timeseries 查询更具表现力;但是,它们的性能较差。因此,当我们不能转换成 timeseries 查询时,我们只会回到 groupBy *查询。

例如,以下 SQL 查询将生成 Druid * groupBy *查询:

SELECT max(delta), sum(added)
FROM druid_table_1
GROUP BY `channel`, `user`;
{
  "queryType":"groupBy",
  "dataSource":"wikiticker",
  "granularity":"ALL",
  "dimensions":["channel","user"],
  "aggregations":[
    {"type":"longMax","name":"$f2","fieldName":"delta"},
    {"type":"longSum","name":"$f3","fieldName":"added"}],
  "intervals":["-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00"]
}

关于 Druid 和 Hive 的查询

最后,我们提供了一个在 Druid 和 Hive 之间运行的查询示例。特别是,让我们在 Hive 中创建第二个表,其中包含一些数据:

CREATE TABLE hive_table_1 (col1 INT, col2 STRING);
INSERT INTO hive_table_1 VALUES(1, '#en.wikipedia');

假设我们要执行以下查询:

SELECT a.channel, b.col1
FROM
(
  SELECT `channel`, max(delta) as m, sum(added)
  FROM druid_table_1
  GROUP BY `channel`, `floor_year`(`__time`)
  ORDER BY m DESC
  LIMIT 1000
) a
JOIN
(
  SELECT col1, col2
  FROM hive_table_1
) b
ON a.channel = b.col2;

该查询是对channelcol2列的简单连接。子查询a作为* groupBy *查询完全在 Druid 中执行。然后将结果与子查询b的结果合并到 Hive 中。 Tez 中的查询计划和执行如下所示:

hive> explain
    > SELECT a.channel, b.col1
    > FROM
    > (
    >   SELECT `channel`, max(delta) as m, sum(added)
    >   FROM druid_table_1
    >   GROUP BY `channel`, `floor_year`(`__time`)
    >   ORDER BY m DESC
    >   LIMIT 1000
    > ) a
    > JOIN
    > (
    >   SELECT col1, col2
    >   FROM hive_table_1
    > ) b
    > ON a.channel = b.col2;
OK
Plan optimized by CBO.
Vertex dependency in root stage
Map 2 <- Map 1 (BROADCAST_EDGE)
Stage-0
  Fetch Operator
    limit:-1
    Stage-1
      Map 2
      File Output Operator [FS_11]
        Select Operator [SEL_10] (rows=1 width=0)
          Output:["_col0","_col1"]
          Map Join Operator [MAPJOIN_16] (rows=1 width=0)
            Conds:RS_7._col0=SEL_6._col1(Inner),HybridGraceHashJoin:true,Output:["_col0","_col2"]
          <-Map 1 [BROADCAST_EDGE]
            BROADCAST [RS_7]
              PartitionCols:_col0
              Filter Operator [FIL_2] (rows=1 width=0)
                predicate:_col0 is not null
                Select Operator [SEL_1] (rows=1 width=0)
                  Output:["_col0"]
                  TableScan [TS_0] (rows=1 width=0)
                    druid@druid_table_1,druid_table_1,Tbl:PARTIAL,Col:NONE,Output:["channel"],properties:{"druid.query.json":"{\"queryType\":\"groupBy\",\"dataSource\":\"wikiticker\",\"granularity\":\"all\",\"dimensions\":[{\"type\":\"default\",\"dimension\":\"channel\"},{\"type\":\"extraction\",\"dimension\":\"__time\",\"outputName\":\"floor_year\",\"extractionFn\":{\"type\":\"timeFormat\",\"format\":\"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'\",\"granularity\":\"year\",\"timeZone\":\"UTC\",\"locale\":\"en-US\"}}],\"limitSpec\":{\"type\":\"default\",\"limit\":1000,\"columns\":[{\"dimension\":\"$f2\",\"direction\":\"descending\",\"dimensionOrder\":\"numeric\"}]},\"aggregations\":[{\"type\":\"doubleMax\",\"name\":\"$f2\",\"fieldName\":\"delta\"},{\"type\":\"doubleSum\",\"name\":\"$f3\",\"fieldName\":\"added\"}],\"intervals\":[\"1900-01-01T00:00:00.000/3000-01-01T00:00:00.000\"]}","druid.query.type":"groupBy"}
          <-Select Operator [SEL_6] (rows=1 width=15)
              Output:["_col0","_col1"]
              Filter Operator [FIL_15] (rows=1 width=15)
                predicate:col2 is not null
                TableScan [TS_4] (rows=1 width=15)
                  druid@hive_table_1,hive_table_1,Tbl:COMPLETE,Col:NONE,Output:["col1","col2"]
Time taken: 0.924 seconds, Fetched: 31 row(s)
hive> SELECT a.channel, b.col1
    > FROM
    > (
    >   SELECT `channel`, max(delta) as m, sum(added)
    >   FROM druid_table_1
    >   GROUP BY `channel`, `floor_year`(`__time`)
    >   ORDER BY m DESC
    >   LIMIT 1000
    > ) a
    > JOIN
    > (
    >   SELECT col1, col2
    >   FROM hive_table_1
    > ) b
    > ON a.channel = b.col2;
Query ID = user1_20160818202329_e9a8b3e8-18d3-49c7-bfe0-99d38d2402d3
Total jobs = 1
Launching Job 1 out of 1
2016-08-18 20:23:30 Running Dag: dag_1471548210492_0001_1
2016-08-18 20:23:30 Starting to run new task attempt: attempt_1471548210492_0001_1_00_000000_0
Status: Running (Executing on YARN cluster with App id application_1471548210492_0001)
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
Map 2 .......... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 0.15 s
----------------------------------------------------------------------------------------------
2016-08-18 20:23:31 Completed running task attempt: attempt_1471548210492_0001_1_00_000000_0
OK
#en.wikipedia	1
Time taken: 1.835 seconds, Fetched: 2 row(s)

未解决问题(JIRA)

||
||
|Key|Summary|T|Created|Updated|Due|Assignee|Reporter|P|Status|Resolution|

Loading...

Refresh