跳到主要内容
版本: 最新版本-3.5

从 RisingWave 将数据 Sink 到 StarRocks

RisingWave 是一个分布式 SQL 流式数据库,可以简单、高效、可靠地处理流式数据。 要快速开始使用 RisingWave,请参阅 Get started

RisingWave 提供了数据 Sink 功能,使用户可以直接将数据 Sink 到 StarRocks,而无需任何其他第三方组件。 此功能可以与所有 StarRocks 表类型一起使用:Duplicate Key、Primary Key、Aggregate 和 Unique Key 表。

前提条件

  • 您有一个正在运行的 RisingWave 集群 v1.7 或更高版本。
  • 您可以访问目标 StarRocks 表,并且 StarRocks 版本为 v2.5 或更高版本。
  • 要将数据 sink 到 StarRocks 表中,您必须具有目标表的 SELECT 和 INSERT 权限。 要授予权限,请参阅 GRANT
提示

RisingWave 仅支持 StarRocks Sink 的至少一次语义,这意味着在发生故障时,可能会写入重复数据。 建议您使用 StarRocks Primary Key 表,它可以对数据进行去重并实现端到端的幂等写入。

参数

下表描述了从 RisingWave 将数据 sink 到 StarRocks 时需要配置的参数。 除非另有说明,否则所有参数都是必需的。

参数描述
connector将其设置为 starrocks
starrocks.hostStarRocks FE 节点的 IP 地址。
starrocks.query_portFE 节点的查询端口。
starrocks.http_portFE 节点的 HTTP 端口。
starrocks.user用于访问 StarRocks 集群的用户名。
starrocks.password与用户名关联的密码。
starrocks.database目标表所在的 StarRocks 数据库。
starrocks.table要将数据 sink 到的 StarRocks 表。
starrocks.partial_update(可选) 是否启用 StarRocks 部分更新功能。 当只需要更新少量列时,启用此功能可以提高 Sink 性能。
typesink 期间的数据操作类型。
  • append-only:仅执行 INSERT 操作。
  • upsert:执行 Upsert 操作。 如果使用此设置,则 StarRocks 目标表必须是 Primary Key 表。
force_append_only(可选) 当 type 设置为 append-only 但 sink 过程中也存在 Upsert 和 Delete 操作时,此设置可以强制 Sink 任务生成仅追加数据并丢弃 Upsert 和 Delete 数据。
primary_key(可选) StarRocks 表的主键。 如果 typeupsert,则为必需项。

数据类型映射

下表提供了 RisingWave 和 StarRocks 之间的数据类型映射。

RisingWaveStarRocks
BOOLEANBOOLEAN
SMALLINTSMALLINT
INTEGERINT
BIGINTBIGINT
REALFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
DATEDATE
VARCHARVARCHAR
TIME
(在 sink 到 StarRocks 之前转换为 VARCHAR)
不支持
TIMESTAMPDATETIME
TIMESTAMP WITH TIME ZONE
(在 sink 到 StarRocks 之前转换为 TIMESTAMP)
不支持
INTERVAL
(在 sink 到 StarRocks 之前转换为 VARCHAR)
不支持
STRUCTJSON
ARRAYARRAY
BYTEA
(在 sink 到 StarRocks 之前转换为 VARCHAR)
不支持
JSONBJSON
SERIALBIGINT

示例

  1. 在 StarRocks 中创建一个数据库 demo,并在此数据库中创建一个 Primary Key 表 score_board

    CREATE DATABASE demo;
    USE demo;

    CREATE TABLE demo.score_board(
    id int(11) NOT NULL COMMENT "",
    name varchar(65533) NULL DEFAULT "" COMMENT "",
    score int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    PRIMARY KEY(id)
    DISTRIBUTED BY HASH(id);
  2. 将数据从 RisingWave sink 到 StarRocks。

    -- Create a table in RisingWave.
    CREATE TABLE score_board (
    id INT PRIMARY KEY,
    name VARCHAR,
    score INT
    );

    -- Insert data into the table.
    INSERT INTO score_board VALUES (1, 'starrocks', 100), (2, 'risingwave', 100);

    -- Sink data from this table to the StarRocks table.
    CREATE SINK score_board_sink
    FROM score_board WITH (
    connector = 'starrocks',
    type = 'upsert',
    starrocks.host = 'starrocks-fe',
    starrocks.mysqlport = '9030',
    starrocks.httpport = '8030',
    starrocks.user = 'users',
    starrocks.password = '123456',
    starrocks.database = 'demo',
    starrocks.table = 'score_board',
    primary_key = 'id'
    );