跳到主要内容
版本: 最新版本-3.5

使用 Flink Connector 从 StarRocks 读取数据

StarRocks 提供了一个自研的 Connector,名为 StarRocks Connector for Apache Flink® (简称 Flink Connector),它可以帮助您使用 Flink 从 StarRocks 集群批量读取数据。

Flink Connector 支持两种读取方式:Flink SQL 和 Flink DataStream。推荐使用 Flink SQL。

注意

Flink Connector 也支持将 Flink 读取的数据写入另一个 StarRocks 集群或存储系统。请参阅从 Apache Flink® 持续加载数据

背景信息

与 Flink 提供的 JDBC Connector 不同,StarRocks 的 Flink Connector 支持从 StarRocks 集群的多个 BE 并行读取数据,大大加快了读取任务。以下对比展示了两个 Connector 在实现上的差异。

  • StarRocks 的 Flink Connector

    使用 StarRocks 的 Flink Connector,Flink 可以首先从负责的 FE 获取查询计划,然后将获取的查询计划作为参数分发给所有涉及的 BE,最后获取 BE 返回的数据。

    - Flink connector of StarRocks

  • Flink 的 JDBC Connector

    使用 Flink 的 JDBC Connector,Flink 每次只能从单个 FE 读取数据。数据读取速度慢。

    JDBC connector of Flink

版本要求

连接器FlinkStarRocksJavaScala
1.2.101.15,1.16,1.17,1.18,1.192.1 及更高版本82.11,2.12
1.2.91.15,1.16,1.17,1.182.1 及更高版本82.11,2.12
1.2.81.13,1.14,1.15,1.16,1.172.1 及更高版本82.11,2.12
1.2.71.11,1.12,1.13,1.14,1.152.1 及更高版本82.11,2.12

前提条件

Flink 已部署。如果 Flink 尚未部署,请按照以下步骤进行部署

  1. 在您的操作系统中安装 Java 8 或 Java 11,以确保 Flink 可以正常运行。您可以使用以下命令检查 Java 安装的版本

    java -version

    例如,如果返回以下信息,则表示已安装 Java 8

    openjdk version "1.8.0_322"
    OpenJDK Runtime Environment (Temurin)(build 1.8.0_322-b06)
    OpenJDK 64-Bit Server VM (Temurin)(build 25.322-b06, mixed mode)
  2. 下载并解压您选择的 Flink 包

    注意

    我们建议您使用 Flink v1.14 或更高版本。支持的最低 Flink 版本为 v1.11。

    # Download the Flink package.
    wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
    # Unzip the Flink package.
    tar -xzf flink-1.14.5-bin-scala_2.11.tgz
    # Go to the Flink directory.
    cd flink-1.14.5
  3. 启动您的 Flink 集群。

    # Start your Flink cluster.
    ./bin/start-cluster.sh

    # When the following information is displayed, your Flink cluster has successfully started:
    Starting cluster.
    Starting standalonesession daemon on host.
    Starting taskexecutor daemon on host.

您也可以按照 Flink 文档 中提供的说明部署 Flink。

准备工作

请按照以下步骤部署 Flink Connector

  1. 选择并下载与您使用的 Flink 版本匹配的 flink-connector-starrocks JAR 包。如果需要代码调试,请编译 Flink Connector 包以满足您的业务需求。

    注意

    我们建议您下载版本为 1.2.x 或更高版本,并且其匹配的 Flink 版本与您使用的 Flink 版本的前两位数字相同的 Flink Connector 包。例如,如果您使用 Flink v1.14.x,您可以下载 flink-connector-starrocks-1.2.4_flink-1.14_x.yy.jar

  2. 将您下载或编译的 Flink Connector 包放入 Flink 的 lib 目录中。

  3. 重启您的 Flink 集群。

网络配置

确保 Flink 所在的机器可以通过 http_port (默认值: 8030) 和 query_port (默认值: 9030) 访问 StarRocks 集群的 FE 节点,以及通过 be_port (默认值: 9060) 访问 BE 节点。

参数

通用参数

以下参数适用于 Flink SQL 和 Flink DataStream 两种读取方式。

参数必需数据类型描述
connectorSTRING您想要用来读取数据的 Connector 类型。将值设置为 starrocks
scan-urlSTRING用于从 Web 服务器连接 FE 的地址。格式:<fe_host>:<fe_http_port>。默认端口是 8030。您可以指定多个地址,地址之间必须用逗号 (,) 分隔。示例:192.168.xxx.xxx:8030,192.168.xxx.xxx:8030
jdbc-urlSTRING用于连接 FE 的 MySQL 客户端的地址。格式:jdbc:mysql://<fe_host>:<fe_query_port>。默认端口号为 9030
usernameSTRING您的 StarRocks 集群帐户的用户名。该帐户必须具有您要读取的 StarRocks 表的读取权限。请参阅用户权限
passwordSTRING您的 StarRocks 集群帐户的密码。
database-nameSTRING您要读取的 StarRocks 表所属的 StarRocks 数据库的名称。
table-nameSTRING您要读取的 StarRocks 表的名称。
scan.connect.timeout-msSTRINGFlink Connector 连接到您的 StarRocks 集群超时的最长时间。单位:毫秒。默认值:1000。如果建立连接所花费的时间超过此限制,则读取任务将失败。
scan.params.keep-alive-minSTRING读取任务保持活动状态的最长时间。保持活动时间通过轮询机制定期检查。单位:分钟。默认值:10。我们建议您将此参数设置为大于或等于 5 的值。
scan.params.query-timeout-sSTRING读取任务超时的最长时间。超时时间在任务执行期间检查。单位:秒。默认值:600。如果在经过该时间后没有返回读取结果,则读取任务将停止。
scan.params.mem-limit-byteSTRING每个查询在每个 BE 上允许的最大内存量。单位:字节。默认值:1073741824,等于 1 GB。
scan.max-retriesSTRING读取任务在失败后可以重试的最大次数。默认值:1。如果读取任务重试的次数超过此限制,则读取任务将返回错误。

以下参数仅适用于 Flink DataStream 读取方法。

参数必需数据类型描述
scan.columnsSTRING您想要读取的列。您可以指定多个列,这些列必须用逗号 (,) 分隔。
scan.filterSTRING您想要用来过滤数据的过滤条件。

假设您在 Flink 中创建了一个包含三列的表,分别为 c1c2c3。要读取此 Flink 表的 c1 列中的值等于 100 的行,您可以指定两个过滤条件 "scan.columns, "c1""scan.filter, "c1 = 100"

以下数据类型映射仅对 Flink 从 StarRocks 读取数据有效。有关用于 Flink 将数据写入 StarRocks 的数据类型映射,请参阅从 Apache Flink® 持续加载数据

StarRocksFlink
NULLNULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
LARGEINTSTRING
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
DATETIMETIMESTAMP
DECIMALDECIMAL
DECIMALV2DECIMAL
DECIMAL32DECIMAL
DECIMAL64DECIMAL
DECIMAL128DECIMAL
CHARCHAR
VARCHARSTRING
JSONSTRING
注意
1.2.10 版本起支持
ARRAYARRAY
注意
1.2.10 版本起支持,且需要 StarRocks v3.1.12/v3.2.5 或更高版本。
STRUCTROW
注意
1.2.10 版本起支持,且需要 StarRocks v3.1.12/v3.2.5 或更高版本。
MAPMAP
注意
1.2.10 版本起支持,且需要 StarRocks v3.1.12/v3.2.5 或更高版本。

示例

以下示例假设您已在 StarRocks 集群中创建了一个名为 test 的数据库,并且您具有用户 root 的权限。

注意

如果读取任务失败,您必须重新创建它。

数据示例

  1. 进入 test 数据库并创建一个名为 score_board 的表。

    MySQL [test]> CREATE TABLE `score_board`
    (
    `id` int(11) NOT NULL COMMENT "",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`)
    PROPERTIES
    (
    "replication_num" = "3"
    );
  2. 将数据插入到 score_board 表中。

    MySQL [test]> INSERT INTO score_board
    VALUES
    (1, 'Bob', 21),
    (2, 'Stan', 21),
    (3, 'Sam', 22),
    (4, 'Tony', 22),
    (5, 'Alice', 22),
    (6, 'Lucy', 23),
    (7, 'Polly', 23),
    (8, 'Tom', 23),
    (9, 'Rose', 24),
    (10, 'Jerry', 24),
    (11, 'Jason', 24),
    (12, 'Lily', 25),
    (13, 'Stephen', 25),
    (14, 'David', 25),
    (15, 'Eddie', 26),
    (16, 'Kate', 27),
    (17, 'Cathy', 27),
    (18, 'Judy', 27),
    (19, 'Julia', 28),
    (20, 'Robert', 28),
    (21, 'Jack', 29);
  3. 查询 score_board 表。

    MySQL [test]> SELECT * FROM score_board;
    +------+---------+-------+
    | id | name | score |
    +------+---------+-------+
    | 1 | Bob | 21 |
    | 2 | Stan | 21 |
    | 3 | Sam | 22 |
    | 4 | Tony | 22 |
    | 5 | Alice | 22 |
    | 6 | Lucy | 23 |
    | 7 | Polly | 23 |
    | 8 | Tom | 23 |
    | 9 | Rose | 24 |
    | 10 | Jerry | 24 |
    | 11 | Jason | 24 |
    | 12 | Lily | 25 |
    | 13 | Stephen | 25 |
    | 14 | David | 25 |
    | 15 | Eddie | 26 |
    | 16 | Kate | 27 |
    | 17 | Cathy | 27 |
    | 18 | Judy | 27 |
    | 19 | Julia | 28 |
    | 20 | Robert | 28 |
    | 21 | Jack | 29 |
    +------+---------+-------+
    21 rows in set (0.00 sec)
  1. 在您的 Flink 集群中,基于源 StarRocks 表(在本示例中为 score_board)的 schema 创建一个名为 flink_test 的表。在表创建命令中,您必须配置读取任务属性,包括有关 Flink Connector、源 StarRock 数据库和源 StarRocks 表的信息。

    CREATE TABLE flink_test
    (
    `id` INT,
    `name` STRING,
    `score` INT
    )
    WITH
    (
    'connector'='starrocks',
    'scan-url'='192.168.xxx.xxx:8030',
    'jdbc-url'='jdbc:mysql://192.168.xxx.xxx:9030',
    'username'='xxxxxx',
    'password'='xxxxxx',
    'database-name'='test',
    'table-name'='score_board'
    );
  2. 使用 SELECT 从 StarRocks 读取数据。

    SELECT id, name FROM flink_test WHERE score > 20;

当您使用 Flink SQL 读取数据时,请注意以下几点

  • 您只能使用像 SELECT ... FROM <table_name> WHERE ... 这样的 SQL 语句从 StarRocks 读取数据。在所有聚合函数中,仅支持 count
  • 支持谓词下推。例如,如果您的查询包含一个过滤条件 char_1 <> 'A' and int_1 = -126,则该过滤条件将被下推到 Flink Connector 并转换为可以在查询运行之前由 StarRocks 执行的语句。您不需要执行额外的配置。
  • 不支持 LIMIT 语句。
  • StarRocks 不支持检查点机制。因此,如果读取任务失败,则无法保证数据一致性。
  1. 将以下依赖项添加到 pom.xml 文件

    <dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- for Apache Flink® 1.15 -->
    <version>x.x.x_flink-1.15</version>
    <!-- for Apache Flink® 1.14 -->
    <version>x.x.x_flink-1.14_2.11</version>
    <version>x.x.x_flink-1.14_2.12</version>
    <!-- for Apache Flink® 1.13 -->
    <version>x.x.x_flink-1.13_2.11</version>
    <version>x.x.x_flink-1.13_2.12</version>
    <!-- for Apache Flink® 1.12 -->
    <version>x.x.x_flink-1.12_2.11</version>
    <version>x.x.x_flink-1.12_2.12</version>
    <!-- for Apache Flink® 1.11 -->
    <version>x.x.x_flink-1.11_2.11</version>
    <version>x.x.x_flink-1.11_2.12</version>
    </dependency>

    您必须将上述代码示例中的 x.x.x 替换为您正在使用的最新 Flink Connector 版本。请参阅版本信息

  2. 调用 Flink Connector 从 StarRocks 读取数据

    import com.starrocks.connector.flink.StarRocksSource;
    import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;

    public class StarRocksSourceApp {
    public static void main(String[] args) throws Exception {
    StarRocksSourceOptions options = StarRocksSourceOptions.builder()
    .withProperty("scan-url", "192.168.xxx.xxx:8030")
    .withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
    .withProperty("username", "root")
    .withProperty("password", "")
    .withProperty("table-name", "score_board")
    .withProperty("database-name", "test")
    .build();
    TableSchema tableSchema = TableSchema.builder()
    .field("id", DataTypes.INT())
    .field("name", DataTypes.STRING())
    .field("score", DataTypes.INT())
    .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();
    env.execute("StarRocks flink source");
    }

    }

下一步

Flink 成功从 StarRocks 读取数据后,您可以使用 Flink WebUI 监控读取任务。例如,您可以在 WebUI 的 Metrics 页面上查看 totalScannedRows 指标,以获取成功读取的行数。您还可以使用 Flink SQL 对您已读取的数据执行诸如 join 之类的计算。