例行导入
使用 Routine Load 加载数据
在此 快速入门 中尝试 Routine Load
本主题介绍如何创建 Routine Load 作业,将 Kafka 消息(事件)流式传输到 StarRocks 中,并帮助您熟悉 Routine Load 的一些基本概念。
如需将数据流的消息持续导入 StarRocks,您可以将消息流存储在 Kafka Topic 中,然后创建例行导入作业以消费消息。例行导入作业在 StarRocks 中持久存在,生成一系列导入任务以消费 Topic 中全部或部分分区中的消息,并将消息加载到 StarRocks 中。
例行导入作业支持精确一次 (exactly-once) 语义,以确保加载到 StarRocks 中的数据既不丢失也不重复。
例行导入支持在数据加载时进行数据转换,并支持在数据加载期间通过 UPSERT 和 DELETE 操作进行数据更改。有关更多信息,请参见加载时转换数据和通过加载更改数据。
您只有以具有 StarRocks 表 INSERT 权限的用户身份才能将数据加载到 StarRocks 表中。 如果您没有 INSERT 权限,请按照GRANT中提供的说明,将 INSERT 权限授予您用于连接 StarRocks 集群的用户。 语法为 GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}
。
支持的数据格式
例行导入现在支持消费来自 Kafka 集群的 CSV、JSON 和 Avro(自 v3.0.1 起支持)格式的数据。
注意
对于 CSV 数据,请注意以下几点
- 您可以使用 UTF-8 字符串(例如逗号 (,)、制表符或管道 (|))作为文本分隔符,其长度不超过 50 字节。
- 空值用
\N
表示。例如,一个数据文件包含三列,并且该数据文件中的一条记录在第一列和第三列中保存数据,但在第二列中没有数据。在这种情况下,您需要在第二列中使用\N
来表示空值。这意味着该记录必须编译为a,\N,b
而不是a,,b
。a,,b
表示记录的第二列包含一个空字符串。
基本概念
术语
-
导入作业
例行导入作业是一个长时间运行的作业。只要其状态为 RUNNING,导入作业就会持续生成一个或多个并发导入任务,这些任务会消费 Kafka 集群 Topic 中的消息并将数据加载到 StarRocks 中。
-
导入任务
导入作业按照特定规则拆分为多个导入任务。导入任务是数据加载的基本单元。作为一个独立的事件,导入任务基于Stream Load实现导入机制。多个导入任务并发地消费来自 Topic 不同分区的消息,并将数据加载到 StarRocks 中。
工作流程
-
创建例行导入作业。 要从 Kafka 加载数据,您需要通过运行CREATE ROUTINE LOAD语句来创建例行导入作业。 FE 解析该语句,并根据您指定的属性创建作业。
-
FE 将作业拆分为多个导入任务。
FE 根据特定规则将作业拆分为多个导入任务。 每个导入任务都是一个独立的事务。 拆分规则如下:
- FE 根据所需的并发数
desired_concurrent_number
、Kafka Topic 中的分区数以及处于活动状态的 BE 节点数计算导入任务的实际并发数。 - FE 基于计算出的实际并发数将作业拆分为导入任务,并将任务排列在任务队列中。
每个 Kafka Topic 由多个分区组成。 Topic 分区和导入任务之间的关系如下:
- 一个分区唯一分配给一个导入任务,并且来自该分区的所有消息都由该导入任务消费。
- 一个导入任务可以消费来自一个或多个分区的消息。
- 所有分区均匀地分布在导入任务之间。
- FE 根据所需的并发数
-
多个导入任务并发运行以消费来自多个 Kafka Topic 分区的消息,并将数据加载到 StarRocks 中
-
FE 调度和提交导入任务:FE 定期调度队列中的导入任务,并将它们分配给选定的 Coordinator BE 节点。 导入任务之间的间隔由配置项
max_batch_interval
定义。 FE 将导入任务均匀地分配给所有 BE 节点。 有关max_batch_interval
的更多信息,请参见CREATE ROUTINE LOAD。 -
Coordinator BE 启动导入任务,消费分区中的消息,解析和过滤数据。 导入任务持续到消费了预定义数量的消息或达到预定义的时间限制为止。 消息批量大小和时间限制在 FE 配置
max_routine_load_batch_size
和routine_load_task_consume_second
中定义。 有关详细信息,请参见FE 配置。 然后,Coordinator BE 将消息分发给 Executor BE。 Executor BE 将消息写入磁盘。注意
StarRocks 支持通过包括 SASL_SSL、SASL_PLAINTEXT、SSL 和 PLAINTEXT 在内的安全协议访问 Kafka。 本主题使用通过 PLAINTEXT 连接到 Kafka 作为示例。 如果您需要通过其他安全协议连接到 Kafka,请参见CREATE ROUTINE LOAD。
-
-
FE 生成新的导入任务以持续加载数据。 在 Executor BE 将数据写入磁盘后,Coordinator BE 会将导入任务的结果报告给 FE。 基于结果,FE 会生成新的导入任务以持续加载数据。 或者,FE 会重试失败的任务,以确保加载到 StarRocks 中的数据既不丢失也不重复。
创建 Routine Load 作业
以下三个示例描述了如何消费 Kafka 中的 CSV 格式、JSON 格式和 Avro 格式的数据,并通过创建例行导入作业将数据加载到 StarRocks 中。 有关详细的语法和参数描述,请参见CREATE ROUTINE LOAD。
加载 CSV 格式的数据
本节介绍如何创建例行导入作业以消费 Kafka 集群中的 CSV 格式数据,并将数据加载到 StarRocks 中。
准备数据集
假设 Kafka 集群中 Topic ordertest1
中存在 CSV 格式的数据集。 数据集中的每条消息都包含六个字段:订单 ID、付款日期、客户姓名、国籍、性别和价格。
2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
2020050802,2020-05-08,Julien Sorel,France,male,893
2020050803,2020-05-08,Dorian Grey,UK,male,1262
2020050901,2020-05-09,Anna Karenina",Russia,female,175
2020051001,2020-05-10,Tess Durbeyfield,US,female,986
2020051101,2020-05-11,Edogawa Conan,japan,male,8924
创建表
根据 CSV 格式数据的字段,在数据库 example_db
中创建表 example_tbl1
。 以下示例创建一个包含 5 个字段的表,不包括 CSV 格式数据中的客户性别字段。
CREATE TABLE example_db.example_tbl1 (
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`price`double NULL COMMENT "Price"
)
ENGINE=OLAP
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(`order_id`);
注意
自 v2.5.7 起,StarRocks 可以在您创建表或添加分区时自动设置桶数 (BUCKETS)。 您不再需要手动设置桶数。 有关详细信息,请参见设置桶数。
提交例行导入作业
执行以下语句以提交名为 example_tbl1_ordertest1
的例行导入作业,以消费 Topic ordertest1
中的消息并将数据加载到表 example_tbl1
中。 导入任务消费来自 Topic 指定分区中初始偏移量的消息。
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
PROPERTIES
(
"desired_concurrent_number" = "5"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1",
"kafka_partitions" = "0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
提交导入作业后,您可以执行SHOW ROUTINE LOAD语句以检查导入作业的状态。
-
导入作业名称
一个表上可能存在多个导入作业。 因此,我们建议您使用相应的 Kafka Topic 和提交导入作业的时间来命名导入作业。 这有助于您区分每个表上的导入作业。
-
列分隔符
属性
COLUMN TERMINATED BY
定义 CSV 格式数据的列分隔符。 默认为\t
。 -
Kafka Topic 分区和偏移量
您可以指定属性
kafka_partitions
和kafka_offsets
以指定要消费消息的分区和偏移量。 例如,如果您希望导入作业消费来自 Topicordertest1
的 Kafka 分区"0,1,2,3,4"
的消息,所有消息都具有初始偏移量,则可以按如下方式指定属性:如果您希望导入作业消费来自 Kafka 分区"0,1,2,3,4"
的消息,并且您需要为每个分区指定单独的起始偏移量,则可以按如下方式配置:"kafka_partitions" ="0,1,2,3,4",
"kafka_offsets" = "OFFSET_BEGINNING, OFFSET_END, 1000, 2000, 3000"您还可以使用属性
property.kafka_default_offsets
设置所有分区的默认偏移量。"kafka_partitions" ="0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"有关详细信息,请参见CREATE ROUTINE LOAD。
-
数据映射和转换
要指定 CSV 格式数据与 StarRocks 表之间的映射和转换关系,您需要使用
COLUMNS
参数。数据映射
-
StarRocks 提取 CSV 格式数据中的列,并按顺序将它们映射到
COLUMNS
参数中声明的字段。 -
StarRocks 提取
COLUMNS
参数中声明的字段,并按名称将它们映射到 StarRocks 表的列。
数据转换
并且由于示例排除了 CSV 格式数据中的客户性别列,因此
COLUMNS
参数中的字段temp_gender
用作该字段的占位符。 其他字段直接映射到 StarRocks 表example_tbl1
的列。有关数据转换的更多信息,请参见加载时转换数据。
注意
如果 CSV 格式数据中的列的名称、数量和顺序与 StarRocks 表的列完全对应,则无需指定
COLUMNS
参数。 -
-
任务并发
当 Kafka Topic 分区很多并且 BE 节点足够时,您可以通过增加任务并发来加速加载。
要增加实际导入任务并发,您可以在创建例行导入作业时增加所需的导入任务并发
desired_concurrent_number
。 您还可以将 FE 的动态配置项max_routine_load_task_concurrent_num
(默认最大导入任务并发数)设置为更大的值。 有关max_routine_load_task_concurrent_num
的更多信息,请参见FE 配置项。实际任务并发由处于活动状态的 BE 节点的数量、预先指定的 Kafka Topic 分区的数量以及
desired_concurrent_number
和max_routine_load_task_concurrent_num
的值之间的最小值定义。在示例中,处于活动状态的 BE 节点的数量为
5
,预先指定的 Kafka Topic 分区的数量为5
,并且max_routine_load_task_concurrent_num
的值为5
。 要增加实际导入任务并发,您可以将desired_concurrent_number
从默认值3
增加到5
。有关属性的更多信息,请参见CREATE ROUTINE LOAD。
加载 JSON 格式的数据
本节介绍如何创建例行导入作业以消费 Kafka 集群中的 JSON 格式数据,并将数据加载到 StarRocks 中。
准备数据集
假设 Kafka 集群中 Topic ordertest2
中存在 JSON 格式的数据集。 数据集包括六个键:商品 ID、客户姓名、国籍、付款时间和价格。 此外,您希望将付款时间列转换为 DATE 类型,并将其加载到 StarRocks 表中的 pay_dt
列。
{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875}
{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895}
{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}
警告 一行中的每个 JSON 对象都必须位于一条 Kafka 消息中,否则会返回 JSON 解析错误。
创建表
根据 JSON 格式数据的键,在数据库 example_db
中创建表 example_tbl2
。
CREATE TABLE `example_tbl2` (
`commodity_id` varchar(26) NULL COMMENT "Commodity ID",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`country` varchar(26) NULL COMMENT "Country",
`pay_time` bigint(20) NULL COMMENT "Payment time",
`pay_dt` date NULL COMMENT "Payment date",
`price`double SUM NULL COMMENT "Price"
)
ENGINE=OLAP
AGGREGATE KEY(`commodity_id`,`customer_name`,`country`,`pay_time`,`pay_dt`)
DISTRIBUTED BY HASH(`commodity_id`);
注意
自 v2.5.7 起,StarRocks 可以在您创建表或添加分区时自动设置桶数 (BUCKETS)。 您不再需要手动设置桶数。 有关详细信息,请参见设置桶数。
提交例行导入作业
执行以下语句以提交名为 example_tbl2_ordertest2
的例行导入作业,以消费 Topic ordertest2
中的消息并将数据加载到表 example_tbl2
中。 导入任务消费来自 Topic 指定分区中初始偏移量的消息。
CREATE ROUTINE LOAD example_db.example_tbl2_ordertest2 ON example_tbl2
COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2",
"kafka_partitions" ="0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
提交导入作业后,您可以执行SHOW ROUTINE LOAD语句以检查导入作业的状态。
-
数据格式
您需要在子句
PROPERTIES
中指定"format" = "json"
以定义数据格式为 JSON。 -
数据映射和转换
要指定 JSON 格式数据与 StarRocks 表之间的映射和转换关系,您需要指定参数
COLUMNS
和属性jsonpaths
。COLUMNS
参数中指定的字段顺序必须与 JSON 格式数据的顺序匹配,并且字段名称必须与 StarRocks 表的名称匹配。 属性jsonpaths
用于从 JSON 数据中提取所需的字段。 然后,这些字段由属性COLUMNS
命名。由于该示例需要将付款时间字段转换为 DATE 数据类型,并将数据加载到 StarRocks 表中的
pay_dt
列,因此您需要使用 from_unixtime 函数。 其他字段直接映射到表example_tbl2
的字段。数据映射
-
StarRocks 提取 JSON 格式数据的
name
和code
键,并将它们映射到jsonpaths
属性中声明的键。 -
StarRocks 提取
jsonpaths
属性中声明的键,并按顺序将它们映射到COLUMNS
参数中声明的字段。 -
StarRocks 提取
COLUMNS
参数中声明的字段,并按名称将它们映射到 StarRocks 表的列。
数据转换:
-
由于该示例需要将键
pay_time
转换为 DATE 数据类型,并将数据加载到 StarRocks 表中的pay_dt
列,因此您需要在COLUMNS
参数中使用 from_unixtime 函数。 其他字段直接映射到表example_tbl2
的字段。 -
并且由于示例排除了 JSON 格式数据中的客户性别列,因此
COLUMNS
参数中的字段temp_gender
用作该字段的占位符。 其他字段直接映射到 StarRocks 表example_tbl1
的列。有关数据转换的更多信息,请参见加载时转换数据。
注意
如果 JSON 对象中键的名称和数量与 StarRocks 表中字段的名称和数量完全匹配,则无需指定
COLUMNS
参数。
-
加载 Avro 格式的数据
自 v3.0.1 起,StarRocks 支持使用 Routine Load 加载 Avro 数据。
准备数据集
Avro schema
-
创建以下 Avro schema 文件
avro_schema.avsc
{
"type": "record",
"name": "sensor_log",
"fields" : [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "checked", "type" : "boolean"},
{"name": "data", "type": "double"},
{"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}}
]
} -
在Schema Registry中注册 Avro schema。
Avro data
准备 Avro 数据并将其发送到 Kafka Topic topic_0
。
创建表
根据 Avro 数据的字段,在 StarRocks 集群的目标数据库 example_db
中创建表 sensor_log
。 表的列名必须与 Avro 数据中的字段名称匹配。 有关表列与 Avro 数据字段之间的数据类型映射,请参见[数据类型映射](#Data types mapping)。
CREATE TABLE example_db.sensor_log (
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`data` double NULL COMMENT "sensor data",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);
注意
自 v2.5.7 起,StarRocks 可以在您创建表或添加分区时自动设置桶数 (BUCKETS)。 您不再需要手动设置桶数。 有关详细信息,请参见设置桶数。
提交例行导入作业
执行以下语句以提交名为 sensor_log_load_job
的例行导入作业,以消费 Kafka Topic topic_0
中的 Avro 消息并将数据加载到数据库 sensor
中的表 sensor_log
中。 导入作业消费来自 Topic 指定分区中初始偏移量的消息。
CREATE ROUTINE LOAD example_db.sensor_log_load_job ON sensor_log
PROPERTIES
(
"format" = "avro"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic" = "topic_0",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
-
数据格式
您需要在子句
PROPERTIES
中指定"format = "avro"
以定义数据格式为 Avro。 -
Schema Registry
您需要配置
confluent.schema.registry.url
以指定注册 Avro schema 的 Schema Registry 的 URL。 StarRocks 使用此 URL 检索 Avro schema。 格式如下:confluent.schema.registry.url = http[s]://[<schema-registry-api-key>:<schema-registry-api-secret>@]<hostname|ip address>[:<port>]
-
数据映射和转换
要指定 Avro 格式数据与 StarRocks 表之间的映射和转换关系,您需要指定参数
COLUMNS
和属性jsonpaths
。COLUMNS
参数中指定的字段顺序必须与属性jsonpaths
中的字段顺序匹配,并且字段名称必须与 StarRocks 表的名称匹配。 属性jsonpaths
用于从 Avro 数据中提取所需的字段。 然后,这些字段由属性COLUMNS
命名。有关数据转换的更多信息,请参见加载时转换数据。
注意
如果 Avro 记录中字段的名称和数量与 StarRocks 表中列的名称和数量完全匹配,则无需指定
COLUMNS
参数。
提交导入作业后,您可以执行SHOW ROUTINE LOAD语句以检查导入作业的状态。
数据类型映射
您要加载的 Avro 数据字段与 StarRocks 表列之间的数据类型映射如下:
基本类型
Avro | StarRocks |
---|---|
nul | NULL |
boolean | BOOLEAN |
int | INT |
long | BIGINT |
float | FLOAT |
double | DOUBLE |
bytes | STRING |
string | STRING |
复杂类型
Avro | StarRocks |
---|---|
record | 将整个 RECORD 或其子字段作为 JSON 加载到 StarRocks 中。 |
enums | STRING |
arrays | ARRAY |
maps | JSON |
union(T, null) | NULLABLE(T) |
fixed | STRING |
限制
- 目前,StarRocks 不支持 schema evolution。
- 每条 Kafka 消息必须仅包含一条 Avro 数据记录。
检查加载作业和任务
检查加载作业
执行SHOW ROUTINE LOAD语句以检查导入作业 example_tbl2_ordertest2
的状态。 StarRocks 返回执行状态 State
、统计信息(包括消费的总行数和加载的总行数)Statistics
以及导入作业的进度 progress
。
如果导入作业的状态自动更改为 PAUSED,则可能是因为错误行数已超过阈值。 有关设置此阈值的详细说明,请参见CREATE ROUTINE LOAD。 您可以检查文件 ReasonOfStateChanged
和 ErrorLogUrls
以识别和排除问题。 修复问题后,您可以执行RESUME ROUTINE LOAD语句以恢复 PAUSED 导入作业。
如果导入作业的状态为 CANCELLED,则可能是因为导入作业遇到异常(例如表已被删除)。 您可以检查文件 ReasonOfStateChanged
和 ErrorLogUrls
以识别和排除问题。 但是,您无法恢复 CANCELLED 导入作业。
MySQL [example_db]> SHOW ROUTINE LOAD FOR example_tbl2_ordertest2 \G
*************************** 1. row ***************************
Id: 63013
Name: example_tbl2_ordertest2
CreateTime: 2022-08-10 17:09:00
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:example_db
TableName: example_tbl2
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 3
JobProperties: {"partitions":"*","partial_update":"false","columnToColumnExpr":"commodity_id,customer_name,country,pay_time,pay_dt=from_unixtime(`pay_time`, '%Y%m%d'),price","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","timezone":"Asia/Shanghai","format":"json","json_root":"","strict_mode":"false","jsonpaths":"[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"3","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"ordertest2","currentKafkaPartitions":"0,1,2,3,4","brokerList":"<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>"}
CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING"}
Statistic: {"receivedBytes":230,"errorRows":0,"committedTaskNum":1,"loadedRows":2,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":2,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":522}
Progress: {"0":"1","1":"OFFSET_ZERO","2":"OFFSET_ZERO","3":"OFFSET_ZERO","4":"OFFSET_ZERO"}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
注意
您无法检查已停止或尚未开始的导入作业。
检查加载任务
执行SHOW ROUTINE LOAD TASK语句以检查导入作业 example_tbl2_ordertest2
的导入任务,例如当前正在运行的任务数、消费的 Kafka Topic 分区和消费进度 DataSourceProperties
以及相应的 Coordinator BE 节点 BeId
。
MySQL [example_db]> SHOW ROUTINE LOAD TASK WHERE JobName = "example_tbl2_ordertest2" \G
*************************** 1. row ***************************
TaskId: 18c3a823-d73e-4a64-b9cb-b9eced026753
TxnId: -1
TxnStatus: UNKNOWN
JobId: 63013
CreateTime: 2022-08-10 17:09:05
LastScheduledTime: 2022-08-10 17:47:27
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: {"1":0,"4":0}
Message: there is no new data in kafka, wait for 20 seconds to schedule again
*************************** 2. row ***************************
TaskId: f76c97ac-26aa-4b41-8194-a8ba2063eb00
TxnId: -1
TxnStatus: UNKNOWN
JobId: 63013
CreateTime: 2022-08-10 17:09:05
LastScheduledTime: 2022-08-10 17:47:26
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: {"2":0}
Message: there is no new data in kafka, wait for 20 seconds to schedule again
*************************** 3. row ***************************
TaskId: 1a327a34-99f4-4f8d-8014-3cd38db99ec6
TxnId: -1
TxnStatus: UNKNOWN
JobId: 63013
CreateTime: 2022-08-10 17:09:26
LastScheduledTime: 2022-08-10 17:47:27
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: {"0":2,"3":0}
Message: there is no new data in kafka, wait for 20 seconds to schedule again
暂停导入作业
您可以执行PAUSE ROUTINE LOAD语句以暂停导入作业。 执行该语句后,导入作业的状态将为 PAUSED。 但是,它尚未停止。 您可以执行RESUME ROUTINE LOAD语句以恢复它。 您还可以使用SHOW ROUTINE LOAD语句检查其状态。
以下示例暂停了导入作业 example_tbl2_ordertest2
PAUSE ROUTINE LOAD FOR example_tbl2_ordertest2;
恢复导入作业
您可以执行RESUME ROUTINE LOAD语句以恢复暂停的导入作业。 导入作业的状态将暂时为 NEED_SCHEDULE(因为正在重新调度导入作业),然后变为 RUNNING。 您可以使用SHOW ROUTINE LOAD语句检查其状态。
以下示例恢复了暂停的导入作业 example_tbl2_ordertest2
RESUME ROUTINE LOAD FOR example_tbl2_ordertest2;
修改加载作业
在更改导入作业之前,您必须使用PAUSE ROUTINE LOAD语句暂停它。 然后,您可以执行ALTER ROUTINE LOAD。 更改后,您可以执行RESUME ROUTINE LOAD语句以恢复它,并使用SHOW ROUTINE LOAD语句检查其状态。
假设处于活动状态的 BE 节点的数量增加到 6
,并且要消费的 Kafka Topic 分区为 "0,1,2,3,4,5,6,7"
。 如果要增加实际导入任务并发,您可以执行以下语句以将所需的任务并发数 desired_concurrent_number
增加到 6
(大于或等于处于活动状态的 BE 节点的数量),并指定 Kafka Topic 分区和初始偏移量。
注意
由于实际任务并发由多个参数的最小值确定,因此您必须确保 FE 动态参数
max_routine_load_task_concurrent_num
的值大于或等于6
。
ALTER ROUTINE LOAD FOR example_tbl2_ordertest2
PROPERTIES
(
"desired_concurrent_number" = "6"
)
FROM kafka
(
"kafka_partitions" = "0,1,2,3,4,5,6,7",
"kafka_offsets" = "OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_END,OFFSET_END,OFFSET_END,OFFSET_END"
);
停止导入作业
您可以执行STOP ROUTINE LOAD语句以停止导入作业。 执行该语句后,导入作业的状态将为 STOPPED,并且您无法恢复已停止的导入作业。 您无法使用SHOW ROUTINE LOAD语句检查已停止的导入作业的状态。
以下示例停止了导入作业 example_tbl2_ordertest2
STOP ROUTINE LOAD FOR example_tbl2_ordertest2;