从 RisingWave 将数据 Sink 到 StarRocks
RisingWave 是一个分布式 SQL 流式数据库,可以简单、高效、可靠地处理流式数据。 要快速开始使用 RisingWave,请参阅 Get started。
RisingWave 提供了数据 Sink 功能,使用户可以直接将数据 Sink 到 StarRocks,而无需任何其他第三方组件。 此功能可以与所有 StarRocks 表类型一起使用:Duplicate Key、Primary Key、Aggregate 和 Unique Key 表。
前提条件
- 您有一个正在运行的 RisingWave 集群 v1.7 或更高版本。
- 您可以访问目标 StarRocks 表,并且 StarRocks 版本为 v2.5 或更高版本。
- 要将数据 sink 到 StarRocks 表中,您必须具有目标表的 SELECT 和 INSERT 权限。 要授予权限,请参阅 GRANT。
提示
RisingWave 仅支持 StarRocks Sink 的至少一次语义,这意味着在发生故障时,可能会写入重复数据。 建议您使用 StarRocks Primary Key 表,它可以对数据进行去重并实现端到端的幂等写入。
参数
下表描述了从 RisingWave 将数据 sink 到 StarRocks 时需要配置的参数。 除非另有说明,否则所有参数都是必需的。
参数 | 描述 |
---|---|
connector | 将其设置为 starrocks 。 |
starrocks.host | StarRocks FE 节点的 IP 地址。 |
starrocks.query_port | FE 节点的查询端口。 |
starrocks.http_port | FE 节点的 HTTP 端口。 |
starrocks.user | 用于访问 StarRocks 集群的用户名。 |
starrocks.password | 与用户名关联的密码。 |
starrocks.database | 目标表所在的 StarRocks 数据库。 |
starrocks.table | 要将数据 sink 到的 StarRocks 表。 |
starrocks.partial_update | (可选) 是否启用 StarRocks 部分更新功能。 当只需要更新少量列时,启用此功能可以提高 Sink 性能。 |
type | sink 期间的数据操作类型。
|
force_append_only | (可选) 当 type 设置为 append-only 但 sink 过程中也存在 Upsert 和 Delete 操作时,此设置可以强制 Sink 任务生成仅追加数据并丢弃 Upsert 和 Delete 数据。 |
primary_key | (可选) StarRocks 表的主键。 如果 type 为 upsert ,则为必需项。 |
数据类型映射
下表提供了 RisingWave 和 StarRocks 之间的数据类型映射。
RisingWave | StarRocks |
---|---|
BOOLEAN | BOOLEAN |
SMALLINT | SMALLINT |
INTEGER | INT |
BIGINT | BIGINT |
REAL | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
DATE | DATE |
VARCHAR | VARCHAR |
TIME (在 sink 到 StarRocks 之前转换为 VARCHAR) | 不支持 |
TIMESTAMP | DATETIME |
TIMESTAMP WITH TIME ZONE (在 sink 到 StarRocks 之前转换为 TIMESTAMP) | 不支持 |
INTERVAL (在 sink 到 StarRocks 之前转换为 VARCHAR) | 不支持 |
STRUCT | JSON |
ARRAY | ARRAY |
BYTEA (在 sink 到 StarRocks 之前转换为 VARCHAR) | 不支持 |
JSONB | JSON |
SERIAL | BIGINT |
示例
-
在 StarRocks 中创建一个数据库
demo
,并在此数据库中创建一个 Primary Key 表score_board
。CREATE DATABASE demo;
USE demo;
CREATE TABLE demo.score_board(
id int(11) NOT NULL COMMENT "",
name varchar(65533) NULL DEFAULT "" COMMENT "",
score int(11) NOT NULL DEFAULT "0" COMMENT ""
)
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id); -
将数据从 RisingWave sink 到 StarRocks。
-- Create a table in RisingWave.
CREATE TABLE score_board (
id INT PRIMARY KEY,
name VARCHAR,
score INT
);
-- Insert data into the table.
INSERT INTO score_board VALUES (1, 'starrocks', 100), (2, 'risingwave', 100);
-- Sink data from this table to the StarRocks table.
CREATE SINK score_board_sink
FROM score_board WITH (
connector = 'starrocks',
type = 'upsert',
starrocks.host = 'starrocks-fe',
starrocks.mysqlport = '9030',
starrocks.httpport = '8030',
starrocks.user = 'users',
starrocks.password = '123456',
starrocks.database = 'demo',
starrocks.table = 'score_board',
primary_key = 'id'
);