跳到主要内容
版本: 最新版本-3.5

从 Apache Flink® 持续加载数据

StarRocks 提供了一个名为 StarRocks Connector for Apache Flink®(简称 Flink Connector)的自研连接器,以帮助您使用 Flink 将数据加载到 StarRocks 表中。其基本原理是累积数据,然后通过 STREAM LOAD 一次性将其加载到 StarRocks 中。

Flink Connector 支持 DataStream API、Table API & SQL 和 Python API。与 Apache Flink® 提供的 flink-connector-jdbc 相比,它具有更高和更稳定的性能。

注意

使用 Flink Connector 将数据加载到 StarRocks 表中需要对目标 StarRocks 表具有 SELECT 和 INSERT 权限。如果您没有这些权限,请按照 GRANT 中提供的说明,将这些权限授予用于连接到 StarRocks 集群的用户。

版本要求

连接器FlinkStarRocksJavaScala
1.2.111.15,1.16,1.17,1.18,1.19,1.202.1 及更高版本82.11,2.12
1.2.101.15,1.16,1.17,1.18,1.192.1 及更高版本82.11,2.12
1.2.91.15,1.16,1.17,1.182.1 及更高版本82.11,2.12
1.2.81.13,1.14,1.15,1.16,1.172.1 及更高版本82.11,2.12
1.2.71.11,1.12,1.13,1.14,1.152.1 及更高版本82.11,2.12

您可以通过以下方式获取 Flink Connector JAR 文件

  • 直接下载已编译的 Flink Connector JAR 文件。
  • 将 Flink Connector 作为依赖项添加到您的 Maven 项目中,然后下载 JAR 文件。
  • 自行将 Flink Connector 的源代码编译成 JAR 文件。

Flink Connector JAR 文件的命名格式如下

  • 自 Flink 1.15 起,格式为 flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar。例如,如果您安装了 Flink 1.15 并且想要使用 Flink Connector 1.2.7,则可以使用 flink-connector-starrocks-1.2.7_flink-1.15.jar

  • 在 Flink 1.15 之前,格式为 flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar。例如,如果您的环境中安装了 Flink 1.14 和 Scala 2.12,并且想要使用 Flink Connector 1.2.7,则可以使用 flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar

注意

一般来说,最新版本的 Flink 连接器仅保持与最近三个 Flink 版本的兼容性。

下载已编译的 Jar 文件

Maven 中央仓库 直接下载相应版本的 Flink Connector Jar 文件。

Maven 依赖

在 Maven 项目的 pom.xml 文件中,按照以下格式将 Flink Connector 添加为依赖项。将 flink_versionscala_versionconnector_version 替换为相应的版本。

  • 在 Flink 1.15 及更高版本中

    <dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <version>${connector_version}_flink-${flink_version}</version>
    </dependency>
  • 在低于 Flink 1.15 的版本中

    <dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <version>${connector_version}_flink-${flink_version}_${scala_version}</version>
    </dependency>

自行编译

  1. 下载 Flink Connector 源代码

  2. 执行以下命令将 Flink Connector 的源代码编译成 JAR 文件。请注意,flink_version 将被替换为相应的 Flink 版本。

    sh build.sh <flink_version>

    例如,如果您的环境中的 Flink 版本为 1.15,则需要执行以下命令

    sh build.sh 1.15
  3. 转到 target/ 目录以查找编译后生成的 Flink Connector JAR 文件,例如 flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar

注意

非正式发布的 Flink Connector 的名称包含 SNAPSHOT 后缀。

选项

connector

必需:是
默认值:NONE
描述:您想要使用的连接器。该值必须为 “starrocks”。

jdbc-url

必需:是
默认值:NONE
描述:用于连接到 FE 的 MySQL 服务器的地址。您可以指定多个地址,这些地址必须用逗号 (,) 分隔。格式:jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>

load-url

必需:是
默认值:NONE
描述:用于连接到 FE 的 HTTP 服务器的地址。您可以指定多个地址,这些地址必须用分号 (;) 分隔。格式:<fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>

database-name

必需:是
默认值:NONE
描述:您想要加载数据的 StarRocks 数据库的名称。

table-name

必需:是
默认值:NONE
描述:您想要用于将数据加载到 StarRocks 中的表的名称。

username

必需:是
默认值:NONE
描述:您想要用于将数据加载到 StarRocks 中的帐户的用户名。该帐户需要对目标 StarRocks 表具有 SELECT 和 INSERT 权限

password

必需:是
默认值:NONE
描述:上述帐户的密码。

sink.version

必需:否
默认值:AUTO
描述:用于加载数据的接口。从 Flink Connector 版本 1.2.4 开始支持此参数。

  • V1:使用 Stream Load 接口加载数据。1.2.4 之前的 Connector 仅支持此模式。
  • V2:使用 Stream Load 事务 接口加载数据。它要求 StarRocks 至少为 2.4 版本。建议使用 V2,因为它优化了内存使用并提供了更稳定的精确一次实现。
  • AUTO:如果 StarRocks 版本支持事务 Stream Load,则将自动选择 V2,否则选择 V1

sink.label-prefix

必需:否
默认值:NONE
描述:Stream Load 使用的标签前缀。如果您使用 Connector 1.2.8 及更高版本进行精确一次语义,建议配置它。请参阅 精确一次使用说明

sink.semantic

必需:否
默认值:at-least-once
描述:sink 保证的语义。有效值:at-least-onceexactly-once

sink.buffer-flush.max-bytes

必需:否
默认值:94371840(90M)
描述:在一次性发送到 StarRocks 之前可以在内存中累积的最大数据大小。最大值范围为 64 MB 到 10 GB。将此参数设置为较大的值可以提高加载性能,但可能会增加加载延迟。此参数仅在 sink.semantic 设置为 at-least-once 时生效。如果 sink.semantic 设置为 exactly-once,则当触发 Flink 检查点时,内存中的数据将被刷新。在这种情况下,此参数不生效。

sink.buffer-flush.max-rows

必需:否
默认值: 500000
描述:在一次性发送到 StarRocks 之前可以在内存中累积的最大行数。仅当 sink.versionV1sink.semanticat-least-once 时,此参数才可用。有效值:64000 到 5000000。

sink.buffer-flush.interval-ms

必需:否
默认值: 300000
描述:刷新数据的间隔。仅当 sink.semanticat-least-once 时,此参数才可用。有效值:1000 到 3600000。单位:毫秒。

sink.max-retries

必需:否
默认值: 3
描述:系统重试执行 Stream Load 作业的次数。仅当您将 sink.version 设置为 V1 时,此参数才可用。有效值:0 到 10。

sink.connect.timeout-ms

必需:否
默认值: 30000
描述:建立 HTTP 连接的超时时间。有效值:100 到 60000。单位:毫秒。在 Flink Connector v1.2.9 之前,默认值为 1000

sink.socket.timeout-ms

必需:否
默认值: -1
描述:自 1.2.10 起受支持。HTTP 客户端等待数据的持续时间。单位:毫秒。默认值 -1 表示没有超时。

sink.wait-for-continue.timeout-ms

必需:否
默认值: 10000
描述:自 1.2.7 起受支持。等待 FE 的 HTTP 100-continue 响应的超时时间。有效值:300060000。单位:毫秒

sink.ignore.update-before

必需:否
默认值:true
描述:自 1.2.8 版本起受支持。加载数据到主键表时是否忽略来自 Flink 的 UPDATE_BEFORE 记录。如果此参数设置为 false,则该记录将被视为对 StarRocks 表的删除操作。

sink.parallelism

必需:否
默认值:NONE
描述:加载的并行度。仅适用于 Flink SQL。如果未指定此参数,则 Flink 规划器将决定并行度。在多并行度场景中,用户需要保证数据以正确的顺序写入。

sink.properties.*

必需:否
默认值:NONE
描述:用于控制 Stream Load 行为的参数。例如,参数 sink.properties.format 指定用于 Stream Load 的格式,例如 CSV 或 JSON。有关支持的参数及其描述的列表,请参阅 STREAM LOAD

sink.properties.format

必需:否
默认值:csv
描述:用于 Stream Load 的格式。Flink Connector 将在将每批数据发送到 StarRocks 之前将其转换为该格式。有效值:csvjson

sink.properties.column_separator

必需:否
默认值:\t
描述:CSV 格式数据的列分隔符。

sink.properties.row_delimiter

必需:否
默认值:\n
描述:CSV 格式数据的行分隔符。

sink.properties.max_filter_ratio

必需:否
默认值: 0
描述:Stream Load 的最大错误容忍度。它是由于数据质量不足而可以过滤掉的数据记录的最大百分比。有效值:01。默认值:0。有关详细信息,请参阅 Stream Load

sink.properties.partial_update

必需:否
默认值FALSE
描述:是否使用部分更新。有效值:TRUEFALSE。默认值:FALSE,表示禁用此功能。

sink.properties.partial_update_mode

必需:否
默认值row
描述:指定部分更新的模式。有效值:rowcolumn

  • row(默认)表示行模式下的部分更新,更适合具有许多列和小批量的实时更新。
  • column 表示列模式下的部分更新,更适合于具有少量列和许多行的批量更新。在这种情况下,启用列模式可以提供更快的更新速度。例如,在一个具有 100 列的表中,如果仅更新所有行的 10 列(占总数的 10%),则列模式的更新速度将快 10 倍。

sink.properties.strict_mode

必需:否
默认值:false
描述:指定是否为 Stream Load 启用严格模式。当存在不合格的行(例如列值不一致)时,它会影响加载行为。有效值:truefalse。默认值:false。有关详细信息,请参阅 Stream Load

sink.properties.compression

必需:否
默认值:NONE
描述:Stream Load 使用的压缩算法。有效值:lz4_frame。JSON 格式的压缩需要 Flink Connector 1.2.10+ 和 StarRocks v3.2.7+。CSV 格式的压缩仅需要 Flink Connector 1.2.11+。

Flink 数据类型StarRocks 数据类型
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTEGERINTEGER
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
BINARYINT
CHARSTRING
VARCHARSTRING
STRINGSTRING
DATEDATE
TIMESTAMP_WITHOUT_TIME_ZONE(N)DATETIME
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)DATETIME
ARRAY<T>ARRAY<T>
MAP<KT,VT>JSON STRING
ROW<arg T...>JSON STRING

使用说明

精确一次

  • 如果您希望 sink 保证精确一次语义,我们建议您将 StarRocks 升级到 2.5 或更高版本,并将 Flink Connector 升级到 1.2.4 或更高版本

    • 自 Flink Connector 1.2.4 起,精确一次是基于 StarRocks 自 2.4 起提供的 Stream Load 事务接口 重新设计的。与之前基于非事务性 Stream Load 非事务性接口的实现相比,新实现减少了内存使用和检查点开销,从而提高了加载的实时性能和稳定性。

    • 如果 StarRocks 的版本早于 2.4 或 Flink Connector 的版本早于 1.2.4,则 sink 将自动选择基于 Stream Load 非事务性接口的实现。

  • 保证精确一次的配置

    • sink.semantic 的值需要为 exactly-once

    • 如果 Flink Connector 的版本为 1.2.8 及更高版本,建议指定 sink.label-prefix 的值。请注意,标签前缀在 StarRocks 中的所有加载类型(例如 Flink 作业、Routine Load 和 Broker Load)中必须是唯一的。

      • 如果指定了标签前缀,Flink Connector 将使用标签前缀来清理可能在某些 Flink 故障场景中生成的长期存在的事务,例如 Flink 作业在检查点仍在进行时失败。如果您使用 SHOW PROC '/transactions/<db_id>/running'; 在 StarRocks 中查看它们,这些长期存在的事务通常处于 PREPARED 状态。当 Flink 作业从检查点恢复时,Flink Connector 将根据标签前缀和检查点中的一些信息找到这些长期存在的事务,并中止它们。由于实现精确一次的两阶段提交机制,Flink Connector 在 Flink 作业退出时无法中止它们。当 Flink 作业退出时,Flink Connector 尚未收到来自 Flink 检查点协调器的通知,告知事务是否应包含在成功的检查点中,如果无论如何都中止这些事务,可能会导致数据丢失。您可以在此 博客文章 中大致了解如何在 Flink 中实现端到端精确一次。

      • 如果未指定标签前缀,则仅在长期存在的事务超时后,StarRocks 才会清理它们。但是,如果在事务超时之前 Flink 作业频繁失败,则运行事务的数量可能会达到 StarRocks max_running_txn_num_per_db 的限制。超时长度由 StarRocks FE 配置 prepared_transaction_default_timeout_second 控制,其默认值为 86400(1 天)。您可以为其设置较小的值,以便在未指定标签前缀时更快地使事务过期。

  • 如果您确定 Flink 作业最终会在由于停止或连续故障转移而长时间停机后从检查点或保存点恢复,请相应地调整以下 StarRocks 配置,以避免数据丢失。

    • prepared_transaction_default_timeout_second:StarRocks FE 配置,默认值为 86400。此配置的值需要大于 Flink 作业的停机时间。否则,由于在您重新启动 Flink 作业之前超时,可能会中止成功的检查点中包含的长期存在的事务,从而导致数据丢失。

      请注意,当您为此配置设置较大的值时,最好指定 sink.label-prefix 的值,以便可以根据标签前缀和检查点中的一些信息来清理长期存在的事务,而不是由于超时(这可能会导致数据丢失)。

    • label_keep_max_secondlabel_keep_max_num:StarRocks FE 配置,默认值分别为 2592001000。有关详细信息,请参阅 FE 配置label_keep_max_second 的值需要大于 Flink 作业的停机时间。否则,Flink Connector 无法通过使用 Flink 保存点或检查点中保存的事务标签来检查 StarRocks 中事务的状态,并确定这些事务是否已提交,这最终可能导致数据丢失。

    这些配置是可变的,可以使用 ADMIN SET FRONTEND CONFIG 进行修改

      ADMIN SET FRONTEND CONFIG ("prepared_transaction_default_timeout_second" = "3600");
    ADMIN SET FRONTEND CONFIG ("label_keep_max_second" = "259200");
    ADMIN SET FRONTEND CONFIG ("label_keep_max_num" = "1000");

刷新策略

Flink Connector 会将数据缓存在内存中,并通过 Stream Load 将它们批量刷新到 StarRocks。在至少一次和精确一次之间,刷新触发方式不同。

对于至少一次,当满足以下任何条件时,将触发刷新

  • 缓冲行的字节数达到限制 sink.buffer-flush.max-bytes
  • 缓冲行的数量达到限制 sink.buffer-flush.max-rows。(仅对 sink 版本 V1 有效)
  • 自上次刷新以来经过的时间达到限制 sink.buffer-flush.interval-ms
  • 触发检查点

对于精确一次,刷新仅在触发检查点时才会发生。

监控加载指标

Flink Connector 提供了以下指标来监控加载。

指标类型描述
totalFlushBytes计数器成功刷新的字节数。
totalFlushRows计数器成功刷新的行数。
totalFlushSucceededTimes计数器成功刷新数据的次数。
totalFlushFailedTimes计数器数据刷新失败的次数。
totalFilteredRows计数器已过滤的行数,也包含在 totalFlushRows 中。

示例

以下示例说明如何使用 Flink Connector 将数据加载到具有 Flink SQL 或 Flink DataStream 的 StarRocks 表中。

准备工作

创建 StarRocks 表

创建一个数据库 test 并创建一个 Primary Key 表 score_board

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
  • 下载 Flink 二进制文件 Flink 1.15.2,并将其解压缩到目录 flink-1.15.2

  • 下载 Flink Connector 1.2.7,并将其放入目录 flink-1.15.2/lib

  • 运行以下命令以启动 Flink 集群

    cd flink-1.15.2
    ./bin/start-cluster.sh

网络配置

确保 Flink 所在的机器可以通过 http_port (默认: 8030) 和 query_port (默认: 9030) 访问 StarRocks 集群的 FE 节点,并且可以通过 be_http_port (默认: 8040) 访问 BE 节点。

  • 运行以下命令以启动 Flink SQL 客户端。

    ./bin/sql-client.sh
  • 创建一个 Flink 表 score_board,并通过 Flink SQL 客户端将值插入到该表中。请注意,如果您想将数据加载到 StarRocks 的主键表中,则必须在 Flink DDL 中定义主键。对于其他类型的 StarRocks 表,它是可选的。

    CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',

    'table-name' = 'score_board',
    'username' = 'root',
    'password' = ''
    );

    INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

根据输入记录的类型(例如 CSV Java String、JSON Java String 或自定义 Java 对象),有几种方法可以实现 Flink DataStream 作业。

  • 输入记录是 CSV 格式的 String。有关完整示例,请参阅 LoadCsvRecords

    /**
    * Generate CSV-format records. Each record has three values separated by "\t".
    * These values will be loaded to the columns `id`, `name`, and `score` in the StarRocks table.
    */
    String[] records = new String[]{
    "1\tstarrocks-csv\t100",
    "2\tflink-csv\t100"
    };
    DataStream<String> source = env.fromElements(records);

    /**
    * Configure the connector with the required properties.
    * You also need to add properties "sink.properties.format" and "sink.properties.column_separator"
    * to tell the connector the input records are CSV-format, and the column separator is "\t".
    * You can also use other column separators in the CSV-format records,
    * but remember to modify the "sink.properties.column_separator" correspondingly.
    */
    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
    .withProperty("jdbc-url", jdbcUrl)
    .withProperty("load-url", loadUrl)
    .withProperty("database-name", "test")
    .withProperty("table-name", "score_board")
    .withProperty("username", "root")
    .withProperty("password", "")
    .withProperty("sink.properties.format", "csv")
    .withProperty("sink.properties.column_separator", "\t")
    .build();
    // Create the sink with the options.
    SinkFunction<String> starRockSink = StarRocksSink.sink(options);
    source.addSink(starRockSink);
  • 输入记录是 JSON 格式的 String。有关完整示例,请参阅 LoadJsonRecords

    /**
    * Generate JSON-format records.
    * Each record has three key-value pairs corresponding to the columns `id`, `name`, and `score` in the StarRocks table.
    */
    String[] records = new String[]{
    "{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}",
    "{\"id\":2, \"name\":\"flink-json\", \"score\":100}",
    };
    DataStream<String> source = env.fromElements(records);

    /**
    * Configure the connector with the required properties.
    * You also need to add properties "sink.properties.format" and "sink.properties.strip_outer_array"
    * to tell the connector the input records are JSON-format and to strip the outermost array structure.
    */
    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
    .withProperty("jdbc-url", jdbcUrl)
    .withProperty("load-url", loadUrl)
    .withProperty("database-name", "test")
    .withProperty("table-name", "score_board")
    .withProperty("username", "root")
    .withProperty("password", "")
    .withProperty("sink.properties.format", "json")
    .withProperty("sink.properties.strip_outer_array", "true")
    .build();
    // Create the sink with the options.
    SinkFunction<String> starRockSink = StarRocksSink.sink(options);
    source.addSink(starRockSink);
  • 输入记录是自定义 Java 对象。有关完整示例,请参阅 LoadCustomJavaRecords

    • 在此示例中,输入记录是一个简单的 POJO RowData

      public static class RowData {
      public int id;
      public String name;
      public int score;

      public RowData() {}

      public RowData(int id, String name, int score) {
      this.id = id;
      this.name = name;
      this.score = score;
      }
      }
    • 主程序如下

      // Generate records which use RowData as the container.
      RowData[] records = new RowData[]{
      new RowData(1, "starrocks-rowdata", 100),
      new RowData(2, "flink-rowdata", 100),
      };
      DataStream<RowData> source = env.fromElements(records);

      // Configure the connector with the required properties.
      StarRocksSinkOptions options = StarRocksSinkOptions.builder()
      .withProperty("jdbc-url", jdbcUrl)
      .withProperty("load-url", loadUrl)
      .withProperty("database-name", "test")
      .withProperty("table-name", "score_board")
      .withProperty("username", "root")
      .withProperty("password", "")
      .build();

      /**
      * The Flink connector will use a Java object array (Object[]) to represent a row to be loaded into the StarRocks table,
      * and each element is the value for a column.
      * You need to define the schema of the Object[] which matches that of the StarRocks table.
      */
      TableSchema schema = TableSchema.builder()
      .field("id", DataTypes.INT().notNull())
      .field("name", DataTypes.STRING())
      .field("score", DataTypes.INT())
      // When the StarRocks table is a Primary Key table, you must specify notNull(), for example, DataTypes.INT().notNull(), for the primary key `id`.
      .primaryKey("id")
      .build();
      // Transform the RowData to the Object[] according to the schema.
      RowDataTransformer transformer = new RowDataTransformer();
      // Create the sink with the schema, options, and transformer.
      SinkFunction<RowData> starRockSink = StarRocksSink.sink(schema, options, transformer);
      source.addSink(starRockSink);
    • 主程序中的 RowDataTransformer 定义如下

      private static class RowDataTransformer implements StarRocksSinkRowBuilder<RowData> {

      /**
      * Set each element of the object array according to the input RowData.
      * The schema of the array matches that of the StarRocks table.
      */
      @Override
      public void accept(Object[] internalRow, RowData rowData) {
      internalRow[0] = rowData.id;
      internalRow[1] = rowData.name;
      internalRow[2] = rowData.score;
      // When the StarRocks table is a Primary Key table, you need to set the last element to indicate whether the data loading is an UPSERT or DELETE operation.
      internalRow[internalRow.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
      }
      }

Flink CDC 3.0 框架可用于轻松构建从 CDC 源(例如 MySQL 和 Kafka)到 StarRocks 的流式 ELT 管道。该管道可以同步整个数据库、合并分片表以及从源到 StarRocks 的模式更改。

自 v1.2.9 起,用于 StarRocks 的 Flink Connector 已作为 StarRocks Pipeline Connector 集成到此框架中。StarRocks Pipeline Connector 支持

  • 自动创建数据库和表
  • 模式更改同步
  • 完整和增量数据同步

有关快速入门,请参阅 使用 Flink CDC 3.0 和 StarRocks Pipeline Connector 从 MySQL 流式 ELT 到 StarRocks

建议使用 StarRocks v3.2.1 及更高版本以启用 fast_schema_evolution。它将提高添加或删除列的速度并减少资源使用。

最佳实践

将数据加载到主键表

本节将演示如何将数据加载到 StarRocks 主键表以实现部分更新和条件更新。您可以参阅 通过加载更改数据 以了解这些功能的介绍。这些示例使用 Flink SQL。

准备工作

创建一个数据库 test 并在 StarRocks 中创建一个 Primary Key 表 score_board

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);

部分更新

本示例将演示如何仅将数据加载到 idname 列。

  1. 在 MySQL 客户端中将两行数据插入到 StarRocks 表 score_board 中。

    mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

    mysql> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 1 | starrocks | 100 |
    | 2 | flink | 100 |
    +------+-----------+-------+
    2 rows in set (0.02 sec)
  2. 在 Flink SQL 客户端中创建 Flink 表 score_board

    • 定义仅包含 idname 列的 DDL。
    • 将选项 sink.properties.partial_update 设置为 true,这将告诉 Flink Connector 执行部分更新。
    • 如果 Flink Connector 版本 <= 1.2.7,您还需要将选项 sink.properties.columns 设置为 id,name,__op,以告知 Flink Connector 需要更新哪些列。请注意,您需要在末尾附加字段 __op。字段 __op 指示数据加载是 UPSERT 还是 DELETE 操作,其值由 Connector 自动设置。
    CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = '',
    'sink.properties.partial_update' = 'true',
    -- only for Flink connector version <= 1.2.7
    'sink.properties.columns' = 'id,name,__op'
    );
  3. 将两行数据插入到 Flink 表中。数据行的主键与 StarRocks 表中的行相同。但列 name 中的值已修改。

    INSERT INTO `score_board` VALUES (1, 'starrocks-update'), (2, 'flink-update');
  4. 在 MySQL 客户端中查询 StarRocks 表。

    mysql> select * from score_board;
    +------+------------------+-------+
    | id | name | score |
    +------+------------------+-------+
    | 1 | starrocks-update | 100 |
    | 2 | flink-update | 100 |
    +------+------------------+-------+
    2 rows in set (0.02 sec)

    您可以看到只有 name 的值发生了更改,而 score 的值没有发生更改。

条件更新

本示例将演示如何根据列 score 的值执行条件更新。仅当 score 的新值大于或等于旧值时,id 的更新才会生效。

  1. 在 MySQL 客户端中将两行数据插入到 StarRocks 表中。

    mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

    mysql> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 1 | starrocks | 100 |
    | 2 | flink | 100 |
    +------+-----------+-------+
    2 rows in set (0.02 sec)
  2. 通过以下方式创建 Flink 表 score_board

    • 定义包含所有列的 DDL。
    • 将选项 sink.properties.merge_condition 设置为 score,以告知 Connector 使用列 score 作为条件。
    • 将选项 sink.version 设置为 V1,以告知 Connector 使用 Stream Load。
    CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = '',
    'sink.properties.merge_condition' = 'score',
    'sink.version' = 'V1'
    );
  3. 将两行数据插入到 Flink 表中。数据行的主键与 StarRocks 表中的行相同。第一行数据在列 score 中具有较小的值,第二行数据在列 score 中具有较大的值。

    INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'flink-update', 101);
  4. 在 MySQL 客户端中查询 StarRocks 表。

    mysql> select * from score_board;
    +------+--------------+-------+
    | id | name | score |
    +------+--------------+-------+
    | 1 | starrocks | 100 |
    | 2 | flink-update | 101 |
    +------+--------------+-------+
    2 rows in set (0.03 sec)

    您可以看到只有第二行数据的值发生了变化,第一行数据的值没有变化。

将数据加载到 BITMAP 类型的列中

BITMAP 常用于加速Count Distinct操作,例如计算UV,请参阅使用Bitmap进行精确的Count Distinct。 这里我们以计算UV为例,展示如何将数据加载到BITMAP类型的列中。

  1. 在MySQL客户端中创建StarRocks Aggregate表。

    在数据库 test 中,创建一个 Aggregate 表 page_uv,其中列 visit_users 定义为 BITMAP 类型,并配置了聚合函数 BITMAP_UNION

    CREATE TABLE `test`.`page_uv` (
    `page_id` INT NOT NULL COMMENT 'page ID',
    `visit_date` datetime NOT NULL COMMENT 'access time',
    `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. 在Flink SQL客户端中创建Flink表。

    Flink表中的列visit_user_idBIGINT类型,我们想将此列加载到StarRocks表中BITMAP类型的列visit_users。 因此,在定义Flink表的DDL时,请注意

    • 由于Flink不支持BITMAP,因此您需要定义一个visit_user_id列作为BIGINT类型,以表示StarRocks表中BITMAP类型的visit_users列。
    • 您需要设置选项sink.properties.columnspage_id,visit_date,user_id,visit_users=to_bitmap(visit_user_id),这告诉连接器Flink表和StarRocks表之间的列映射关系。 此外,您需要使用to_bitmap函数来告诉连接器将BIGINT类型的数据转换为BITMAP类型。
    CREATE TABLE `page_uv` (
    `page_id` INT,
    `visit_date` TIMESTAMP,
    `visit_user_id` BIGINT
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'page_uv',
    'username' = 'root',
    'password' = '',
    'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=to_bitmap(visit_user_id)'
    );
  3. 在Flink SQL客户端中将数据加载到Flink表中。

    INSERT INTO `page_uv` VALUES
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
    (1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
    (2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);
  4. 在MySQL客户端中从StarRocks表计算页面UV。

    MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `page_uv` GROUP BY `page_id`;
    +---------+-----------------------------+
    | page_id | count(DISTINCT visit_users) |
    +---------+-----------------------------+
    | 2 | 1 |
    | 1 | 3 |
    +---------+-----------------------------+
    2 rows in set (0.05 sec)

将数据加载到 HLL 类型的列中

HLL 可用于近似 Count Distinct,请参阅 使用 HLL 进行近似 Count Distinct

这里我们以计算UV为例,展示如何将数据加载到HLL类型的列中。

  1. 创建一个StarRocks Aggregate表

    在数据库 test 中,创建一个 Aggregate 表 hll_uv,其中列 visit_users 定义为 HLL 类型,并配置了聚合函数 HLL_UNION

    CREATE TABLE `hll_uv` (
    `page_id` INT NOT NULL COMMENT 'page ID',
    `visit_date` datetime NOT NULL COMMENT 'access time',
    `visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. 在Flink SQL客户端中创建Flink表。

    Flink表中的列visit_user_idBIGINT类型,我们想将此列加载到StarRocks表中HLL类型的列visit_users。 因此,在定义Flink表的DDL时,请注意

    • 由于Flink不支持BITMAP,因此您需要定义一个visit_user_id列作为BIGINT类型,以表示StarRocks表中HLL类型的visit_users列。
    • 您需要设置选项sink.properties.columnspage_id,visit_date,user_id,visit_users=hll_hash(visit_user_id),这告诉连接器Flink表和StarRocks表之间的列映射关系。 此外,您需要使用hll_hash函数来告诉连接器将BIGINT类型的数据转换为HLL类型。
    CREATE TABLE `hll_uv` (
    `page_id` INT,
    `visit_date` TIMESTAMP,
    `visit_user_id` BIGINT
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'hll_uv',
    'username' = 'root',
    'password' = '',
    'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=hll_hash(visit_user_id)'
    );
  3. 在Flink SQL客户端中将数据加载到Flink表中。

    INSERT INTO `hll_uv` VALUES
    (3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
    (4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
    (3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);
  4. 在MySQL客户端中从StarRocks表计算页面UV。

    mysql> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;
    **+---------+-----------------------------+
    | page_id | count(DISTINCT visit_users) |
    +---------+-----------------------------+
    | 3 | 2 |
    | 4 | 1 |
    +---------+-----------------------------+
    2 rows in set (0.04 sec)