跳到主要内容
版本: 最新版本-3.5

使用 Routine Load 加载数据

提示

在此 快速入门 中尝试 Routine Load

本主题介绍如何创建 Routine Load 作业,将 Kafka 消息(事件)流式传输到 StarRocks 中,并帮助您熟悉 Routine Load 的一些基本概念。

要将消息流持续加载到 StarRocks 中,您可以将消息流存储在 Kafka Topic 中,并创建一个 Routine Load 作业来消费消息。 Routine Load 作业持久保存在 StarRocks 中,生成一系列加载任务来消费 Topic 中全部或部分分区中的消息,并将消息加载到 StarRocks 中。

Routine Load 作业支持精确一次交付语义,以保证加载到 StarRocks 中的数据既不会丢失也不会重复。

Routine Load 支持加载时的数据转换,并支持在数据加载期间通过 UPSERT 和 DELETE 操作进行数据更改。有关详细信息,请参见加载时转换数据通过加载更改数据

您只能以对这些 StarRocks 表具有 INSERT 权限的用户身份将数据加载到 StarRocks 表中。如果您没有 INSERT 权限,请按照 GRANT 中提供的说明将 INSERT 权限授予您用于连接到 StarRocks 集群的用户。语法为 GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}

支持的数据格式

Routine Load 现在支持从 Kafka 集群消费 CSV、JSON 和 Avro(自 v3.0.1 起支持)格式的数据。

注意

对于 CSV 数据,请注意以下几点

  • 您可以使用 UTF-8 字符串(例如逗号 (,)、制表符或管道 (|))作为文本分隔符,其长度不超过 50 字节。
  • 空值使用 \N 表示。例如,一个数据文件包含三列,并且该数据文件中的一条记录在第一列和第三列中包含数据,但在第二列中不包含数据。在这种情况下,您需要在第二列中使用 \N 来表示空值。这意味着记录必须编译为 a,\N,b 而不是 a,,ba,,b 表示记录的第二列包含一个空字符串。

基本概念

routine load

术语

  • 加载作业

    Routine Load 作业是一个长时间运行的作业。只要其状态为 RUNNING,加载作业就会持续生成一个或多个并发加载任务,这些任务会消费 Kafka 集群的 Topic 中的消息并将数据加载到 StarRocks 中。

  • 加载任务

    一个加载作业按照一定的规则被分成多个加载任务。加载任务是数据加载的基本单位。作为一个单独的事件,加载任务实现了基于Stream Load的加载机制。多个加载任务并发地消费来自 Topic 不同分区的消息,并将数据加载到 StarRocks 中。

工作流程

  1. 创建 Routine Load 作业。 要从 Kafka 加载数据,您需要通过运行 CREATE ROUTINE LOAD 语句来创建一个 Routine Load 作业。 FE 解析该语句,并根据您指定的属性创建作业。

  2. FE 将作业拆分为多个加载任务。

    FE 基于一定的规则将作业拆分为多个加载任务。每个加载任务都是一个独立的事务。拆分规则如下

    • FE 根据期望的并发数 desired_concurrent_number、Kafka Topic 中的分区数以及存活的 BE 节点数计算加载任务的实际并发数。
    • FE 基于计算出的实际并发数将作业拆分为加载任务,并将任务排列在任务队列中。

    每个 Kafka Topic 都由多个分区组成。 Topic 分区和加载任务之间的关系如下

    • 一个分区唯一分配给一个加载任务,并且来自该分区的所有消息都由该加载任务消费。
    • 一个加载任务可以消费来自一个或多个分区的消息。
    • 所有分区均匀地分布在加载任务中。
  3. 多个加载任务并发运行以消费来自多个 Kafka Topic 分区的消息,并将数据加载到 StarRocks 中

    1. FE 调度和提交加载任务:FE 及时调度队列中的加载任务,并将它们分配给选定的 Coordinator BE 节点。加载任务之间的间隔由配置项 max_batch_interval 定义。 FE 将加载任务均匀地分配给所有 BE 节点。有关 max_batch_interval 的更多信息,请参见 CREATE ROUTINE LOAD

    2. Coordinator BE 启动加载任务,消费分区中的消息,解析和过滤数据。加载任务持续到消费预定义数量的消息或达到预定义的时间限制为止。消息批处理大小和时间限制在 FE 配置 max_routine_load_batch_sizeroutine_load_task_consume_second 中定义。有关详细信息,请参见FE 配置。然后,Coordinator BE 将消息分发给 Executor BE。 Executor BE 将消息写入磁盘。

      注意

      StarRocks 支持通过包括 SASL_SSL、SASL_PLAINTEXT、SSL 和 PLAINTEXT 在内的安全协议访问 Kafka。本主题以通过 PLAINTEXT 连接到 Kafka 为例。如果您需要通过其他安全协议连接到 Kafka,请参见 CREATE ROUTINE LOAD

  4. FE 生成新的加载任务以持续加载数据。 在 Executor BE 将数据写入磁盘后,Coordinator BE 将加载任务的结果报告给 FE。根据结果,FE 然后生成新的加载任务以持续加载数据。或者,FE 重试失败的任务以确保加载到 StarRocks 中的数据既不会丢失也不会重复。

创建 Routine Load 作业

以下三个示例描述了如何在 Kafka 中消费 CSV 格式、JSON 格式和 Avro 格式的数据,并通过创建 Routine Load 作业将数据加载到 StarRocks 中。有关详细的语法和参数说明,请参见 CREATE ROUTINE LOAD

加载 CSV 格式数据

本节介绍如何创建一个 Routine Load 作业来消费 Kafka 集群中的 CSV 格式数据,并将数据加载到 StarRocks 中。

准备数据集

假设在 Kafka 集群的 Topic ordertest1 中有一个 CSV 格式的数据集。数据集中的每条消息都包含六个字段:订单 ID、付款日期、客户姓名、国籍、性别和价格。

2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
2020050802,2020-05-08,Julien Sorel,France,male,893
2020050803,2020-05-08,Dorian Grey,UK,male,1262
2020050901,2020-05-09,Anna Karenina",Russia,female,175
2020051001,2020-05-10,Tess Durbeyfield,US,female,986
2020051101,2020-05-11,Edogawa Conan,japan,male,8924

创建表

根据 CSV 格式数据的字段,在数据库 example_db 中创建表 example_tbl1。以下示例创建一个包含 5 个字段的表,不包括 CSV 格式数据中的客户性别字段。

CREATE TABLE example_db.example_tbl1 ( 
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`price`double NULL COMMENT "Price"
)
ENGINE=OLAP
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(`order_id`);

注意

自 v2.5.7 起,StarRocks 可以在您创建表或添加分区时自动设置存储桶数 (BUCKETS)。您不再需要手动设置存储桶数。有关详细信息,请参见 设置存储桶数

提交 Routine Load 作业

执行以下语句以提交一个名为 example_tbl1_ordertest1 的 Routine Load 作业,以消费 Topic ordertest1 中的消息并将数据加载到表 example_tbl1 中。加载任务消费 Topic 指定分区中初始偏移量的消息。

CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
PROPERTIES
(
"desired_concurrent_number" = "5"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1",
"kafka_partitions" = "0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

提交加载作业后,您可以执行 SHOW ROUTINE LOAD 语句以检查加载作业的状态。

  • 加载作业名称

    一个表上可能有多个加载作业。因此,我们建议您使用相应的 Kafka Topic 和提交加载作业的时间来命名加载作业。这有助于您区分每个表上的加载作业。

  • 列分隔符

    属性 COLUMN TERMINATED BY 定义 CSV 格式数据的列分隔符。默认为 \t

  • Kafka Topic 分区和偏移量

    您可以指定属性 kafka_partitionskafka_offsets 来指定分区和偏移量以消费消息。例如,如果您希望加载作业从 Topic ordertest1 的 Kafka 分区 "0,1,2,3,4" 中消费所有具有初始偏移量的消息,您可以按如下方式指定属性:如果您希望加载作业从 Kafka 分区 "0,1,2,3,4" 中消费消息,并且您需要为每个分区指定一个单独的起始偏移量,您可以按如下方式进行配置

    "kafka_partitions" ="0,1,2,3,4",
    "kafka_offsets" = "OFFSET_BEGINNING, OFFSET_END, 1000, 2000, 3000"

    您还可以使用属性 property.kafka_default_offsets 设置所有分区的默认偏移量。

    "kafka_partitions" ="0,1,2,3,4",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"

    有关详细信息,请参见 CREATE ROUTINE LOAD

  • 数据映射和转换

    要指定 CSV 格式数据与 StarRocks 表之间的映射和转换关系,您需要使用 COLUMNS 参数。

    数据映射

    • StarRocks 提取 CSV 格式数据中的列,并按顺序将它们映射到 COLUMNS 参数中声明的字段上。

    • StarRocks 提取 COLUMNS 参数中声明的字段,并按名称将它们映射到 StarRocks 表的列上。

    数据转换

    并且由于该示例从 CSV 格式数据中排除了客户性别列,因此 COLUMNS 参数中的字段 temp_gender 用作此字段的占位符。其他字段直接映射到 StarRocks 表 example_tbl1 的列。

    有关数据转换的更多信息,请参见加载时转换数据

    注意

    如果 CSV 格式数据中列的名称、数量和顺序完全对应于 StarRocks 表中的列,则无需指定 COLUMNS 参数。

  • 任务并发

    当 Kafka Topic 分区很多且 BE 节点足够时,您可以通过增加任务并发来加速加载。

    要增加实际加载任务并发,您可以在创建 Routine Load 作业时增加期望的加载任务并发 desired_concurrent_number。您还可以将 FE 的动态配置项 max_routine_load_task_concurrent_num(默认最大加载任务并发)设置为较大的值。有关 max_routine_load_task_concurrent_num 的更多信息,请参见 FE 配置项

    实际任务并发由存活的 BE 节点数、预指定的 Kafka Topic 分区数以及 desired_concurrent_numbermax_routine_load_task_concurrent_num 的值之间的最小值定义。

    在该示例中,存活的 BE 节点数为 5,预指定的 Kafka Topic 分区数为 5,并且 max_routine_load_task_concurrent_num 的值为 5。要增加实际加载任务并发,您可以将 desired_concurrent_number 从默认值 3 增加到 5

    有关属性的更多信息,请参见 CREATE ROUTINE LOAD

加载 JSON 格式数据

本节介绍如何创建一个 Routine Load 作业来消费 Kafka 集群中的 JSON 格式数据,并将数据加载到 StarRocks 中。

准备数据集

假设在 Kafka 集群的 Topic ordertest2 中有一个 JSON 格式的数据集。该数据集包含六个键:商品 ID、客户姓名、国籍、付款时间和价格。此外,您希望将付款时间列转换为 DATE 类型,并将其加载到 StarRocks 表中的 pay_dt 列中。

{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875}
{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895}
{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}

注意 一行中的每个 JSON 对象必须位于一条 Kafka 消息中,否则会返回 JSON 解析错误。

创建表

根据 JSON 格式数据的键,在数据库 example_db 中创建表 example_tbl2

CREATE TABLE `example_tbl2` ( 
`commodity_id` varchar(26) NULL COMMENT "Commodity ID",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`country` varchar(26) NULL COMMENT "Country",
`pay_time` bigint(20) NULL COMMENT "Payment time",
`pay_dt` date NULL COMMENT "Payment date",
`price`double SUM NULL COMMENT "Price"
)
ENGINE=OLAP
AGGREGATE KEY(`commodity_id`,`customer_name`,`country`,`pay_time`,`pay_dt`)
DISTRIBUTED BY HASH(`commodity_id`);

注意

自 v2.5.7 起,StarRocks 可以在您创建表或添加分区时自动设置存储桶数 (BUCKETS)。您不再需要手动设置存储桶数。有关详细信息,请参见 设置存储桶数

提交 Routine Load 作业

执行以下语句以提交一个名为 example_tbl2_ordertest2 的 Routine Load 作业,以消费 Topic ordertest2 中的消息并将数据加载到表 example_tbl2 中。加载任务消费 Topic 指定分区中初始偏移量的消息。

CREATE ROUTINE LOAD example_db.example_tbl2_ordertest2 ON example_tbl2
COLUMNS(commodity_id, customer_name, country, pay_time, price, pay_dt=from_unixtime(pay_time, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2",
"kafka_partitions" ="0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

提交加载作业后,您可以执行 SHOW ROUTINE LOAD 语句以检查加载作业的状态。

  • 数据格式

    您需要在 PROPERTIES 子句中指定 "format" = "json" 以定义数据格式为 JSON。

  • 数据映射和转换

    要指定 JSON 格式数据与 StarRocks 表之间的映射和转换关系,您需要指定参数 COLUMNS 和属性 jsonpathsCOLUMNS 参数中指定的字段顺序必须与 JSON 格式数据的顺序匹配,并且字段名称必须与 StarRocks 表的名称匹配。属性 jsonpaths 用于从 JSON 数据中提取所需的字段。然后,这些字段由属性 COLUMNS 命名。

    因为该示例需要将付款时间字段转换为 DATE 数据类型,并将数据加载到 StarRocks 表中的 pay_dt 列中,所以您需要使用 from_unixtime 函数。其他字段直接映射到表 example_tbl2 的字段。

    数据映射

    • StarRocks 提取 JSON 格式数据的 namecode 键,并将它们映射到 jsonpaths 属性中声明的键上。

    • StarRocks 提取 jsonpaths 属性中声明的键,并按顺序将它们映射到 COLUMNS 参数中声明的字段上。

    • StarRocks 提取 COLUMNS 参数中声明的字段,并按名称将它们映射到 StarRocks 表的列上。

    数据转换:

    • 因为该示例需要将键 pay_time 转换为 DATE 数据类型,并将数据加载到 StarRocks 表中的 pay_dt 列中,所以您需要在 COLUMNS 参数中使用 from_unixtime 函数。其他字段直接映射到表 example_tbl2 的字段。

    • 并且由于该示例从 JSON 格式数据中排除了客户性别列,因此 COLUMNS 参数中的字段 temp_gender 用作此字段的占位符。其他字段直接映射到 StarRocks 表 example_tbl1 的列。

      有关数据转换的更多信息,请参见加载时转换数据

      注意

      如果 JSON 对象中键的名称和数量与 StarRocks 表中字段的名称和数量完全匹配,则无需指定 COLUMNS 参数。

加载 Avro 格式数据

自 v3.0.1 起,StarRocks 支持使用 Routine Load 加载 Avro 数据。

准备数据集

Avro schema
  1. 创建以下 Avro schema 文件 avro_schema.avsc

    {
    "type": "record",
    "name": "sensor_log",
    "fields" : [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "checked", "type" : "boolean"},
    {"name": "data", "type": "double"},
    {"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}}
    ]
    }
  2. Schema Registry中注册 Avro schema。

Avro 数据

准备 Avro 数据并将其发送到 Kafka Topic topic_0

创建表

根据 Avro 数据的字段,在 StarRocks 集群的目标数据库 example_db 中创建一个表 sensor_log。该表的列名必须与 Avro 数据中的字段名匹配。有关表列和 Avro 数据字段之间的数据类型映射,请参见[数据类型映射](#数据类型映射)。

CREATE TABLE example_db.sensor_log ( 
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`data` double NULL COMMENT "sensor data",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);

注意

自 v2.5.7 起,StarRocks 可以在您创建表或添加分区时自动设置存储桶数 (BUCKETS)。您不再需要手动设置存储桶数。有关详细信息,请参见 设置存储桶数

提交 Routine Load 作业

执行以下语句以提交一个名为 sensor_log_load_job 的 Routine Load 作业,以消费 Kafka Topic topic_0 中的 Avro 消息并将数据加载到数据库 sensor 中的表 sensor_log 中。加载作业消费 Topic 指定分区中初始偏移量的消息。

CREATE ROUTINE LOAD example_db.sensor_log_load_job ON sensor_log  
PROPERTIES
(
"format" = "avro"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic" = "topic_0",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
  • 数据格式

    您需要在 PROPERTIES 子句中指定 "format = "avro" 以定义数据格式为 Avro。

  • Schema Registry

    您需要配置 confluent.schema.registry.url 以指定注册 Avro schema 的 Schema Registry 的 URL。 StarRocks 使用此 URL 检索 Avro schema。格式如下

    confluent.schema.registry.url = http[s]://[<schema-registry-api-key>:<schema-registry-api-secret>@]<hostname|ip address>[:<port>]
  • 数据映射和转换

    要指定 Avro 格式数据与 StarRocks 表之间的映射和转换关系,您需要指定参数 COLUMNS 和属性 jsonpathsCOLUMNS 参数中指定的字段顺序必须与属性 jsonpaths 中的字段顺序匹配,并且字段名称必须与 StarRocks 表的名称匹配。属性 jsonpaths 用于从 Avro 数据中提取所需的字段。然后,这些字段由属性 COLUMNS 命名。

    有关数据转换的更多信息,请参见加载时转换数据

    注意

    如果 Avro 记录中字段的名称和数量与 StarRocks 表中列的名称和数量完全匹配,则无需指定 COLUMNS 参数。

提交加载作业后,您可以执行 SHOW ROUTINE LOAD 语句以检查加载作业的状态。

数据类型映射

您要加载的 Avro 数据字段与 StarRocks 表列之间的数据类型映射如下

原始类型
AvroStarRocks
nulNULL
booleanBOOLEAN
intINT
longBIGINT
floatFLOAT
doubleDOUBLE
bytesSTRING
stringSTRING
复杂类型
AvroStarRocks
record将整个 RECORD 或其子字段作为 JSON 加载到 StarRocks 中。
enumsSTRING
arraysARRAY
mapsJSON
union(T, null)NULLABLE(T)
fixedSTRING

限制

  • 目前,StarRocks 不支持 schema evolution。
  • 每条 Kafka 消息必须仅包含一条 Avro 数据记录。

检查加载作业和任务

检查加载作业

执行 SHOW ROUTINE LOAD 语句以检查加载作业 example_tbl2_ordertest2 的状态。 StarRocks 返回执行状态 State、统计信息(包括消费的总行数和加载的总行数)Statistics 以及加载作业的进度 progress

如果加载作业的状态自动更改为 PAUSED,则可能是因为错误行数已超过阈值。有关设置此阈值的详细说明,请参见 CREATE ROUTINE LOAD。您可以检查文件 ReasonOfStateChangedErrorLogUrls 以识别和排除问题。修复问题后,您可以执行 RESUME ROUTINE LOAD 语句以恢复 PAUSED 加载作业。

如果加载作业的状态为 CANCELLED,则可能是因为加载作业遇到异常(例如,表已被删除)。您可以检查文件 ReasonOfStateChangedErrorLogUrls 以识别和排除问题。但是,您无法恢复 CANCELLED 加载作业。

MySQL [example_db]> SHOW ROUTINE LOAD FOR example_tbl2_ordertest2 \G
*************************** 1. row ***************************
Id: 63013
Name: example_tbl2_ordertest2
CreateTime: 2022-08-10 17:09:00
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:example_db
TableName: example_tbl2
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 3
JobProperties: {"partitions":"*","partial_update":"false","columnToColumnExpr":"commodity_id,customer_name,country,pay_time,pay_dt=from_unixtime(`pay_time`, '%Y%m%d'),price","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","timezone":"Asia/Shanghai","format":"json","json_root":"","strict_mode":"false","jsonpaths":"[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"3","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"ordertest2","currentKafkaPartitions":"0,1,2,3,4","brokerList":"<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>"}
CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING"}
Statistic: {"receivedBytes":230,"errorRows":0,"committedTaskNum":1,"loadedRows":2,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":2,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":522}
Progress: {"0":"1","1":"OFFSET_ZERO","2":"OFFSET_ZERO","3":"OFFSET_ZERO","4":"OFFSET_ZERO"}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:

注意

您无法检查已停止或尚未开始的加载作业。

检查加载任务

执行 SHOW ROUTINE LOAD TASK 语句以检查加载作业 example_tbl2_ordertest2 的加载任务,例如当前有多少任务正在运行、消耗的 Kafka Topic 分区和消费进度 DataSourceProperties 以及相应的 Coordinator BE 节点 BeId

MySQL [example_db]> SHOW ROUTINE LOAD TASK WHERE JobName = "example_tbl2_ordertest2" \G
*************************** 1. row ***************************
TaskId: 18c3a823-d73e-4a64-b9cb-b9eced026753
TxnId: -1
TxnStatus: UNKNOWN
JobId: 63013
CreateTime: 2022-08-10 17:09:05
LastScheduledTime: 2022-08-10 17:47:27
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: {"1":0,"4":0}
Message: there is no new data in kafka, wait for 20 seconds to schedule again
*************************** 2. row ***************************
TaskId: f76c97ac-26aa-4b41-8194-a8ba2063eb00
TxnId: -1
TxnStatus: UNKNOWN
JobId: 63013
CreateTime: 2022-08-10 17:09:05
LastScheduledTime: 2022-08-10 17:47:26
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: {"2":0}
Message: there is no new data in kafka, wait for 20 seconds to schedule again
*************************** 3. row ***************************
TaskId: 1a327a34-99f4-4f8d-8014-3cd38db99ec6
TxnId: -1
TxnStatus: UNKNOWN
JobId: 63013
CreateTime: 2022-08-10 17:09:26
LastScheduledTime: 2022-08-10 17:47:27
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: {"0":2,"3":0}
Message: there is no new data in kafka, wait for 20 seconds to schedule again

暂停加载作业

您可以执行 PAUSE ROUTINE LOAD 语句以暂停加载作业。执行语句后,加载作业的状态将为 PAUSED。但是,它尚未停止。您可以执行 RESUME ROUTINE LOAD 语句以恢复它。您还可以使用 SHOW ROUTINE LOAD 语句检查其状态。

以下示例暂停加载作业 example_tbl2_ordertest2

PAUSE ROUTINE LOAD FOR example_tbl2_ordertest2;

恢复加载作业

您可以执行 RESUME ROUTINE LOAD 语句以恢复暂停的加载作业。加载作业的状态将暂时变为 NEED_SCHEDULE(因为正在重新调度加载作业),然后变为 RUNNING。您可以使用 SHOW ROUTINE LOAD 语句检查其状态。

以下示例恢复暂停的加载作业 example_tbl2_ordertest2

RESUME ROUTINE LOAD FOR example_tbl2_ordertest2;

修改加载作业

在更改加载作业之前,您必须使用 PAUSE ROUTINE LOAD 语句暂停它。然后,您可以执行 ALTER ROUTINE LOAD。更改后,您可以执行 RESUME ROUTINE LOAD 语句以恢复它,并使用 SHOW ROUTINE LOAD 语句检查其状态。

假设存活的 BE 节点数增加到 6,并且要消耗的 Kafka Topic 分区为 "0,1,2,3,4,5,6,7"。如果您想增加实际加载任务并发,您可以执行以下语句将期望的任务并发数 desired_concurrent_number 增加到 6(大于或等于存活的 BE 节点数),并指定 Kafka Topic 分区和初始偏移量。

注意

由于实际任务并发由多个参数的最小值决定,因此您必须确保 FE 动态参数 max_routine_load_task_concurrent_num 的值大于或等于 6

ALTER ROUTINE LOAD FOR example_tbl2_ordertest2
PROPERTIES
(
"desired_concurrent_number" = "6"
)
FROM kafka
(
"kafka_partitions" = "0,1,2,3,4,5,6,7",
"kafka_offsets" = "OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_END,OFFSET_END,OFFSET_END,OFFSET_END"
);

停止加载作业

您可以执行 STOP ROUTINE LOAD 语句以停止加载作业。执行语句后,加载作业的状态将为 STOPPED,并且您无法恢复停止的加载作业。您无法使用 SHOW ROUTINE LOAD 语句检查已停止的加载作业的状态。

以下示例停止加载作业 example_tbl2_ordertest2

STOP ROUTINE LOAD FOR example_tbl2_ordertest2;