使用 Flink Connector 从 StarRocks 读取数据
StarRocks 提供了一个自研的 Connector,名为 StarRocks Connector for Apache Flink® (简称 Flink Connector),它可以帮助您使用 Flink 从 StarRocks 集群批量读取数据。
Flink Connector 支持两种读取方式:Flink SQL 和 Flink DataStream。推荐使用 Flink SQL。
注意
Flink Connector 也支持将 Flink 读取的数据写入另一个 StarRocks 集群或存储系统。请参阅从 Apache Flink® 持续加载数据。
背景信息
与 Flink 提供的 JDBC Connector 不同,StarRocks 的 Flink Connector 支持从 StarRocks 集群的多个 BE 并行读取数据,大大加快了读取任务。以下对比展示了两个 Connector 在实现上的差异。
-
StarRocks 的 Flink Connector
使用 StarRocks 的 Flink Connector,Flink 可以首先从负责的 FE 获取查询计划,然后将获取的查询计划作为参数分发给所有涉及的 BE,最后获取 BE 返回的数据。
-
Flink 的 JDBC Connector
使用 Flink 的 JDBC Connector,Flink 每次只能从单个 FE 读取数据。数据读取速度慢。
版本要求
连接器 | Flink | StarRocks | Java | Scala |
---|---|---|---|---|
1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 及更高版本 | 8 | 2.11,2.12 |
1.2.9 | 1.15,1.16,1.17,1.18 | 2.1 及更高版本 | 8 | 2.11,2.12 |
1.2.8 | 1.13,1.14,1.15,1.16,1.17 | 2.1 及更高版本 | 8 | 2.11,2.12 |
1.2.7 | 1.11,1.12,1.13,1.14,1.15 | 2.1 及更高版本 | 8 | 2.11,2.12 |
前提条件
Flink 已部署。如果 Flink 尚未部署,请按照以下步骤进行部署
-
在您的操作系统中安装 Java 8 或 Java 11,以确保 Flink 可以正常运行。您可以使用以下命令检查 Java 安装的版本
java -version
例如,如果返回以下信息,则表示已安装 Java 8
openjdk version "1.8.0_322"
OpenJDK Runtime Environment (Temurin)(build 1.8.0_322-b06)
OpenJDK 64-Bit Server VM (Temurin)(build 25.322-b06, mixed mode) -
下载并解压您选择的 Flink 包。
注意
我们建议您使用 Flink v1.14 或更高版本。支持的最低 Flink 版本为 v1.11。
# Download the Flink package.
wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
# Unzip the Flink package.
tar -xzf flink-1.14.5-bin-scala_2.11.tgz
# Go to the Flink directory.
cd flink-1.14.5 -
启动您的 Flink 集群。
# Start your Flink cluster.
./bin/start-cluster.sh
# When the following information is displayed, your Flink cluster has successfully started:
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.
您也可以按照 Flink 文档 中提供的说明部署 Flink。
准备工作
部署 Flink Connector
请按照以下步骤部署 Flink Connector
-
选择并下载与您使用的 Flink 版本匹配的 flink-connector-starrocks JAR 包。如果需要代码调试,请编译 Flink Connector 包以满足您的业务需求。
注意
我们建议您下载版本为 1.2.x 或更高版本,并且其匹配的 Flink 版本与您使用的 Flink 版本的前两位数字相同的 Flink Connector 包。例如,如果您使用 Flink v1.14.x,您可以下载
flink-connector-starrocks-1.2.4_flink-1.14_x.yy.jar
。 -
将您下载或编译的 Flink Connector 包放入 Flink 的
lib
目录中。 -
重启您的 Flink 集群。
网络配置
确保 Flink 所在的机器可以通过 http_port
(默认值: 8030
) 和 query_port
(默认值: 9030
) 访问 StarRocks 集群的 FE 节点,以及通过 be_port
(默认值: 9060
) 访问 BE 节点。
参数
通用参数
以下参数适用于 Flink SQL 和 Flink DataStream 两种读取方式。
参数 | 必需 | 数据类型 | 描述 |
---|---|---|---|
connector | 是 | STRING | 您想要用来读取数据的 Connector 类型。将值设置为 starrocks 。 |
scan-url | 是 | STRING | 用于从 Web 服务器连接 FE 的地址。格式:<fe_host>:<fe_http_port> 。默认端口是 8030 。您可以指定多个地址,地址之间必须用逗号 (,) 分隔。示例:192.168.xxx.xxx:8030,192.168.xxx.xxx:8030 。 |
jdbc-url | 是 | STRING | 用于连接 FE 的 MySQL 客户端的地址。格式:jdbc:mysql://<fe_host>:<fe_query_port> 。默认端口号为 9030 。 |
username | 是 | STRING | 您的 StarRocks 集群帐户的用户名。该帐户必须具有您要读取的 StarRocks 表的读取权限。请参阅用户权限。 |
password | 是 | STRING | 您的 StarRocks 集群帐户的密码。 |
database-name | 是 | STRING | 您要读取的 StarRocks 表所属的 StarRocks 数据库的名称。 |
table-name | 是 | STRING | 您要读取的 StarRocks 表的名称。 |
scan.connect.timeout-ms | 否 | STRING | Flink Connector 连接到您的 StarRocks 集群超时的最长时间。单位:毫秒。默认值:1000 。如果建立连接所花费的时间超过此限制,则读取任务将失败。 |
scan.params.keep-alive-min | 否 | STRING | 读取任务保持活动状态的最长时间。保持活动时间通过轮询机制定期检查。单位:分钟。默认值:10 。我们建议您将此参数设置为大于或等于 5 的值。 |
scan.params.query-timeout-s | 否 | STRING | 读取任务超时的最长时间。超时时间在任务执行期间检查。单位:秒。默认值:600 。如果在经过该时间后没有返回读取结果,则读取任务将停止。 |
scan.params.mem-limit-byte | 否 | STRING | 每个查询在每个 BE 上允许的最大内存量。单位:字节。默认值:1073741824 ,等于 1 GB。 |
scan.max-retries | 否 | STRING | 读取任务在失败后可以重试的最大次数。默认值:1 。如果读取任务重试的次数超过此限制,则读取任务将返回错误。 |
Flink DataStream 的参数
以下参数仅适用于 Flink DataStream 读取方法。
参数 | 必需 | 数据类型 | 描述 |
---|---|---|---|
scan.columns | 否 | STRING | 您想要读取的列。您可以指定多个列,这些列必须用逗号 (,) 分隔。 |
scan.filter | 否 | STRING | 您想要用来过滤数据的过滤条件。 |
假设您在 Flink 中创建了一个包含三列的表,分别为 c1
、c2
、c3
。要读取此 Flink 表的 c1
列中的值等于 100
的行,您可以指定两个过滤条件 "scan.columns, "c1"
和 "scan.filter, "c1 = 100"
。
StarRocks 和 Flink 之间的数据类型映射
以下数据类型映射仅对 Flink 从 StarRocks 读取数据有效。有关用于 Flink 将数据写入 StarRocks 的数据类型映射,请参阅从 Apache Flink® 持续加载数据。
StarRocks | Flink |
---|---|
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
LARGEINT | STRING |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR | CHAR |
VARCHAR | STRING |
JSON | STRING 注意 1.2.10 版本起支持 |
ARRAY | ARRAY 注意 1.2.10 版本起支持,且需要 StarRocks v3.1.12/v3.2.5 或更高版本。 |
STRUCT | ROW 注意 1.2.10 版本起支持,且需要 StarRocks v3.1.12/v3.2.5 或更高版本。 |
MAP | MAP 注意 1.2.10 版本起支持,且需要 StarRocks v3.1.12/v3.2.5 或更高版本。 |
示例
以下示例假设您已在 StarRocks 集群中创建了一个名为 test
的数据库,并且您具有用户 root
的权限。
注意
如果读取任务失败,您必须重新创建它。
数据示例
-
进入
test
数据库并创建一个名为score_board
的表。MySQL [test]> CREATE TABLE `score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`)
PROPERTIES
(
"replication_num" = "3"
); -
将数据插入到
score_board
表中。MySQL [test]> INSERT INTO score_board
VALUES
(1, 'Bob', 21),
(2, 'Stan', 21),
(3, 'Sam', 22),
(4, 'Tony', 22),
(5, 'Alice', 22),
(6, 'Lucy', 23),
(7, 'Polly', 23),
(8, 'Tom', 23),
(9, 'Rose', 24),
(10, 'Jerry', 24),
(11, 'Jason', 24),
(12, 'Lily', 25),
(13, 'Stephen', 25),
(14, 'David', 25),
(15, 'Eddie', 26),
(16, 'Kate', 27),
(17, 'Cathy', 27),
(18, 'Judy', 27),
(19, 'Julia', 28),
(20, 'Robert', 28),
(21, 'Jack', 29); -
查询
score_board
表。MySQL [test]> SELECT * FROM score_board;
+------+---------+-------+
| id | name | score |
+------+---------+-------+
| 1 | Bob | 21 |
| 2 | Stan | 21 |
| 3 | Sam | 22 |
| 4 | Tony | 22 |
| 5 | Alice | 22 |
| 6 | Lucy | 23 |
| 7 | Polly | 23 |
| 8 | Tom | 23 |
| 9 | Rose | 24 |
| 10 | Jerry | 24 |
| 11 | Jason | 24 |
| 12 | Lily | 25 |
| 13 | Stephen | 25 |
| 14 | David | 25 |
| 15 | Eddie | 26 |
| 16 | Kate | 27 |
| 17 | Cathy | 27 |
| 18 | Judy | 27 |
| 19 | Julia | 28 |
| 20 | Robert | 28 |
| 21 | Jack | 29 |
+------+---------+-------+
21 rows in set (0.00 sec)
使用 Flink SQL 读取数据
-
在您的 Flink 集群中,基于源 StarRocks 表(在本示例中为
score_board
)的 schema 创建一个名为flink_test
的表。在表创建命令中,您必须配置读取任务属性,包括有关 Flink Connector、源 StarRock 数据库和源 StarRocks 表的信息。CREATE TABLE flink_test
(
`id` INT,
`name` STRING,
`score` INT
)
WITH
(
'connector'='starrocks',
'scan-url'='192.168.xxx.xxx:8030',
'jdbc-url'='jdbc:mysql://192.168.xxx.xxx:9030',
'username'='xxxxxx',
'password'='xxxxxx',
'database-name'='test',
'table-name'='score_board'
); -
使用 SELECT 从 StarRocks 读取数据。
SELECT id, name FROM flink_test WHERE score > 20;
当您使用 Flink SQL 读取数据时,请注意以下几点
- 您只能使用像
SELECT ... FROM <table_name> WHERE ...
这样的 SQL 语句从 StarRocks 读取数据。在所有聚合函数中,仅支持count
。 - 支持谓词下推。例如,如果您的查询包含一个过滤条件
char_1 <> 'A' and int_1 = -126
,则该过滤条件将被下推到 Flink Connector 并转换为可以在查询运行之前由 StarRocks 执行的语句。您不需要执行额外的配置。 - 不支持 LIMIT 语句。
- StarRocks 不支持检查点机制。因此,如果读取任务失败,则无法保证数据一致性。
使用 Flink DataStream 读取数据
-
将以下依赖项添加到
pom.xml
文件<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<!-- for Apache Flink® 1.15 -->
<version>x.x.x_flink-1.15</version>
<!-- for Apache Flink® 1.14 -->
<version>x.x.x_flink-1.14_2.11</version>
<version>x.x.x_flink-1.14_2.12</version>
<!-- for Apache Flink® 1.13 -->
<version>x.x.x_flink-1.13_2.11</version>
<version>x.x.x_flink-1.13_2.12</version>
<!-- for Apache Flink® 1.12 -->
<version>x.x.x_flink-1.12_2.11</version>
<version>x.x.x_flink-1.12_2.12</version>
<!-- for Apache Flink® 1.11 -->
<version>x.x.x_flink-1.11_2.11</version>
<version>x.x.x_flink-1.11_2.12</version>
</dependency>您必须将上述代码示例中的
x.x.x
替换为您正在使用的最新 Flink Connector 版本。请参阅版本信息。 -
调用 Flink Connector 从 StarRocks 读取数据
import com.starrocks.connector.flink.StarRocksSource;
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
public class StarRocksSourceApp {
public static void main(String[] args) throws Exception {
StarRocksSourceOptions options = StarRocksSourceOptions.builder()
.withProperty("scan-url", "192.168.xxx.xxx:8030")
.withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("table-name", "score_board")
.withProperty("database-name", "test")
.build();
TableSchema tableSchema = TableSchema.builder()
.field("id", DataTypes.INT())
.field("name", DataTypes.STRING())
.field("score", DataTypes.INT())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();
env.execute("StarRocks flink source");
}
}
下一步
Flink 成功从 StarRocks 读取数据后,您可以使用 Flink WebUI 监控读取任务。例如,您可以在 WebUI 的 Metrics 页面上查看 totalScannedRows
指标,以获取成功读取的行数。您还可以使用 Flink SQL 对您已读取的数据执行诸如 join 之类的计算。