Scheduled Queries

Intro

Executing statements periodically can be useful for tasks such as keeping table statistics up to date, rebuilding materialized views, or ingesting data from upstream sources (see the examples at the end of this page).

Overview

Note

Scheduled queries were added in Hive 4.0 (HIVE-21884).

Hive has a scheduled query interface built into the language itself for easy access:

Create scheduled query syntax

CREATE SCHEDULED QUERY <scheduled_query_name>
<scheduleSpecification>
[<executedAsSpec>]
[<enableSpecification>]
<definedAsSpec>
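
For illustration, here is a complete statement following the clause order above; the schedule name, the table t, and the query body are made up for this sketch:

-- analyze a (hypothetical) table t every day at 04:00
create scheduled query nightly_analyze
every day at '04:00:00'
enabled
defined as analyze table t compute statistics;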

Alter scheduled query syntax

ALTER SCHEDULED QUERY <scheduled_query_name> (<scheduleSpec> | <executedAsSpec> | <enableSpecification> | <definedAsSpec> | <executeSpec>);
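
Each ALTER statement changes a single aspect of an existing schedule; a few sketches against the hypothetical schedule created above:

-- switch to an equivalent Quartz expression (04:00 every day)
alter scheduled query nightly_analyze cron '0 0 4 * * ? *';
-- temporarily disable it
alter scheduled query nightly_analyze disabled;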

Drop syntax

DROP SCHEDULED QUERY <scheduled_query_name>;

scheduleSpecification syntax

The schedule can be specified using a CRON expression or, for common cases, in a simpler form; in every case the schedule is stored as a Quartz cron expression.

CRON based schedule syntax

CRON <quartz_schedule_expression>

where quartz_schedule_expression is a schedule given in the Quartz format:

https://www.freeformatter.com/cron-expression-generator-quartz.html

For example, the CRON '0 */10 * * * ? *' expression will fire every 10 minutes.
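
A couple more Quartz expressions for orientation (the fields are: seconds, minutes, hours, day of month, month, day of week, year); the schedule names and query bodies are made up:

-- fire at 12:00 (noon) every day
create scheduled query noon_job cron '0 0 12 * * ? *' as select 1;
-- fire every 30 minutes
create scheduled query half_hourly cron '0 0/30 * * * ? *' as select 1;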

EVERY based schedule syntax

To provide a more readable way of declaring schedules, EVERY can also be used.

EVERY [<integer>] (SECOND|MINUTE|HOUR|DAY) [(OFFSET BY | AT) <timeOrDate>]

The format makes it possible to declare schedules in a more readable form:

EVERY 2 MINUTES
EVERY HOUR AT '0:07:30'
EVERY DAY AT '11:35:30'
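
The same forms can be used directly when creating a schedule; a minimal sketch with made-up names and query bodies:

-- run every 2 minutes
create scheduled query every_two_minutes every 2 minutes as select 1;
-- run once an hour, 7 minutes 30 seconds past the hour
create scheduled query hourly_offset every hour at '0:07:30' as select 1;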

ExecutedAs syntax

EXECUTED AS <user_name>

By default, scheduled queries are executed as the declaring user; however, someone with administrator privileges may be able to change the executing user.
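
For example, an administrator could move an existing schedule to a different execution user; the schedule and user names below are illustrative, and giving the user as a string literal is an assumption of this sketch:

alter scheduled query sc1 executed as 'etl_user';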

enableSpecification syntax

(ENABLE[D] | DISABLE[D])

Can be used to enable or disable a schedule.

Note

For CREATE SCHEDULED QUERY statements, the default behaviour is controlled by the configuration key hive.scheduled.queries.create.as.enabled.

Note

If an execution of the schedule is in progress when the schedule is disabled, the already running execution will still finish, but no further executions will be triggered.
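
A minimal sketch of toggling a schedule (the name is illustrative); an execution that is already running when the schedule gets disabled will still run to completion:

alter scheduled query sc1 disabled;
-- ...and later re-enable it
alter scheduled query sc1 enabled;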

definedAsSpec syntax

[DEFINED] AS <query>

The <query> is a single statement expression to be executed on the schedule.
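
Since the DEFINED keyword is optional, the two statements below are equivalent; note that a multi-insert FROM ... INSERT statement still counts as a single statement (Example 4 below relies on this). The names are made up:

create scheduled query one_row every hour defined as select 1;
create scheduled query one_row_short every hour as select 1;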

executeSpec syntax

EXECUTE

Changes the next execution time of the schedule to now. This can be useful during debugging and development.

System tables/views

Information about scheduled queries and executions can be obtained using information_schema or sysdb. The recommended way is to use information_schema; sysdb contains the tables the information_schema level views are built on and is intended for debugging.

information_schema.scheduled_queries

Suppose we have a scheduled query defined as:

create scheduled query sc1 cron '0 */10 * * * ? *' as select 1;

Let's take a look at it in the information_schema.scheduled_queries table by using

select * from information_schema.scheduled_queries;

The resultset is transposed below to describe each column:

| Column | Example value | Comment |
| scheduled_query_id | 1 | Internally, every scheduled query also has a numeric id |
| schedule_name | sc1 | Name of the schedule |
| enabled | true | True if the schedule is enabled |
| cluster_namespace | hive | The namespace the scheduled query belongs to |
| schedule | 0 */10 * * * ? * | The schedule described in QUARTZ cron format |
| user | dev | Owner/executor of the query |
| query | select 1 | The scheduled query itself |
| next_execution | 2020-01-29 16:50:00 | Technical column; shows when the next execution is due |

Note

(schedule_name, cluster_namespace) is unique.
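
Because the pair is unique, a single schedule can be looked up by filtering on both columns; for example:

select schedule, next_execution
from information_schema.scheduled_queries
where schedule_name = 'sc1' and cluster_namespace = 'hive';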

information_schema.scheduled_executions

This view can be used to get information about recent scheduled query executions.

select * from information_schema.scheduled_executions;

A record in this view contains the following information:

| Column | Example value | Comment |
| scheduled_execution_id | 13 | Every scheduled query execution has a unique numeric id |
| schedule_name | sc1 | Name of the schedule this execution belongs to |
| executor_query_id | dev_20200131103008_c9a39b8d-e26b-44cd-b8ae-9d054204dc07 | The query id assigned by the execution engine for this scheduled execution |
| state | FINISHED | State of the execution; the possible states are described under Execution states below |
| start_time | 2020-01-31 10:30:06 | Start time of the execution |
| end_time | 2020-01-31 10:30:08 | End time of the execution |
| elapsed | 2 | (computed) end_time - start_time |
| error_message | NULL | If the query fails, the error message is shown here |
| last_update_time | NULL | Time of the last update the executor provided about the state while the execution was in progress |
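
A sketch of a monitoring query over this view, listing the most recent failed executions together with their error messages:

select schedule_name, start_time, error_message
from information_schema.scheduled_executions
where state = 'FAILED'
order by scheduled_execution_id desc
limit 10;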

Execution states

| INITED | The scheduled execution record is created in this state when an executor is assigned to run it; it stays in INITED until the first update from the executor arrives. |
| EXECUTING | The query is being processed by the executor; in this state the executor reports the query progress at intervals of hive.scheduled.queries.executor.progress.report.interval. |
| FAILED | The query execution stopped because of an error (or exception); when this state is set, error_message is also populated. |
| FINISHED | The query completed successfully without problems. |
| TIMED_OUT | If an execution runs for longer than metastore.scheduled.queries.execution.timeout, it is considered timed out. The scheduled query maintenance task checks for timed out executions. |
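
The states can also be summarized straight from the view, for example to get a quick health overview of all schedules:

select state, count(*) as executions
from information_schema.scheduled_executions
group by state;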

How long is execution information retained?

The scheduled query maintenance task removes entries older than metastore.scheduled.queries.execution.max.age.

Configuration

Hive Metastore related configuration

HiveServer2 related configuration
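
The configuration keys referenced on this page, grouped by where they live; a sketch of inspecting the HiveServer2 side keys from a session (SET with just a key name prints its current value), while the metastore.* keys are configured on the metastore side and are listed only for reference:

-- HiveServer2 side
set hive.scheduled.queries.create.as.enabled;
set hive.scheduled.queries.executor.progress.report.interval;
-- Metastore side (not set from a Hive session):
--   metastore.scheduled.queries.execution.timeout
--   metastore.scheduled.queries.execution.max.age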

Examples

Example 1 – basic example of using a schedule

create table t (a integer);

-- create a scheduled query; every 10 minute insert a new row
create scheduled query sc1 cron '0 */10 * * * ? *' as insert into t values (1);
-- depending on hive.scheduled.queries.create.as.enabled the query might get created in disabled mode
-- it can be enabled using:
alter scheduled query sc1 enabled;

-- inspect scheduled queries using the information_schema
select * from information_schema.scheduled_queries s where schedule_name='sc1';
+-----------------------+------------------+------------+----------------------+-------------------+---------+-----------+----------------------+
| s.scheduled_query_id  | s.schedule_name  | s.enabled  | s.cluster_namespace  |    s.schedule     | s.user  |  s.query  |   s.next_execution   |
+-----------------------+------------------+------------+----------------------+-------------------+---------+-----------+----------------------+
| 1                     | sc1              | true       | hive                 | 0 */10 * * * ? *  | dev     | select 1  | 2020-02-03 15:10:00  |
+-----------------------+------------------+------------+----------------------+-------------------+---------+-----------+----------------------+

-- wait 10 minutes or execute by issuing:
alter scheduled query sc1 execute;

select * from information_schema.scheduled_executions s where schedule_name='sc1' order by scheduled_execution_id desc limit 1;
+---------------------------+------------------+----------------------------------------------------+-----------+----------------------+----------------------+------------+------------------+---------------------+
| s.scheduled_execution_id  | s.schedule_name  |                s.executor_query_id                 |  s.state  |     s.start_time     |      s.end_time      | s.elapsed  | s.error_message  | s.last_update_time  |
+---------------------------+------------------+----------------------------------------------------+-----------+----------------------+----------------------+------------+------------------+---------------------+
| 496                       | sc1              | dev_20200203152025_bdf3deac-0ca6-407f-b122-c637e50f99c8 | FINISHED  | 2020-02-03 15:20:23  | 2020-02-03 15:20:31  | 8          | NULL             | NULL                |
+---------------------------+------------------+----------------------------------------------------+-----------+----------------------+----------------------+------------+------------------+---------------------+

Example 2 – analyze external table periodically

Suppose you have an external table whose contents are slowly changing... this will eventually lead to Hive using outdated statistics during query planning.

-- create external table
create external table t (a integer);

-- see where the table lives:
desc formatted t;
[...]
| Location:                     | file:/data/hive/warehouse/t                       | NULL                                               |
[...]

-- in a terminal; load some data into the table directory:
seq 1 10 > /data/hive/warehouse/t/f1

-- back in hive you will see that the new rows are visible:
select count(1) from t;
10
-- meanwhile basic stats show that the table has "0" rows 
desc formatted t;
[...]
|                               | numRows                                            | 0                                                  |
[...]

create scheduled query t_analyze cron '0 */1 * * * ? *' as analyze table t compute statistics for columns;

-- wait some time or execute by issuing:
alter scheduled query t_analyze execute;

select * from information_schema.scheduled_executions s where schedule_name='t_analyze' order by scheduled_execution_id desc limit 3;
+---------------------------+------------------+----------------------------------------------------+------------+----------------------+----------------------+------------+------------------+----------------------+
| s.scheduled_execution_id  | s.schedule_name  |                s.executor_query_id                 |  s.state   |     s.start_time     |      s.end_time      | s.elapsed  | s.error_message  |  s.last_update_time  |
+---------------------------+------------------+----------------------------------------------------+------------+----------------------+----------------------+------------+------------------+----------------------+
| 498                       | t_analyze       | dev_20200203152640_a59bc198-3ed3-4ef2-8f63-573607c9914e | FINISHED   | 2020-02-03 15:26:38  | 2020-02-03 15:28:01  | 83         | NULL             | NULL                 |
+---------------------------+------------------+----------------------------------------------------+------------+----------------------+----------------------+------------+------------------+----------------------+

-- and the numrows have been updated
desc formatted t;
[...]
|                               | numRows                                            | 10                                                 |
[...]

-- we don't want this running every minute anymore...
alter scheduled query t_analyze disable;

Example 3 – materialized view rebuild

-- some settings...they might be there already
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.strict.checks.cartesian.product=false;
set hive.stats.fetch.column.stats=true;
set hive.materializedview.rewriting=true;

-- create some tables
CREATE TABLE emps (
  empid INT,
  deptno INT,
  name VARCHAR(256),
  salary FLOAT,
  hire_date TIMESTAMP)
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
 
CREATE TABLE depts (
  deptno INT,
  deptname VARCHAR(256),
  locationid INT)
STORED AS ORC
TBLPROPERTIES ('transactional'='true');

-- load data
insert into emps values (100, 10, 'Bill', 10000, 1000), (200, 20, 'Eric', 8000, 500),
  (150, 10, 'Sebastian', 7000, null), (110, 10, 'Theodore', 10000, 250), (120, 10, 'Bill', 10000, 250),
  (1330, 10, 'Bill', 10000, '2020-01-02');
insert into depts values (10, 'Sales', 10), (30, 'Marketing', null), (20, 'HR', 20);

insert into emps values (1330, 10, 'Bill', 10000, '2020-01-02');

-- create mv
CREATE MATERIALIZED VIEW mv1 AS
  SELECT empid, deptname, hire_date FROM emps
    JOIN depts ON (emps.deptno = depts.deptno)
    WHERE hire_date >= '2016-01-01 00:00:00';

EXPLAIN
SELECT empid, deptname FROM emps
JOIN depts ON (emps.deptno = depts.deptno)
WHERE hire_date >= '2018-01-01';

-- create a schedule to rebuild mv
create scheduled query mv_rebuild cron '0 */10 * * * ? *' defined as 
  alter materialized view mv1 rebuild;

-- from this explain it will be seen that mv1 is being used
EXPLAIN
SELECT empid, deptname FROM emps
JOIN depts ON (emps.deptno = depts.deptno)
WHERE hire_date >= '2018-01-01';

-- insert a new record
insert into emps values (1330, 10, 'Bill', 10000, '2020-01-02');

-- the source tables are scanned
EXPLAIN
SELECT empid, deptname FROM emps
JOIN depts ON (emps.deptno = depts.deptno)
WHERE hire_date >= '2018-01-01';

-- wait 10 minutes or execute
alter scheduled query mv_rebuild execute;

-- run it again...the view should be rebuilt
EXPLAIN
SELECT empid, deptname FROM emps
JOIN depts ON (emps.deptno = depts.deptno)
WHERE hire_date >= '2018-01-01';

Example 4 – ingestion

drop table if exists t;
drop table if exists s;

-- suppose that this table is an external table or something
-- which supports the pushdown of filter condition on the id column
create table s(id integer, cnt integer);

-- create an internal table and an offset table
create table t(id integer, cnt integer);
create table t_offset(offset integer);
insert into t_offset values(0);

-- pretend that data is added to s
insert into s values(1,1);

-- run an ingestion...
from (select id==offset as first,* from s
join t_offset on id>=offset) s1
insert into t select id,cnt where first = false
insert overwrite table t_offset select max(s1.id);

-- configure to run ingestion every 10 minutes
create scheduled query ingest every 10 minutes defined as
from (select id==offset as first,* from s
join t_offset on id>=offset) s1
insert into t select id,cnt where first = false
insert overwrite table t_offset select max(s1.id);

-- add some new values
insert into s values(2,2),(3,3);

-- instead of waiting 10 minutes, trigger the execution right away
alter scheduled query ingest execute;