跳到主要内容
版本: 最新版本-3.5

从 Apache® Pulsar™ 持续加载数据

自 StarRocks 2.5 版本起,Routine Load 支持从 Apache® Pulsar™ 持续加载数据。Pulsar 是一个分布式的开源 pub-sub 消息和流式平台,采用存储计算分离架构。通过 Routine Load 从 Pulsar 加载数据类似于从 Apache Kafka 加载数据。本文以 CSV 格式的数据为例,介绍如何通过 Routine Load 从 Apache Pulsar 加载数据。

支持的数据文件格式

Routine Load 支持从 Pulsar 集群消费 CSV 和 JSON 格式的数据。

注意

对于 CSV 格式的数据,StarRocks 支持 UTF-8 编码的字符串(长度小于 50 字节)作为列分隔符。常用的列分隔符包括逗号 (,)、制表符和竖线 (|)。

Topic

Pulsar 中的 Topic 是一个命名通道,用于将消息从生产者传输到消费者。Pulsar 中的 Topic 分为分区 Topic 和非分区 Topic。

  • 分区 Topic是一种特殊类型的 Topic,由多个 Broker 处理,从而实现更高的吞吐量。分区 Topic 实际上实现为 N 个内部 Topic,其中 N 是分区的数量。
  • 非分区 Topic是一种普通的 Topic,仅由单个 Broker 提供服务,这限制了 Topic 的最大吞吐量。

消息 ID

消息的 ID 由 BookKeeper 实例在消息被持久存储后立即分配。消息 ID 指示消息在 Ledger 中的特定位置,并且在 Pulsar 集群中是唯一的。

Pulsar 支持消费者通过 consumer.seek(messageId) 指定初始位置。但是与 Kafka 消费者偏移量(一个长整数值)相比,消息 ID 由四个部分组成:ledgerId:entryID:partition-index:batch-index

因此,您无法直接从消息中获取消息 ID。因此,目前,Routine Load 不支持在从 Pulsar 加载数据时指定初始位置,而仅支持从分区的开始或结束位置消费数据。

Subscription

Subscription 是一个命名配置规则,用于确定如何将消息传递给消费者。Pulsar 还支持消费者同时订阅多个 Topic。一个 Topic 可以有多个 Subscription。

Subscription 的类型在消费者连接到它时定义,并且可以通过使用不同的配置重新启动所有消费者来更改类型。Pulsar 中有四种 Subscription 类型可用

  • exclusive (默认)仅允许单个消费者附加到 Subscription。只允许一个客户消费消息。
  • shared:多个消费者可以附加到同一个 Subscription。消息以循环方式在消费者之间分发,并且任何给定的消息仅传递给一个消费者。
  • failover:多个消费者可以附加到同一个 Subscription。为主 Topic 或分区 Topic 的每个分区选择一个 Master 消费者并接收消息。当 Master 消费者断开连接时,所有(未确认和后续)消息都会传递给队列中的下一个消费者。
  • key_shared:多个消费者可以附加到同一个 Subscription。消息在消费者之间分发,并且具有相同 Key 或相同排序 Key 的消息仅传递给一个消费者。

注意

目前 Routine Load 使用 exclusive 类型。

创建 Routine Load 作业

以下示例描述了如何消费 Pulsar 中 CSV 格式的消息,并通过创建 Routine Load 作业将数据加载到 StarRocks 中。有关详细说明和参考,请参见 CREATE ROUTINE LOAD

CREATE ROUTINE LOAD load_test.routine_wiki_edit_1 ON routine_wiki_edit
COLUMNS TERMINATED BY ",",
ROWS TERMINATED BY "\n",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
WHERE event_time > "2022-01-01 00:00:00",
PROPERTIES
(
"desired_concurrent_number" = "1",
"max_batch_interval" = "15000",
"max_error_number" = "1000"
)
FROM PULSAR
(
"pulsar_service_url" = "pulsar://:6650",
"pulsar_topic" = "persistent://tenant/namespace/topic-name",
"pulsar_subscription" = "load-test",
"pulsar_partitions" = "load-partition-0,load-partition-1",
"pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_LATEST",
"property.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD5Y"
);

创建 Routine Load 以从 Pulsar 消费数据时,除 data_source_properties 之外的大多数输入参数与从 Kafka 消费数据相同。有关除 data_source_properties data_source_properties 之外的参数的描述,请参见 CREATE ROUTINE LOAD

data_source_properties 相关的参数及其描述如下

参数必需描述
pulsar_service_url用于连接到 Pulsar 集群的 URL。格式:"pulsar://ip:port""pulsar://service:port"。示例:"pulsar_service_url" = "pulsar://``localhost:6650``"
pulsar_topic订阅的 Topic。示例:"pulsar_topic" = "persistent://tenant/namespace/topic-name"
pulsar_subscription为 Topic 配置的 Subscription。示例:"pulsar_subscription" = "my_subscription"
pulsar_partitions, pulsar_initial_positionspulsar_partitions:Topic 中订阅的分区。pulsar_initial_positionspulsar_partitions 指定的分区的初始位置。初始位置必须与 pulsar_partitions 中的分区相对应。有效值:POSITION_EARLIEST(默认值):订阅从分区中最早可用的消息开始。POSITION_LATEST:订阅从分区中最新的可用消息开始。注意:如果 未指定 pulsar_partitions,则订阅 Topic 的所有分区。如果同时指定了 pulsar_partitionsproperty.pulsar_default_initial_position,则 pulsar_partitions 值将覆盖 property.pulsar_default_initial_position 值。如果既未指定 pulsar_partitions 也未指定 property.pulsar_default_initial_position,则订阅从分区中最新的可用消息开始。示例:"pulsar_partitions" = "my-partition-0,my-partition-1,my-partition-2,my-partition-3", "pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_EARLIEST,POSITION_LATEST,POSITION_LATEST"

Routine Load 支持以下 Pulsar 的自定义参数。

参数必需描述
property.pulsar_default_initial_position订阅 Topic 的分区时的默认初始位置。该参数在未指定 pulsar_initial_positions 时生效。其有效值与 pulsar_initial_positions 的有效值相同。示例:"``property.pulsar_default_initial_position" = "POSITION_EARLIEST"
property.auth.token如果 Pulsar 启用了使用安全令牌验证客户端,则您需要令牌字符串来验证您的身份。示例:"p``roperty.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD"

检查加载作业和任务

检查加载作业

执行 SHOW ROUTINE LOAD 语句以检查加载作业 routine_wiki_edit_1 的状态。StarRocks 返回执行状态 State、统计信息(包括消费的总行数和加载的总行数)Statistics 以及加载作业的进度 progress

当您检查从 Pulsar 消费数据的 Routine Load 作业时,除 progress 之外的大多数返回参数与从 Kafka 消费数据相同。progress 指的是积压,即分区中未确认的消息数。

MySQL [load_test] > SHOW ROUTINE LOAD for routine_wiki_edit_1 \G
*************************** 1. row ***************************
Id: 10142
Name: routine_wiki_edit_1
CreateTime: 2022-06-29 14:52:55
PauseTime: 2022-06-29 17:33:53
EndTime: NULL
DbName: default_cluster:test_pulsar
TableName: test1
State: PAUSED
DataSourceType: PULSAR
CurrentTaskNum: 0
JobProperties: {"partitions":"*","rowDelimiter":"'\n'","partial_update":"false","columnToColumnExpr":"*","maxBatchIntervalS":"10","whereExpr":"*","timezone":"Asia/Shanghai","format":"csv","columnSeparator":"','","json_root":"","strict_mode":"false","jsonpaths":"","desireTaskConcurrentNum":"3","maxErrorNum":"10","strip_outer_array":"false","currentTaskConcurrentNum":"0","maxBatchRows":"200000"}
DataSourceProperties: {"serviceUrl":"pulsar://:6650","currentPulsarPartitions":"my-partition-0,my-partition-1","topic":"persistent://tenant/namespace/topic-name","subscription":"load-test"}
CustomProperties: {"auth.token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD"}
Statistic: {"receivedBytes":5480943882,"errorRows":0,"committedTaskNum":696,"loadedRows":66243440,"loadRowsRate":29000,"abortedTaskNum":0,"totalRows":66243440,"unselectedRows":0,"receivedBytesRate":2400000,"taskExecuteTimeMs":2283166}
Progress: {"my-partition-0(backlog): 100","my-partition-1(backlog): 0"}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
1 row in set (0.00 sec)

检查加载任务

执行 SHOW ROUTINE LOAD TASK 语句以检查加载作业 routine_wiki_edit_1 的加载任务,例如正在运行的任务数、消费的 Kafka Topic 分区和消费进度 DataSourceProperties 以及相应的 Coordinator BE 节点 BeId

MySQL [example_db]> SHOW ROUTINE LOAD TASK WHERE JobName = "routine_wiki_edit_1" \G

修改加载作业

在修改加载作业之前,必须使用 PAUSE ROUTINE LOAD 语句暂停它。然后,您可以执行 ALTER ROUTINE LOAD。修改后,您可以执行 RESUME ROUTINE LOAD 语句以恢复它,并使用 SHOW ROUTINE LOAD 语句检查其状态。

当 Routine Load 用于从 Pulsar 消费数据时,除 data_source_properties 之外的大多数返回参数与从 Kafka 消费数据相同。

请注意以下几点:

  • data_source_properties 相关参数中,目前仅支持修改 pulsar_partitionspulsar_initial_positions 和自定义 Pulsar 参数 property.pulsar_default_initial_positionproperty.auth.token。参数 pulsar_service_urlpulsar_topicpulsar_subscription 无法修改。
  • 如果需要修改要消费的分区和匹配的初始位置,则需要确保在创建 Routine Load 作业时使用 pulsar_partitions 指定分区,并且只能修改指定分区的初始位置 pulsar_initial_positions
  • 如果在创建 Routine Load 作业时仅指定 Topic pulsar_topic,但不指定分区 pulsar_partitions,则可以通过 pulsar_default_initial_position 修改 Topic 下所有分区的起始位置。