|
1 | 1 | --- |
2 | | -title: 使用任务自动化数据加载 |
3 | | -sidebar_label: 任务 |
| 2 | +title: 使用任务(Task)自动化数据加载 |
| 3 | +sidebar_label: 任务(Task) |
4 | 4 | --- |
5 | 5 |
|
6 | | -任务封装了特定的 SQL 语句,设计用于按预定间隔执行、由特定事件触发或作为任务序列的一部分执行。在 Databend Cloud 中,任务通常用于定期捕获流中的变更数据(例如新增记录),然后将这些数据同步到指定目标。此外,任务还支持 [Webhook](https://en.wikipedia.org/wiki/Webhook) 等消息系统,可按需发送错误消息和通知。 |
| 6 | +Task 是“把 SQL 交给 Databend 代跑”的方式。你可以让它按固定频率运行、在另一任务结束后运行,或者在某个 Stream 报告有增量时再运行。下面先看定义 Task 时需要关注的几个开关,再通过两个动手示例理解它如何和 Stream 配合。 |
7 | 7 |
|
8 | | -## 创建任务 |
| 8 | +## Task 构建要素 |
9 | 9 |
|
10 | | -本主题详细说明在 Databend Cloud 中创建任务的步骤。您可以使用 [CREATE TASK](/sql/sql-commands/ddl/task/ddl-create_task) 命令创建任务。创建任务时,请参照下图设计工作流: |
| 10 | +- **名称与计算仓库** – 每个 Task 都需要一个 Warehouse。 |
| 11 | + ```sql |
| 12 | + CREATE TASK ingest_orders |
| 13 | + WAREHOUSE = 'etl_wh' |
| 14 | + AS SELECT 1; |
| 15 | + ``` |
| 16 | +- **触发方式** – `SCHEDULE = 2 MINUTE`、CRON,或 `AFTER <task>`(适用于 DAG)。 |
| 17 | +- **执行条件** – `WHEN STREAM_STATUS('mystream') = TRUE` 这类布尔表达式,只有条件满足才运行。 |
| 18 | +- **错误策略** – `SUSPEND_TASK_AFTER_NUM_FAILURES`、`ERROR_INTEGRATION` 等参数可在失败多次后暂停并发通知。 |
| 19 | +- **SQL 负载** – `AS` 后的内容就是 Task 要执行的语句,可以是一条 INSERT/COPY/MERGE,也可以是 BEGIN...END。 |
11 | 20 |
|
12 | | - |
| 21 | +## 示例 1:定时 COPY |
13 | 22 |
|
14 | | -1. 为任务设置名称。 |
15 | | -2. 指定运行任务的计算集群。如需创建计算集群,请参阅 [使用计算集群](/guides/cloud/using-databend-cloud/warehouses)。 |
16 | | -3. 确定任务触发方式。 |
| 23 | +持续生成 Parquet 并导入表。记得把 `'etl_wh_small'` 换成你自己的 Warehouse。 |
17 | 24 |
|
18 | | - - 可通过指定分钟或秒为间隔来调度任务,或使用 CRON 表达式配合可选时区实现更精确的调度。 |
| 25 | +### 步骤 1: 准备演示对象 |
19 | 26 |
|
20 | | -```sql title='示例:' |
21 | | --- 此任务每 2 分钟运行一次 |
22 | | -CREATE TASK mytask |
23 | | -WAREHOUSE = 'default' |
24 | | -// highlight-next-line |
25 | | -SCHEDULE = 2 MINUTE |
26 | | -AS ... |
| 27 | +```sql |
| 28 | +CREATE DATABASE IF NOT EXISTS task_demo; |
| 29 | +USE task_demo; |
| 30 | +
|
| 31 | +CREATE OR REPLACE TABLE sensor_events ( |
| 32 | + event_time TIMESTAMP, |
| 33 | + sensor_id INT, |
| 34 | + temperature DOUBLE, |
| 35 | + humidity DOUBLE |
| 36 | +); |
27 | 37 |
|
28 | | --- 此任务每天午夜(东京时间)在 Asia/Tokyo 时区运行 |
29 | | -CREATE TASK mytask |
30 | | -WAREHOUSE = 'default' |
31 | | -// highlight-next-line |
32 | | -SCHEDULE = USING CRON '0 0 0 * * *' 'Asia/Tokyo' |
33 | | -AS ... |
| 38 | +CREATE OR REPLACE STAGE sensor_events_stage; |
34 | 39 | ``` |
35 | 40 |
|
36 | | - - 或者,您可以在任务间建立依赖关系,将任务设置为 [有向无环图](https://en.wikipedia.org/wiki/Directed_acyclic_graph) 中的子任务。 |
| 41 | +### 步骤 2: Task 1 —— 生成文件 |
37 | 42 |
|
38 | | -```sql title='示例:' |
39 | | --- 此任务依赖于 DAG 中 'task_root' 任务的完成 |
40 | | -CREATE TASK mytask |
41 | | -WAREHOUSE = 'default' |
42 | | -// highlight-next-line |
43 | | -AFTER task_root |
44 | | -AS ... |
| 43 | +```sql |
| 44 | +CREATE OR REPLACE TASK task_generate_data |
| 45 | + WAREHOUSE = 'etl_wh_small' |
| 46 | + SCHEDULE = 1 MINUTE |
| 47 | +AS |
| 48 | +COPY INTO @sensor_events_stage |
| 49 | +FROM ( |
| 50 | + SELECT |
| 51 | + NOW() AS event_time, |
| 52 | + number AS sensor_id, |
| 53 | + 20 + RAND() * 5 AS temperature, |
| 54 | + 60 + RAND() * 10 AS humidity |
| 55 | + FROM numbers(100) |
| 56 | +) |
| 57 | +FILE_FORMAT = (TYPE = PARQUET); |
45 | 58 | ``` |
46 | 59 |
|
47 | | -4. 指定任务执行条件,允许基于布尔表达式控制任务执行(可选)。 |
| 60 | +### 步骤 3: Task 2 —— 将文件导入表 |
48 | 61 |
|
49 | | -```sql title='示例:' |
50 | | --- 此任务每 2 分钟运行一次,仅当 'mystream' 包含数据变更时执行 AS 后的 SQL |
51 | | -CREATE TASK mytask |
52 | | -WAREHOUSE = 'default' |
53 | | -SCHEDULE = 2 MINUTE |
54 | | -// highlight-next-line |
55 | | -WHEN STREAM_STATUS('mystream') = TRUE |
56 | | -AS ... |
| 62 | +```sql |
| 63 | +CREATE OR REPLACE TASK task_consume_data |
| 64 | + WAREHOUSE = 'etl_wh_small' |
| 65 | + SCHEDULE = 1 MINUTE |
| 66 | +AS |
| 67 | +COPY INTO sensor_events |
| 68 | +FROM @sensor_events_stage |
| 69 | +PATTERN = '.*[.]parquet' |
| 70 | +FILE_FORMAT = (TYPE = PARQUET) |
| 71 | +PURGE = TRUE; |
57 | 72 | ``` |
58 | 73 |
|
59 | | -5. 指定任务出错时的处理方式,包括设置连续失败次数以暂停任务,以及指定错误通知的集成方式。有关设置错误通知的更多信息,请参阅 [配置通知集成](#configuring-notification-integrations)。 |
60 | | - |
61 | | -```sql title='示例:' |
62 | | --- 此任务将在连续失败 3 次后暂停 |
63 | | -CREATE TASK mytask |
64 | | -WAREHOUSE = 'default' |
65 | | -// highlight-next-line |
66 | | -SUSPEND_TASK_AFTER_NUM_FAILURES = 3 |
67 | | -AS ... |
68 | | - |
69 | | --- 此任务将使用 'my_webhook' 集成发送错误通知 |
70 | | -CREATE TASK mytask |
71 | | -WAREHOUSE = 'default' |
72 | | -// highlight-next-line |
73 | | -ERROR_INTEGRATION = 'my_webhook' |
74 | | -AS ... |
| 74 | +### 步骤 4: 恢复 Task |
| 75 | + |
| 76 | +```sql |
| 77 | +ALTER TASK task_generate_data RESUME; |
| 78 | +ALTER TASK task_consume_data RESUME; |
75 | 79 | ``` |
76 | 80 |
|
77 | | -6. 指定任务将执行的 SQL 语句。 |
| 81 | +### 步骤 5: 观察运行情况 |
78 | 82 |
|
79 | | -```sql title='示例:' |
80 | | --- 此任务每年更新 'employees' 表中的 'age' 列,使其递增 1 |
81 | | -CREATE TASK mytask |
82 | | -WAREHOUSE = 'default' |
83 | | -SCHEDULE = USING CRON '0 0 1 1 * *' 'UTC' |
84 | | -// highlight-next-line |
85 | | -AS |
86 | | -UPDATE employees |
87 | | -SET age = age + 1; |
| 83 | +```sql |
| 84 | +SHOW TASKS LIKE 'task_%'; |
| 85 | +LIST @sensor_events_stage; |
| 86 | +SELECT * FROM sensor_events ORDER BY event_time DESC LIMIT 5; |
| 87 | +SELECT * FROM task_history('task_consume_data', 5); |
88 | 88 | ``` |
89 | 89 |
|
90 | | -## 查看已创建的任务 |
91 | | - |
92 | | -要查看组织创建的所有任务,请登录 Databend Cloud 并转到 **数据** > **任务**。您可以查看每个任务的详细信息,包括状态和调度计划。 |
93 | | - |
94 | | -要查看任务运行历史记录,请转到 **监控** > **任务历史**。您可以查看每次任务运行的结果、完成时间等详细信息。 |
95 | | - |
96 | | -## 配置通知集成 |
97 | | - |
98 | | -Databend Cloud 允许您为任务配置错误通知,在任务执行出错时自动发送通知。当前支持 Webhook 集成,可实现错误事件与外部系统或服务的实时通信。 |
99 | | - |
100 | | -### 任务错误负载 |
101 | | - |
102 | | -任务错误负载指任务执行出错时作为错误通知发送的数据或信息。该负载通常包含错误详情,如错误代码、错误消息、时间戳以及其他有助于诊断和解决问题的上下文信息。 |
103 | | - |
104 | | -```json title='任务错误负载示例:' |
105 | | -{ |
106 | | - "version": "1.0", |
107 | | - "messageId": "063e40ab-0b55-439e-9cd2-504c496e1566", |
108 | | - "messageType": "TASK_FAILED", |
109 | | - "timestamp": "2024-03-19T02:37:21.160705788Z", |
110 | | - "tenantId": "tn78p61xz", |
111 | | - "taskName": "my_task", |
112 | | - "taskId": "15", |
113 | | - "rootTaskName": "my_task", |
114 | | - "rootTaskId": "15", |
115 | | - "messages": [ |
116 | | - { |
117 | | - "runId": "unknown", |
118 | | - "scheduledTime": "2024-03-19T02:37:21.157169855Z", |
119 | | - "queryStartTime": "2024-03-19T02:37:21.043090475Z", |
120 | | - "completedTime": "2024-03-19T02:37:21.157169205Z", |
121 | | - "queryId": "88bb9d5d-5d5e-4e52-92cc-b1953406245a", |
122 | | - "errorKind": "UnexpectedError", |
123 | | - "errorCode": "500", |
124 | | - "errorMessage": "query sync failed: All attempts fail:\n#1: query error: code: 1006, message: divided by zero while evaluating function `divide(1, 0)`" |
125 | | - } |
126 | | - ] |
127 | | -} |
| 90 | +### 步骤 6: 调整或改写 Task |
| 91 | + |
| 92 | +```sql |
| 93 | +ALTER TASK task_consume_data |
| 94 | + SET SCHEDULE = 30 SECOND, |
| 95 | + WAREHOUSE = 'etl_wh_medium'; |
| 96 | +
|
| 97 | +ALTER TASK task_consume_data |
| 98 | + MODIFY AS |
| 99 | +COPY INTO sensor_events |
| 100 | +FROM @sensor_events_stage |
| 101 | +FILE_FORMAT = (TYPE = PARQUET); |
| 102 | +
|
| 103 | +ALTER TASK task_consume_data RESUME; |
| 104 | +
|
| 105 | +SELECT * |
| 106 | +FROM task_history('task_consume_data', 5) |
| 107 | +ORDER BY completed_time DESC; |
128 | 108 | ``` |
129 | 109 |
|
130 | | -### 使用示例 |
| 110 | +## 示例 2:Stream 条件 Task |
131 | 111 |
|
132 | | -在为任务配置错误通知前,您需要使用 [CREATE NOTIFICATION INTEGRATION](/sql/sql-commands/ddl/notification/ddl-create-notification) 命令创建通知集成。以下示例展示了如何为任务创建和配置通知集成。该示例使用 [Webhook.site](http://webhook.site) 模拟消息系统,接收来自 Databend Cloud 的负载。 |
| 112 | +只有当 Stream 报告“有增量”时才运行,避免空跑。 |
133 | 113 |
|
134 | | -1. 在浏览器中打开 [Webhook.site](http://webhook.site),获取您的 Webhook URL。 |
| 114 | +### 步骤 1: 创建 Stream 与结果表 |
135 | 115 |
|
136 | | - |
| 116 | +```sql |
| 117 | +CREATE OR REPLACE STREAM sensor_events_stream |
| 118 | + ON TABLE sensor_events |
| 119 | + APPEND_ONLY = false; |
| 120 | +
|
| 121 | +CREATE OR REPLACE TABLE sensor_events_latest AS |
| 122 | +SELECT * |
| 123 | +FROM sensor_events |
| 124 | +WHERE 1 = 0; |
| 125 | +``` |
137 | 126 |
|
138 | | -2. 在 Databend Cloud 中创建通知集成,然后创建带通知集成的任务: |
| 127 | +### 步骤 2: 定义条件 Task |
139 | 128 |
|
140 | 129 | ```sql |
141 | | --- 创建名为 'my_task' 的任务,每分钟运行一次,错误通知发送至 'my_webhook' |
142 | | --- 故意除以零以生成错误 |
143 | | -CREATE TASK my_task |
144 | | -WAREHOUSE = 'default' |
145 | | -SCHEDULE = 1 MINUTE |
146 | | -ERROR_INTEGRATION = 'my_webhook' |
| 130 | +CREATE OR REPLACE TASK task_stream_merge |
| 131 | + WAREHOUSE = 'etl_wh_small' |
| 132 | + SCHEDULE = 1 MINUTE |
| 133 | + WHEN STREAM_STATUS('task_demo.sensor_events_stream') = TRUE |
147 | 134 | AS |
148 | | -SELECT 1 / 0; |
149 | | - |
150 | | --- 创建名为 'my_webhook' 的通知集成,用于发送 webhook 通知 |
151 | | -CREATE NOTIFICATION INTEGRATION my_webhook |
152 | | -TYPE = WEBHOOK |
153 | | -ENABLED = TRUE |
154 | | -WEBHOOK = ( |
155 | | - url = '<YOUR-WEBHOOK_URL>', |
156 | | - method = 'POST' |
157 | | -); |
| 135 | +INSERT INTO sensor_events_latest |
| 136 | +SELECT * |
| 137 | +FROM sensor_events_stream; |
158 | 138 |
|
159 | | --- 创建后恢复任务 |
160 | | -ALTER TASK my_task RESUME; |
| 139 | +ALTER TASK task_stream_merge RESUME; |
161 | 140 | ``` |
162 | 141 |
|
163 | | -3. 稍等片刻,您将看到 webhook 开始接收来自创建的任务的负载。 |
| 142 | +### 步骤 3: 查看增量与历史 |
164 | 143 |
|
165 | | - |
| 144 | +```sql |
| 145 | +SELECT * |
| 146 | +FROM sensor_events_latest |
| 147 | +ORDER BY event_time DESC |
| 148 | +LIMIT 5; |
166 | 149 |
|
167 | | -## 使用示例 |
| 150 | +SELECT * |
| 151 | +FROM task_history('task_stream_merge', 5); |
| 152 | +``` |
168 | 153 |
|
169 | | -完整演示如何通过流捕获数据变更并使用任务同步,请参阅 [示例:实时跟踪和转换数据](01-stream.md#example-tracking-and-transforming-data-in-real-time)。 |
| 154 | +只要 `STREAM_STATUS('<database>.<stream_name>')` 返回 TRUE(例如 `task_demo.sensor_events_stream`),Task 就会运行;否则保持暂停,直到下一批增量到达。 |
0 commit comments