使用 Spark Connector 从 StarRocks 读取数据
StarRocks 提供了一个名为 StarRocks Connector for Apache Spark™ (简称 Spark Connector) 的自研连接器,帮助您使用 Spark 从 StarRocks 表中读取数据。 您可以使用 Spark 对从 StarRocks 读取的数据进行复杂的处理和机器学习。
Spark Connector 支持三种读取方法:Spark SQL、Spark DataFrame 和 Spark RDD。
您可以使用 Spark SQL 在 StarRocks 表上创建临时视图,然后使用该临时视图直接从 StarRocks 表中读取数据。
您还可以将 StarRocks 表映射到 Spark DataFrame 或 Spark RDD,然后从 Spark DataFrame 或 Spark RDD 中读取数据。 我们建议使用 Spark DataFrame。
注意
只有对 StarRocks 表具有 SELECT 权限的用户才能从此表中读取数据。 您可以按照 GRANT 中提供的说明授予用户权限。
使用说明
- 您可以在读取数据之前在 StarRocks 上过滤数据,从而减少传输的数据量。
- 如果读取数据的开销很大,您可以采用适当的表设计和过滤条件,以防止 Spark 一次读取过多的数据。 这样,您可以减少磁盘和网络连接上的 I/O 压力,从而确保可以正确运行例行查询。
版本要求
Spark Connector | Spark | StarRocks | Java | Scala |
---|---|---|---|---|
1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 及更高版本 | 8 | 2.12 |
1.1.1 | 3.2, 3.3, 3.4 | 2.5 及更高版本 | 8 | 2.12 |
1.1.0 | 3.2, 3.3, 3.4 | 2.5 及更高版本 | 8 | 2.12 |
1.0.0 | 3.x | 1.18 及更高版本 | 8 | 2.12 |
1.0.0 | 2.x | 1.18 及更高版本 | 8 | 2.11 |
注意
- 有关不同连接器版本的行为更改,请参阅升级 Spark Connector。
- 从 1.1.1 版本开始,连接器不再提供 MySQL JDBC 驱动程序,您需要手动将驱动程序导入 Spark 类路径。 您可以在 Maven Central 上找到该驱动程序。
- 在 1.0.0 版本中,Spark Connector 仅支持从 StarRocks 读取数据。 从 1.1.0 版本开始,Spark Connector 支持从 StarRocks 读取数据和向 StarRocks 写入数据。
- 1.0.0 版本在参数和数据类型映射方面与 1.1.0 版本不同。 请参阅升级 Spark Connector。
- 在一般情况下,不会向 1.0.0 版本添加新功能。 我们建议您尽快升级 Spark Connector。
获取 Spark Connector
使用以下方法之一来获取适合您业务需求的 Spark Connector .jar 包
- 下载已编译的包。
- 使用 Maven 添加 Spark Connector 所需的依赖项。 (此方法仅适用于 Spark Connector 1.1.0 及更高版本。)
- 手动编译一个包。
Spark Connector 1.1.0 及更高版本
Spark Connector .jar 包的命名格式如下
starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar
例如,如果您想将 Spark Connector 1.1.0 与 Spark 3.2 和 Scala 2.12 一起使用,您可以选择 starrocks-spark-connector-3.2_2.12-1.1.0.jar
。
注意
在正常情况下,最新的 Spark Connector 版本可以与最新的三个 Spark 版本一起使用。
下载已编译的包
您可以在 Maven Central Repository 获取各种版本的 Spark Connector .jar 包。
添加 Maven 依赖项
按如下方式配置 Spark Connector 所需的依赖项
注意
您必须将
spark_version
、scala_version
和connector_version
替换为您使用的 Spark 版本、Scala 版本和 Spark Connector 版本。
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency>
例如,如果您想将 Spark Connector 1.1.0 与 Spark 3.2 和 Scala 2.12 一起使用,请按如下方式配置依赖项
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
手动编译一个包
-
使用以下命令编译 Spark Connector
注意
您必须将
spark_version
替换为您使用的 Spark 版本。sh build.sh <spark_version>
例如,如果您想将 Spark Connector 与 Spark 3.2 一起使用,请按如下方式编译 Spark Connector
sh build.sh 3.2
-
转到
target/
路径,编译后会生成一个类似于starrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar
的 Spark Connector .jar 包。注意
如果您使用的 Spark Connector 版本未正式发布,则生成的 Spark Connector .jar 包的名称包含
SNAPSHOT
作为后缀。
Spark Connector 1.0.0
下载已编译的包
手动编译一个包
-
注意
您必须切换到
spark-1.0
。 -
执行以下操作之一来编译 Spark Connector
-
如果您使用的是 Spark 2.x,请运行以下命令,该命令默认编译 Spark Connector 以适应 Spark 2.3.4
sh build.sh 2
-
如果您使用的是 Spark 3.x,请运行以下命令,该命令默认编译 Spark Connector 以适应 Spark 3.1.2
sh build.sh 3
-
-
转到
output/
路径,编译后会生成starrocks-spark2_2.11-1.0.0.jar
文件。 然后,将该文件复制到 Spark 的类路径- 如果您的 Spark 集群在
Local
模式下运行,请将该文件放入jars/
路径。 - 如果您的 Spark 集群在
Yarn
模式下运行,请将该文件放入预部署包。
- 如果您的 Spark 集群在
只有将文件放入指定位置后,才能使用 Spark Connector 从 StarRocks 读取数据。
参数
本节介绍使用 Spark Connector 从 StarRocks 读取数据时需要配置的参数。
通用参数
以下参数适用于所有三种读取方法:Spark SQL、Spark DataFrame 和 Spark RDD。
参数 | 默认值 | 描述 |
---|---|---|
starrocks.fenodes | None (无) | StarRocks 集群中 FE 的 HTTP URL。 格式 <fe_host>:<fe_http_port> 。 您可以指定多个 URL,这些 URL 必须用逗号 (,) 分隔。 |
starrocks.table.identifier | None (无) | StarRocks 表的名称。 格式:<database_name>.<table_name> 。 |
starrocks.request.retries | 3 | Spark 可以重试向 StarRocks 发送读取请求的最大次数。 |
starrocks.request.connect.timeout.ms | 30000 | 发送到 StarRocks 的读取请求超时的最大时间量。 |
starrocks.request.read.timeout.ms | 30000 | 发送到 StarRocks 的请求的读取超时的最大时间量。 |
starrocks.request.query.timeout.s | 3600 | 从 StarRocks 查询数据的最大超时时间。 默认超时时间为 1 小时。 -1 表示未指定超时时间。 |
starrocks.request.tablet.size | Integer.MAX_VALUE | 分组到每个 Spark RDD 分区中的 StarRocks Tablet 的数量。 此参数的值越小,生成的 Spark RDD 分区就越多。 Spark RDD 分区越多,Spark 上的并行性越高,但 StarRocks 上的压力越大。 |
starrocks.batch.size | 4096 | 一次可以从 BE 读取的最大行数。 增加此参数的值可以减少 Spark 和 StarRocks 之间建立的连接数,从而减轻网络延迟造成的额外时间开销。 |
starrocks.exec.mem.limit | 2147483648 | 每个查询允许的最大内存量。 单位:字节。 默认内存限制为 2 GB。 |
starrocks.deserialize.arrow.async | false | 指定是否支持异步转换 Arrow 内存格式到 Spark Connector 迭代所需的 RowBatches。 |
starrocks.deserialize.queue.size | 64 | 保存用于异步转换 Arrow 内存格式到 RowBatches 的任务的内部队列的大小。 当 starrocks.deserialize.arrow.async 设置为 true 时,此参数有效。 |
starrocks.filter.query | None (无) | 您要在 StarRocks 上过滤数据所依据的条件。 您可以指定多个过滤条件,这些过滤条件必须由 and 连接。 StarRocks 会根据指定的过滤条件过滤 StarRocks 表中的数据,然后再由 Spark 读取数据。 |
starrocks.timezone | JVM 的默认时区 | 自 1.1.1 起支持。 用于将 StarRocks DATETIME 转换为 Spark TimestampType 的时区。 默认为 ZoneId#systemDefault() 返回的 JVM 时区。 格式可以是时区名称,例如 Asia/Shanghai ,也可以是区域偏移量,例如 +08:00 。 |
Spark SQL 和 Spark DataFrame 的参数
以下参数仅适用于 Spark SQL 和 Spark DataFrame 读取方法。
参数 | 默认值 | 描述 |
---|---|---|
starrocks.fe.http.url | None (无) | FE 的 HTTP IP 地址。 从 Spark Connector 1.1.0 起支持此参数。 此参数等效于 starrocks.fenodes 。 您只需要配置其中一个。 在 Spark Connector 1.1.0 及更高版本中,我们建议您使用 starrocks.fe.http.url ,因为 starrocks.fenodes 可能会被弃用。 |
starrocks.fe.jdbc.url | None (无) | 用于连接到 FE 的 MySQL 服务器的地址。 格式:jdbc:mysql://<fe_host>:<fe_query_port> 。注意 在 Spark Connector 1.1.0 及更高版本中,此参数是必需的。 |
user | None (无) | 您的 StarRocks 集群帐户的用户名。 该用户需要 StarRocks 表上的SELECT 权限。 |
starrocks.user | None (无) | 您的 StarRocks 集群帐户的用户名。 从 Spark Connector 1.1.0 起支持此参数。 此参数等效于 user 。 您只需要配置其中一个。 在 Spark Connector 1.1.0 及更高版本中,我们建议您使用 starrocks.user ,因为 user 可能会被弃用。 |
password | None (无) | 您的 StarRocks 集群帐户的密码。 |
starrocks.password | None (无) | 您的 StarRocks 集群帐户的密码。 从 Spark Connector 1.1.0 起支持此参数。 此参数等效于 password 。 您只需要配置其中一个。 在 Spark Connector 1.1.0 及更高版本中,我们建议您使用 starrocks.password ,因为 password 可能会被弃用。 |
starrocks.filter.query.in.max.count | 100 | 谓词下推期间 IN 表达式支持的最大值数。 如果 IN 表达式中指定的值数超过此限制,则 IN 表达式中指定的过滤条件将在 Spark 上处理。 |
Spark RDD 的参数
以下参数仅适用于 Spark RDD 读取方法。
参数 | 默认值 | 描述 |
---|---|---|
starrocks.request.auth.user | None (无) | 您的 StarRocks 集群帐户的用户名。 |
starrocks.request.auth.password | None (无) | 您的 StarRocks 集群帐户的密码。 |
starrocks.read.field | None (无) | 您要从中读取数据的 StarRocks 表列。 您可以指定多个列,这些列必须用逗号 (,) 分隔。 |
StarRocks 和 Spark 之间的数据类型映射
Spark Connector 1.1.0 及更高版本
StarRocks 数据类型 | Spark 数据类型 |
---|---|
BOOLEAN | DataTypes.BooleanType |
TINYINT | DataTypes.ByteType |
SMALLINT | DataTypes.ShortType |
INT | DataTypes.IntegerType |
BIGINT | DataTypes.LongType |
LARGEINT | DataTypes.StringType |
FLOAT | DataTypes.FloatType |
DOUBLE | DataTypes.DoubleType |
DECIMAL | DecimalType |
CHAR | DataTypes.StringType |
VARCHAR | DataTypes.StringType |
STRING | DataTypes.StringType |
DATE | DataTypes.DateType |
DATETIME | DataTypes.TimestampType |
JSON | DataTypes.StringType 注意 此数据类型映射自 Spark Connector v1.1.2 起支持,并且需要至少 2.5.13、3.0.3、3.1.0 或更高版本的 StarRocks。 |
ARRAY | 不支持的数据类型 |
HLL | 不支持的数据类型 |
BITMAP | 不支持的数据类型 |
Spark Connector 1.0.0
StarRocks 数据类型 | Spark 数据类型 |
---|---|
BOOLEAN | DataTypes.BooleanType |
TINYINT | DataTypes.ByteType |
SMALLINT | DataTypes.ShortType |
INT | DataTypes.IntegerType |
BIGINT | DataTypes.LongType |
LARGEINT | DataTypes.StringType |
FLOAT | DataTypes.FloatType |
DOUBLE | DataTypes.DoubleType |
DECIMAL | DecimalType |
CHAR | DataTypes.StringType |
VARCHAR | DataTypes.StringType |
DATE | DataTypes.StringType |
DATETIME | DataTypes.StringType |
ARRAY | 不支持的数据类型 |
HLL | 不支持的数据类型 |
BITMAP | 不支持的数据类型 |
当直接使用 DATE 和 DATETIME 数据类型时,StarRocks 使用的底层存储引擎的处理逻辑无法覆盖预期的时间范围。 因此,Spark Connector 将 StarRocks 中的 DATE 和 DATETIME 数据类型映射到 Spark 中的 STRING 数据类型,并生成与从 StarRocks 读取的日期和时间数据匹配的可读字符串文本。
升级 Spark Connector
从 1.0.0 版本升级到 1.1.0 版本
-
自 1.1.1 起,由于
mysql-connector-java
使用的 GPL 许可的限制,Spark Connector 不提供mysql-connector-java
,它是 MySQL 的官方 JDBC 驱动程序。 但是,Spark Connector 仍然需要mysql-connector-java
来连接到 StarRocks 以获取表元数据,因此您需要手动将驱动程序添加到 Spark 类路径。 您可以在 MySQL 站点 或 Maven Central 上找到该驱动程序。 -
在 1.1.0 版本中,Spark Connector 使用 JDBC 访问 StarRocks 以获取更详细的表信息。 因此,您必须配置
starrocks.fe.jdbc.url
。 -
在 1.1.0 版本中,某些参数已重命名。 旧参数和新参数现在都保留。 对于每对等效参数,您只需要配置其中一个,但我们建议您使用新参数,因为旧参数可能会被弃用。
starrocks.fenodes
重命名为starrocks.fe.http.url
。user
重命名为starrocks.user
。password
重命名为starrocks.password
。
-
在 1.1.0 版本中,某些数据类型的映射根据 Spark 3.x 进行了调整
- StarRocks 中的
DATE
映射到 Spark 中的DataTypes.DateType
(最初为DataTypes.StringType
)。 - StarRocks 中的
DATETIME
映射到 Spark 中的DataTypes.TimestampType
(最初为DataTypes.StringType
)。
- StarRocks 中的
示例
以下示例假设您已在 StarRocks 集群中创建了一个名为 test
的数据库,并且您具有用户 root
的权限。 示例中的参数设置基于 Spark Connector 1.1.0。
网络配置
确保 Spark 所在的机器可以通过 http_port
(默认: 8030
) 和 query_port
(默认: 9030
) 访问 StarRocks 集群的 FE 节点,并通过 be_port
(默认: 9060
) 访问 BE 节点。
数据示例
执行以下操作以准备示例表
-
转到
test
数据库并创建一个名为score_board
的表。MySQL [test]> CREATE TABLE `score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"replication_num" = "3"
); -
将数据插入到
score_board
表中。MySQL [test]> INSERT INTO score_board
VALUES
(1, 'Bob', 21),
(2, 'Stan', 21),
(3, 'Sam', 22),
(4, 'Tony', 22),
(5, 'Alice', 22),
(6, 'Lucy', 23),
(7, 'Polly', 23),
(8, 'Tom', 23),
(9, 'Rose', 24),
(10, 'Jerry', 24),
(11, 'Jason', 24),
(12, 'Lily', 25),
(13, 'Stephen', 25),
(14, 'David', 25),
(15, 'Eddie', 26),
(16, 'Kate', 27),
(17, 'Cathy', 27),
(18, 'Judy', 27),
(19, 'Julia', 28),
(20, 'Robert', 28),
(21, 'Jack', 29); -
查询
score_board
表。MySQL [test]> SELECT * FROM score_board;
+------+---------+-------+
| id | name | score |
+------+---------+-------+
| 1 | Bob | 21 |
| 2 | Stan | 21 |
| 3 | Sam | 22 |
| 4 | Tony | 22 |
| 5 | Alice | 22 |
| 6 | Lucy | 23 |
| 7 | Polly | 23 |
| 8 | Tom | 23 |
| 9 | Rose | 24 |
| 10 | Jerry | 24 |
| 11 | Jason | 24 |
| 12 | Lily | 25 |
| 13 | Stephen | 25 |
| 14 | David | 25 |
| 15 | Eddie | 26 |
| 16 | Kate | 27 |
| 17 | Cathy | 27 |
| 18 | Judy | 27 |
| 19 | Julia | 28 |
| 20 | Robert | 28 |
| 21 | Jack | 29 |
+------+---------+-------+
21 rows in set (0.01 sec)
使用 Spark SQL 读取数据
-
在 Spark 目录中运行以下命令以启动 Spark SQL
sh spark-sql
-
运行以下命令以在属于
test
数据库的score_board
表上创建一个名为spark_starrocks
的临时视图spark-sql> CREATE TEMPORARY VIEW spark_starrocks
USING starrocks
OPTIONS
(
"starrocks.table.identifier" = "test.score_board",
"starrocks.fe.http.url" = "<fe_host>:<fe_http_port>",
"starrocks.fe.jdbc.url" = "jdbc:mysql://<fe_host>:<fe_query_port>",
"starrocks.user" = "root",
"starrocks.password" = ""
); -
运行以下命令以从临时视图读取数据
spark-sql> SELECT * FROM spark_starrocks;
Spark 返回以下数据
1 Bob 21
2 Stan 21
3 Sam 22
4 Tony 22
5 Alice 22
6 Lucy 23
7 Polly 23
8 Tom 23
9 Rose 24
10 Jerry 24
11 Jason 24
12 Lily 25
13 Stephen 25
14 David 25
15 Eddie 26
16 Kate 27
17 Cathy 27
18 Judy 27
19 Julia 28
20 Robert 28
21 Jack 29
Time taken: 1.883 seconds, Fetched 21 row(s)
22/08/09 15:29:36 INFO thriftserver.SparkSQLCLIDriver: Time taken: 1.883 seconds, Fetched 21 row(s)
使用 Spark DataFrame 读取数据
-
在 Spark 目录中运行以下命令以启动 Spark Shell
sh spark-shell
-
运行以下命令以在属于
test
数据库的score_board
表上创建一个名为starrocksSparkDF
的 DataFramescala> val starrocksSparkDF = spark.read.format("starrocks")
.option("starrocks.table.identifier", s"test.score_board")
.option("starrocks.fe.http.url", s"<fe_host>:<fe_http_port>")
.option("starrocks.fe.jdbc.url", s"jdbc:mysql://<fe_host>:<fe_query_port>")
.option("starrocks.user", s"root")
.option("starrocks.password", s"")
.load() -
从 DataFrame 读取数据。 例如,如果要读取前 10 行,请运行以下命令
scala> starrocksSparkDF.show(10)
Spark 返回以下数据
+---+-----+-----+
| id| name|score|
+---+-----+-----+
| 1| Bob| 21|
| 2| Stan| 21|
| 3| Sam| 22|
| 4| Tony| 22|
| 5|Alice| 22|
| 6| Lucy| 23|
| 7|Polly| 23|
| 8| Tom| 23|
| 9| Rose| 24|
| 10|Jerry| 24|
+---+-----+-----+
only showing top 10 rows注意
默认情况下,如果您不指定要读取的行数,Spark 将返回前 20 行。
使用 Spark RDD 读取数据
-
在 Spark 目录中运行以下命令以启动 Spark Shell
sh spark-shell
-
运行以下命令以在属于
test
数据库的score_board
表上创建一个名为starrocksSparkRDD
的 RDD。scala> import com.starrocks.connector.spark._
scala> val starrocksSparkRDD = sc.starrocksRDD
(
tableIdentifier = Some("test.score_board"),
cfg = Some(Map(
"starrocks.fenodes" -> "<fe_host>:<fe_http_port>",
"starrocks.request.auth.user" -> "root",
"starrocks.request.auth.password" -> ""
))
) -
从 RDD 读取数据。 例如,如果要读取前 10 个元素,请运行以下命令
scala> starrocksSparkRDD.take(10)
Spark 返回以下数据
res0: Array[AnyRef] = Array([1, Bob, 21], [2, Stan, 21], [3, Sam, 22], [4, Tony, 22], [5, Alice, 22], [6, Lucy, 23], [7, Polly, 23], [8, Tom, 23], [9, Rose, 24], [10, Jerry, 24])
要读取整个 RDD,请运行以下命令
scala> starrocksSparkRDD.collect()
Spark 返回以下数据
res1: Array[AnyRef] = Array([1, Bob, 21], [2, Stan, 21], [3, Sam, 22], [4, Tony, 22], [5, Alice, 22], [6, Lucy, 23], [7, Polly, 23], [8, Tom, 23], [9, Rose, 24], [10, Jerry, 24], [11, Jason, 24], [12, Lily, 25], [13, Stephen, 25], [14, David, 25], [15, Eddie, 26], [16, Kate, 27], [17, Cathy, 27], [18, Judy, 27], [19, Julia, 28], [20, Robert, 28], [21, Jack, 29])
最佳实践
当您使用 Spark Connector 从 StarRocks 读取数据时,可以使用 starrocks.filter.query
参数指定过滤条件,Spark 根据该过滤条件修剪分区、存储桶和前缀索引,以降低数据拉取的成本。 本节以 Spark DataFrame 为例,说明如何实现此目的。
环境设置
组件 | 版本 |
---|---|
Spark | Spark 2.4.4 和 Scala 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_302) |
StarRocks | 2.2.0 |
Spark Connector | starrocks-spark2_2.11-1.0.0.jar |
数据示例
执行以下操作以准备示例表
-
转到
test
数据库并创建一个名为mytable
的表。MySQL [test]> CREATE TABLE `mytable`
(
`k` int(11) NULL COMMENT "bucket",
`b` int(11) NULL COMMENT "",
`dt` datetime NULL COMMENT "",
`v` int(11) NULL COMMENT ""
)
ENGINE=OLAP
DUPLICATE KEY(`k`,`b`, `dt`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(
PARTITION p202201 VALUES [('2022-01-01 00:00:00'), ('2022-02-01 00:00:00')),
PARTITION p202202 VALUES [('2022-02-01 00:00:00'), ('2022-03-01 00:00:00')),
PARTITION p202203 VALUES [('2022-03-01 00:00:00'), ('2022-04-01 00:00:00'))
)
DISTRIBUTED BY HASH(`k`)
PROPERTIES (
"replication_num" = "3"
); -
将数据插入到
mytable
。MySQL [test]> INSERT INTO mytable
VALUES
(1, 11, '2022-01-02 08:00:00', 111),
(2, 22, '2022-02-02 08:00:00', 222),
(3, 33, '2022-03-02 08:00:00', 333); -
查询
mytable
表。MySQL [test]> select * from mytable;
+------+------+---------------------+------+
| k | b | dt | v |
+------+------+---------------------+------+
| 1 | 11 | 2022-01-02 08:00:00 | 111 |
| 2 | 22 | 2022-02-02 08:00:00 | 222 |
| 3 | 33 | 2022-03-02 08:00:00 | 333 |
+------+------+---------------------+------+
3 rows in set (0.01 sec)
全表扫描
-
在 Spark 目录中运行以下命令,以在属于
test
数据库的mytable
表上创建一个名为df
的 DataFramescala> val df = spark.read.format("starrocks")
.option("starrocks.table.identifier", s"test.mytable")
.option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
.option("user", s"root")
.option("password", s"")
.load() -
查看 StarRocks 集群的 FE 日志文件 fe.log,并找到执行的用于读取数据的 SQL 语句。 示例
2022-08-09 18:57:38,091 INFO (nioEventLoopGroup-3-10|196) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable`] from external service [ user ['root'@'%']] for database [test] table [mytable]
-
在
test
数据库中,使用 EXPLAIN 获取 SELECTk
,b
,dt
,v
fromtest
.mytable
语句的执行计划MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable`;
+-----------------------------------------------------------------------+
| Explain String |
+-----------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
| PARTITION: UNPARTITIONED |
| |
| RESULT SINK |
| |
| 1:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 01 |
| UNPARTITIONED |
| |
| 0:OlapScanNode |
| TABLE: mytable |
| PREAGGREGATION: ON |
| partitions=3/3 |
| rollup: mytable |
| tabletRatio=9/9 |
| tabletList=41297,41299,41301,41303,41305,41307,41309,41311,41313 |
| cardinality=3 |
| avgRowSize=4.0 |
| numNodes=0 |
+-----------------------------------------------------------------------+
26 rows in set (0.00 sec)
在此示例中,未执行任何修剪。 因此,Spark 扫描保存数据的三个分区中的所有分区(如 partitions=3/3
所示),并扫描这三个分区中的所有 9 个 Tablet(如 tabletRatio=9/9
所示)。
分区修剪
-
运行以下命令,其中您使用
starrocks.filter.query
参数指定过滤条件dt='2022-01-02 08:00:00
用于分区修剪,在 Spark 目录中以在属于test
数据库的mytable
表上创建一个名为df
的 DataFramescala> val df = spark.read.format("starrocks")
.option("starrocks.table.identifier", s"test.mytable")
.option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
.option("user", s"root")
.option("password", s"")
.option("starrocks.filter.query", "dt='2022-01-02 08:00:00'")
.load() -
查看 StarRocks 集群的 FE 日志文件 fe.log,并找到执行的用于读取数据的 SQL 语句。 示例
2022-08-09 19:02:31,253 INFO (nioEventLoopGroup-3-14|204) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where dt='2022-01-02 08:00:00'] from external service [ user ['root'@'%']] for database [test] table [mytable]
-
在
test
数据库中,使用 EXPLAIN 获取 SELECTk
,b
,dt
,v
fromtest
.mytable
where dt='2022-01-02 08:00:00' 语句的执行计划MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where dt='2022-01-02 08:00:00';
+------------------------------------------------+
| Explain String |
+------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
| PARTITION: UNPARTITIONED |
| |
| RESULT SINK |
| |
| 1:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 01 |
| UNPARTITIONED |
| |
| 0:OlapScanNode |
| TABLE: mytable |
| PREAGGREGATION: ON |
| PREDICATES: 3: dt = '2022-01-02 08:00:00' |
| partitions=1/3 |
| rollup: mytable |
| tabletRatio=3/3 |
| tabletList=41297,41299,41301 |
| cardinality=1 |
| avgRowSize=20.0 |
| numNodes=0 |
+------------------------------------------------+
27 rows in set (0.01 sec)
在此示例中,仅执行分区修剪,而未执行存储桶修剪。 因此,Spark 扫描三个分区中的一个分区(如 partitions=1/3
所示)以及该分区中的所有 Tablet(如 tabletRatio=3/3
所示)。
存储桶修剪
-
运行以下命令,其中您使用
starrocks.filter.query
参数指定过滤条件k=1
用于存储桶修剪,在 Spark 目录中以在属于test
数据库的mytable
表上创建一个名为df
的 DataFramescala> val df = spark.read.format("starrocks")
.option("starrocks.table.identifier", s"test.mytable")
.option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
.option("user", s"root")
.option("password", s"")
.option("starrocks.filter.query", "k=1")
.load() -
查看 StarRocks 集群的 FE 日志文件 fe.log,并找到执行的用于读取数据的 SQL 语句。 示例
2022-08-09 19:04:44,479 INFO (nioEventLoopGroup-3-16|208) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=1] from external service [ user ['root'@'%']] for database [test] table [mytable]
-
在
test
数据库中,使用 EXPLAIN 获取 SELECTk
,b
,dt
,v
fromtest
.mytable
where k=1 语句的执行计划MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=1;
+------------------------------------------+
| Explain String |
+------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
| PARTITION: UNPARTITIONED |
| |
| RESULT SINK |
| |
| 1:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 01 |
| UNPARTITIONED |
| |
| 0:OlapScanNode |
| TABLE: mytable |
| PREAGGREGATION: ON |
| PREDICATES: 1: k = 1 |
| partitions=3/3 |
| rollup: mytable |
| tabletRatio=3/9 |
| tabletList=41299,41305,41311 |
| cardinality=1 |
| avgRowSize=20.0 |
| numNodes=0 |
+------------------------------------------+
27 rows in set (0.01 sec)
在此示例中,仅执行存储桶修剪,而未执行分区修剪。 因此,Spark 扫描保存数据的三个分区中的所有分区(如 partitions=3/3
所示),并扫描这三个分区中的所有三个 Tablet(如 tabletRatio=3/9
所示),以检索满足 k = 1
过滤条件的哈希值。
分区修剪和存储桶修剪
-
运行以下命令,其中您使用
starrocks.filter.query
参数指定两个过滤条件k=7
和dt='2022-01-02 08:00:00'
用于存储桶修剪和分区修剪,在 Spark 目录中以在test
数据库的mytable
表上创建一个名为df
的 DataFramescala> val df = spark.read.format("starrocks")
.option("starrocks.table.identifier", s"test.mytable")
.option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
.option("user", s"")
.option("password", s"")
.option("starrocks.filter.query", "k=7 and dt='2022-01-02 08:00:00'")
.load() -
查看 StarRocks 集群的 FE 日志文件 fe.log,并找到执行的用于读取数据的 SQL 语句。 示例
2022-08-09 19:06:34,939 INFO (nioEventLoopGroup-3-18|212) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=7 and dt='2022-01-02 08:00:00'] from external service [ user ['root'@'%']] for database [test] t
able [mytable] -
在
test
数据库中,使用 EXPLAIN 获取 SELECTk
,b
,dt
,v
fromtest
.mytable
where k=7 and dt='2022-01-02 08:00:00' 语句的执行计划MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=7 and dt='2022-01-02 08:00:00';
+----------------------------------------------------------+
| Explain String |
+----------------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 0:OlapScanNode |
| TABLE: mytable |
| PREAGGREGATION: ON |
| PREDICATES: 1: k = 7, 3: dt = '2022-01-02 08:00:00' |
| partitions=1/3 |
| rollup: mytable |
| tabletRatio=1/3 |
| tabletList=41301 |
| cardinality=1 |
| avgRowSize=20.0 |
| numNodes=0 |
+----------------------------------------------------------+
17 rows in set (0.00 sec)
在此示例中,执行分区修剪和存储桶修剪。 因此,Spark 仅扫描三个分区中的一个分区(如 partitions=1/3
所示)以及该分区中的一个 Tablet(如 tabletRatio=1/3
所示)。
前缀索引过滤
-
将更多数据记录插入到属于
test
数据库的mytable
表的分区中MySQL [test]> INSERT INTO mytable
VALUES
(1, 11, "2022-01-02 08:00:00", 111),
(3, 33, "2022-01-02 08:00:00", 333),
(3, 33, "2022-01-02 08:00:00", 333),
(3, 33, "2022-01-02 08:00:00", 333); -
查询
mytable
表MySQL [test]> SELECT * FROM mytable;
+------+------+---------------------+------+
| k | b | dt | v |
+------+------+---------------------+------+
| 1 | 11 | 2022-01-02 08:00:00 | 111 |
| 3 | 33 | 2022-01-02 08:00:00 | 333 |
| 3 | 33 | 2022-01-02 08:00:00 | 333 |
| 3 | 33 | 2022-01-02 08:00:00 | 333 |
+------+------+---------------------+------+
4 rows in set (0.01 sec) -
运行以下命令,其中您使用
starrocks.filter.query
参数指定过滤条件k=1
用于前缀索引过滤,在 Spark 目录中以在属于test
数据库的mytable
表上创建一个名为df
的 DataFramescala> val df = spark.read.format("starrocks")
.option("starrocks.table.identifier", s"test.mytable")
.option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
.option("user", s"root")
.option("password", s"")
.option("starrocks.filter.query", "k=1")
.load() -
在
test
数据库中,设置is_report_success
为true
以启用配置文件报告MySQL [test]> SET is_report_success = true;
Query OK, 0 rows affected (0.00 sec) -
使用浏览器打开
http://<fe_host>:<http_http_port>/query
页面,并查看 SELECT * FROM mytable where k=1 语句的配置文件。 示例OLAP_SCAN (plan_node_id=0):
CommonMetrics:
- CloseTime: 1.255ms
- OperatorTotalTime: 1.404ms
- PeakMemoryUsage: 0.00
- PullChunkNum: 8
- PullRowNum: 2
- __MAX_OF_PullRowNum: 2
- __MIN_OF_PullRowNum: 0
- PullTotalTime: 148.60us
- PushChunkNum: 0
- PushRowNum: 0
- PushTotalTime: 0ns
- SetFinishedTime: 136ns
- SetFinishingTime: 129ns
UniqueMetrics:
- Predicates: 1: k = 1
- Rollup: mytable
- Table: mytable
- BytesRead: 88.00 B
- __MAX_OF_BytesRead: 88.00 B
- __MIN_OF_BytesRead: 0.00
- CachedPagesNum: 0
- CompressedBytesRead: 844.00 B
- __MAX_OF_CompressedBytesRead: 844.00 B
- __MIN_OF_CompressedBytesRead: 0.00
- CreateSegmentIter: 18.582us
- IOTime: 4.425us
- LateMaterialize: 17.385us
- PushdownPredicates: 3
- RawRowsRead: 2
- __MAX_OF_RawRowsRead: 2
- __MIN_OF_RawRowsRead: 0
- ReadPagesNum: 12
- __MAX_OF_ReadPagesNum: 12
- __MIN_OF_ReadPagesNum: 0
- RowsRead: 2
- __MAX_OF_RowsRead: 2
- __MIN_OF_RowsRead: 0
- ScanTime: 154.367us
- SegmentInit: 95.903us
- BitmapIndexFilter: 0ns
- BitmapIndexFilterRows: 0
- BloomFilterFilterRows: 0
- ShortKeyFilterRows: 3
- __MAX_OF_ShortKeyFilterRows: 3
- __MIN_OF_ShortKeyFilterRows: 0
- ZoneMapIndexFilterRows: 0
- SegmentRead: 2.559us
- BlockFetch: 2.187us
- BlockFetchCount: 2
- __MAX_OF_BlockFetchCount: 2
- __MIN_OF_BlockFetchCount: 0
- BlockSeek: 7.789us
- BlockSeekCount: 2
- __MAX_OF_BlockSeekCount: 2
- __MIN_OF_BlockSeekCount: 0
- ChunkCopy: 25ns
- DecompressT: 0ns
- DelVecFilterRows: 0
- IndexLoad: 0ns
- PredFilter: 353ns
- PredFilterRows: 0
- RowsetsReadCount: 7
- SegmentsReadCount: 3
- __MAX_OF_SegmentsReadCount: 2
- __MIN_OF_SegmentsReadCount: 0
- TotalColumnsDataPageCount: 8
- __MAX_OF_TotalColumnsDataPageCount: 8
- __MIN_OF_TotalColumnsDataPageCount: 0
- UncompressedBytesRead: 508.00 B
- __MAX_OF_UncompressedBytesRead: 508.00 B
- __MIN_OF_UncompressedBytesRead: 0.00
在此示例中,过滤条件 k = 1
可以命中前缀索引。 因此,Spark 可以过滤掉三行(如 ShortKeyFilterRows: 3
所示)。