从 Apache® Pulsar™ 持续加载数据
自 StarRocks 2.5 版本起,Routine Load 支持从 Apache® Pulsar™ 持续加载数据。Pulsar 是一个分布式的开源 pub-sub 消息和流式平台,采用存储计算分离架构。通过 Routine Load 从 Pulsar 加载数据类似于从 Apache Kafka 加载数据。本文以 CSV 格式的数据为例,介绍如何通过 Routine Load 从 Apache Pulsar 加载数据。
支持的数据文件格式
Routine Load 支持从 Pulsar 集群消费 CSV 和 JSON 格式的数据。
注意
对于 CSV 格式的数据,StarRocks 支持 UTF-8 编码的字符串(长度小于 50 字节)作为列分隔符。常用的列分隔符包括逗号 (,)、制表符和竖线 (|)。
Pulsar 相关概念
Pulsar 中的 Topic 是一个命名通道,用于将消息从生产者传输到消费者。Pulsar 中的 Topic 分为分区 Topic 和非分区 Topic。
- 分区 Topic是一种特殊类型的 Topic,由多个 Broker 处理,从而实现更高的吞吐量。分区 Topic 实际上实现为 N 个内部 Topic,其中 N 是分区的数量。
- 非分区 Topic是一种普通的 Topic,仅由单个 Broker 提供服务,这限制了 Topic 的最大吞吐量。
消息的 ID 由 BookKeeper 实例在消息被持久存储后立即分配。消息 ID 指示消息在 Ledger 中的特定位置,并且在 Pulsar 集群中是唯一的。
Pulsar 支持消费者通过 consumer.seek(messageId) 指定初始位置。但是与 Kafka 消费者偏移量(一个长整数值)相比,消息 ID 由四个部分组成:ledgerId:entryID:partition-index:batch-index
。
因此,您无法直接从消息中获取消息 ID。因此,目前,Routine Load 不支持在从 Pulsar 加载数据时指定初始位置,而仅支持从分区的开始或结束位置消费数据。
Subscription 是一个命名配置规则,用于确定如何将消息传递给消费者。Pulsar 还支持消费者同时订阅多个 Topic。一个 Topic 可以有多个 Subscription。
Subscription 的类型在消费者连接到它时定义,并且可以通过使用不同的配置重新启动所有消费者来更改类型。Pulsar 中有四种 Subscription 类型可用
exclusive
(默认):仅允许单个消费者附加到 Subscription。只允许一个客户消费消息。shared
:多个消费者可以附加到同一个 Subscription。消息以循环方式在消费者之间分发,并且任何给定的消息仅传递给一个消费者。failover
:多个消费者可以附加到同一个 Subscription。为主 Topic 或分区 Topic 的每个分区选择一个 Master 消费者并接收消息。当 Master 消费者断开连接时,所有(未确认和后续)消息都会传递给队列中的下一个消费者。key_shared
:多个消费者可以附加到同一个 Subscription。消息在消费者之间分发,并且具有相同 Key 或相同排序 Key 的消息仅传递给一个消费者。
注意
目前 Routine Load 使用 exclusive 类型。
创建 Routine Load 作业
以下示例描述了如何消费 Pulsar 中 CSV 格式的消息,并通过创建 Routine Load 作业将数据加载到 StarRocks 中。有关详细说明和参考,请参见 CREATE ROUTINE LOAD。
CREATE ROUTINE LOAD load_test.routine_wiki_edit_1 ON routine_wiki_edit
COLUMNS TERMINATED BY ",",
ROWS TERMINATED BY "\n",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
WHERE event_time > "2022-01-01 00:00:00",
PROPERTIES
(
"desired_concurrent_number" = "1",
"max_batch_interval" = "15000",
"max_error_number" = "1000"
)
FROM PULSAR
(
"pulsar_service_url" = "pulsar://:6650",
"pulsar_topic" = "persistent://tenant/namespace/topic-name",
"pulsar_subscription" = "load-test",
"pulsar_partitions" = "load-partition-0,load-partition-1",
"pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_LATEST",
"property.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD5Y"
);
创建 Routine Load 以从 Pulsar 消费数据时,除 data_source_properties
之外的大多数输入参数与从 Kafka 消费数据相同。有关除 data_source_properties data_source_properties
之外的参数的描述,请参见 CREATE ROUTINE LOAD。
与 data_source_properties
相关的参数及其描述如下
参数 | 必需 | 描述 |
---|---|---|
pulsar_service_url | 是 | 用于连接到 Pulsar 集群的 URL。格式:"pulsar://ip:port" 或 "pulsar://service:port" 。示例:"pulsar_service_url" = "pulsar://``localhost:6650``" |
pulsar_topic | 是 | 订阅的 Topic。示例:"pulsar_topic" = "persistent://tenant/namespace/topic-name" |
pulsar_subscription | 是 | 为 Topic 配置的 Subscription。示例:"pulsar_subscription" = "my_subscription" |
pulsar_partitions, pulsar_initial_positions | 否 | pulsar_partitions :Topic 中订阅的分区。pulsar_initial_positions :pulsar_partitions 指定的分区的初始位置。初始位置必须与 pulsar_partitions 中的分区相对应。有效值:POSITION_EARLIEST (默认值):订阅从分区中最早可用的消息开始。POSITION_LATEST :订阅从分区中最新的可用消息开始。注意:如果 未指定 pulsar_partitions ,则订阅 Topic 的所有分区。如果同时指定了 pulsar_partitions 和 property.pulsar_default_initial_position ,则 pulsar_partitions 值将覆盖 property.pulsar_default_initial_position 值。如果既未指定 pulsar_partitions 也未指定 property.pulsar_default_initial_position ,则订阅从分区中最新的可用消息开始。示例:"pulsar_partitions" = "my-partition-0,my-partition-1,my-partition-2,my-partition-3", "pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_EARLIEST,POSITION_LATEST,POSITION_LATEST" |
Routine Load 支持以下 Pulsar 的自定义参数。
参数 | 必需 | 描述 |
---|---|---|
property.pulsar_default_initial_position | 否 | 订阅 Topic 的分区时的默认初始位置。该参数在未指定 pulsar_initial_positions 时生效。其有效值与 pulsar_initial_positions 的有效值相同。示例:"``property.pulsar_default_initial_position" = "POSITION_EARLIEST" |
property.auth.token | 否 | 如果 Pulsar 启用了使用安全令牌验证客户端,则您需要令牌字符串来验证您的身份。示例:"p``roperty.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD" |
检查加载作业和任务
检查加载作业
执行 SHOW ROUTINE LOAD 语句以检查加载作业 routine_wiki_edit_1
的状态。StarRocks 返回执行状态 State
、统计信息(包括消费的总行数和加载的总行数)Statistics
以及加载作业的进度 progress
。
当您检查从 Pulsar 消费数据的 Routine Load 作业时,除 progress
之外的大多数返回参数与从 Kafka 消费数据相同。progress
指的是积压,即分区中未确认的消息数。
MySQL [load_test] > SHOW ROUTINE LOAD for routine_wiki_edit_1 \G
*************************** 1. row ***************************
Id: 10142
Name: routine_wiki_edit_1
CreateTime: 2022-06-29 14:52:55
PauseTime: 2022-06-29 17:33:53
EndTime: NULL
DbName: default_cluster:test_pulsar
TableName: test1
State: PAUSED
DataSourceType: PULSAR
CurrentTaskNum: 0
JobProperties: {"partitions":"*","rowDelimiter":"'\n'","partial_update":"false","columnToColumnExpr":"*","maxBatchIntervalS":"10","whereExpr":"*","timezone":"Asia/Shanghai","format":"csv","columnSeparator":"','","json_root":"","strict_mode":"false","jsonpaths":"","desireTaskConcurrentNum":"3","maxErrorNum":"10","strip_outer_array":"false","currentTaskConcurrentNum":"0","maxBatchRows":"200000"}
DataSourceProperties: {"serviceUrl":"pulsar://:6650","currentPulsarPartitions":"my-partition-0,my-partition-1","topic":"persistent://tenant/namespace/topic-name","subscription":"load-test"}
CustomProperties: {"auth.token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD"}
Statistic: {"receivedBytes":5480943882,"errorRows":0,"committedTaskNum":696,"loadedRows":66243440,"loadRowsRate":29000,"abortedTaskNum":0,"totalRows":66243440,"unselectedRows":0,"receivedBytesRate":2400000,"taskExecuteTimeMs":2283166}
Progress: {"my-partition-0(backlog): 100","my-partition-1(backlog): 0"}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
1 row in set (0.00 sec)
检查加载任务
执行 SHOW ROUTINE LOAD TASK 语句以检查加载作业 routine_wiki_edit_1
的加载任务,例如正在运行的任务数、消费的 Kafka Topic 分区和消费进度 DataSourceProperties
以及相应的 Coordinator BE 节点 BeId
。
MySQL [example_db]> SHOW ROUTINE LOAD TASK WHERE JobName = "routine_wiki_edit_1" \G
修改加载作业
在修改加载作业之前,必须使用 PAUSE ROUTINE LOAD 语句暂停它。然后,您可以执行 ALTER ROUTINE LOAD。修改后,您可以执行 RESUME ROUTINE LOAD 语句以恢复它,并使用 SHOW ROUTINE LOAD 语句检查其状态。
当 Routine Load 用于从 Pulsar 消费数据时,除 data_source_properties
之外的大多数返回参数与从 Kafka 消费数据相同。
请注意以下几点:
- 在
data_source_properties
相关参数中,目前仅支持修改pulsar_partitions
、pulsar_initial_positions
和自定义 Pulsar 参数property.pulsar_default_initial_position
和property.auth.token
。参数pulsar_service_url
、pulsar_topic
和pulsar_subscription
无法修改。 - 如果需要修改要消费的分区和匹配的初始位置,则需要确保在创建 Routine Load 作业时使用
pulsar_partitions
指定分区,并且只能修改指定分区的初始位置pulsar_initial_positions
。 - 如果在创建 Routine Load 作业时仅指定 Topic
pulsar_topic
,但不指定分区pulsar_partitions
,则可以通过pulsar_default_initial_position
修改 Topic 下所有分区的起始位置。