跳到主要内容
版本: 最新版本-3.5

使用 Spark Connector 从 StarRocks 读取数据

StarRocks 提供了一个名为 StarRocks Connector for Apache Spark™ (简称 Spark Connector) 的自研连接器,帮助您使用 Spark 从 StarRocks 表中读取数据。 您可以使用 Spark 对从 StarRocks 读取的数据进行复杂的处理和机器学习。

Spark Connector 支持三种读取方法:Spark SQL、Spark DataFrame 和 Spark RDD。

您可以使用 Spark SQL 在 StarRocks 表上创建临时视图,然后使用该临时视图直接从 StarRocks 表中读取数据。

您还可以将 StarRocks 表映射到 Spark DataFrame 或 Spark RDD,然后从 Spark DataFrame 或 Spark RDD 中读取数据。 我们建议使用 Spark DataFrame。

注意

只有对 StarRocks 表具有 SELECT 权限的用户才能从此表中读取数据。 您可以按照 GRANT 中提供的说明授予用户权限。

使用说明

  • 您可以在读取数据之前在 StarRocks 上过滤数据,从而减少传输的数据量。
  • 如果读取数据的开销很大,您可以采用适当的表设计和过滤条件,以防止 Spark 一次读取过多的数据。 这样,您可以减少磁盘和网络连接上的 I/O 压力,从而确保可以正确运行例行查询。

版本要求

Spark ConnectorSparkStarRocksJavaScala
1.1.23.2, 3.3, 3.4, 3.52.5 及更高版本82.12
1.1.13.2, 3.3, 3.42.5 及更高版本82.12
1.1.03.2, 3.3, 3.42.5 及更高版本82.12
1.0.03.x1.18 及更高版本82.12
1.0.02.x1.18 及更高版本82.11

注意

  • 有关不同连接器版本的行为更改,请参阅升级 Spark Connector
  • 从 1.1.1 版本开始,连接器不再提供 MySQL JDBC 驱动程序,您需要手动将驱动程序导入 Spark 类路径。 您可以在 Maven Central 上找到该驱动程序。
  • 在 1.0.0 版本中,Spark Connector 仅支持从 StarRocks 读取数据。 从 1.1.0 版本开始,Spark Connector 支持从 StarRocks 读取数据和向 StarRocks 写入数据。
  • 1.0.0 版本在参数和数据类型映射方面与 1.1.0 版本不同。 请参阅升级 Spark Connector
  • 在一般情况下,不会向 1.0.0 版本添加新功能。 我们建议您尽快升级 Spark Connector。

获取 Spark Connector

使用以下方法之一来获取适合您业务需求的 Spark Connector .jar

  • 下载已编译的包。
  • 使用 Maven 添加 Spark Connector 所需的依赖项。 (此方法仅适用于 Spark Connector 1.1.0 及更高版本。)
  • 手动编译一个包。

Spark Connector 1.1.0 及更高版本

Spark Connector .jar 包的命名格式如下

starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar

例如,如果您想将 Spark Connector 1.1.0 与 Spark 3.2 和 Scala 2.12 一起使用,您可以选择 starrocks-spark-connector-3.2_2.12-1.1.0.jar

注意

在正常情况下,最新的 Spark Connector 版本可以与最新的三个 Spark 版本一起使用。

下载已编译的包

您可以在 Maven Central Repository 获取各种版本的 Spark Connector .jar 包。

添加 Maven 依赖项

按如下方式配置 Spark Connector 所需的依赖项

注意

您必须将 spark_versionscala_versionconnector_version 替换为您使用的 Spark 版本、Scala 版本和 Spark Connector 版本。

<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency>

例如,如果您想将 Spark Connector 1.1.0 与 Spark 3.2 和 Scala 2.12 一起使用,请按如下方式配置依赖项

<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>

手动编译一个包

  1. 下载 Spark Connector 代码

  2. 使用以下命令编译 Spark Connector

    注意

    您必须将 spark_version 替换为您使用的 Spark 版本。

    sh build.sh <spark_version>

    例如,如果您想将 Spark Connector 与 Spark 3.2 一起使用,请按如下方式编译 Spark Connector

    sh build.sh 3.2
  3. 转到 target/ 路径,编译后会生成一个类似于 starrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar 的 Spark Connector .jar 包。

    注意

    如果您使用的 Spark Connector 版本未正式发布,则生成的 Spark Connector .jar 包的名称包含 SNAPSHOT 作为后缀。

Spark Connector 1.0.0

下载已编译的包

手动编译一个包

  1. 下载 Spark Connector 代码

    注意

    您必须切换到 spark-1.0

  2. 执行以下操作之一来编译 Spark Connector

    • 如果您使用的是 Spark 2.x,请运行以下命令,该命令默认编译 Spark Connector 以适应 Spark 2.3.4

      sh build.sh 2
    • 如果您使用的是 Spark 3.x,请运行以下命令,该命令默认编译 Spark Connector 以适应 Spark 3.1.2

      sh build.sh 3
  3. 转到 output/ 路径,编译后会生成 starrocks-spark2_2.11-1.0.0.jar 文件。 然后,将该文件复制到 Spark 的类路径

    • 如果您的 Spark 集群在 Local 模式下运行,请将该文件放入 jars/ 路径。
    • 如果您的 Spark 集群在 Yarn 模式下运行,请将该文件放入预部署包。

只有将文件放入指定位置后,才能使用 Spark Connector 从 StarRocks 读取数据。

参数

本节介绍使用 Spark Connector 从 StarRocks 读取数据时需要配置的参数。

通用参数

以下参数适用于所有三种读取方法:Spark SQL、Spark DataFrame 和 Spark RDD。

参数默认值描述
starrocks.fenodesNone (无)StarRocks 集群中 FE 的 HTTP URL。 格式 <fe_host>:<fe_http_port>。 您可以指定多个 URL,这些 URL 必须用逗号 (,) 分隔。
starrocks.table.identifierNone (无)StarRocks 表的名称。 格式:<database_name>.<table_name>
starrocks.request.retries3Spark 可以重试向 StarRocks 发送读取请求的最大次数。
starrocks.request.connect.timeout.ms30000发送到 StarRocks 的读取请求超时的最大时间量。
starrocks.request.read.timeout.ms30000发送到 StarRocks 的请求的读取超时的最大时间量。
starrocks.request.query.timeout.s3600从 StarRocks 查询数据的最大超时时间。 默认超时时间为 1 小时。 -1 表示未指定超时时间。
starrocks.request.tablet.sizeInteger.MAX_VALUE分组到每个 Spark RDD 分区中的 StarRocks Tablet 的数量。 此参数的值越小,生成的 Spark RDD 分区就越多。 Spark RDD 分区越多,Spark 上的并行性越高,但 StarRocks 上的压力越大。
starrocks.batch.size4096一次可以从 BE 读取的最大行数。 增加此参数的值可以减少 Spark 和 StarRocks 之间建立的连接数,从而减轻网络延迟造成的额外时间开销。
starrocks.exec.mem.limit2147483648每个查询允许的最大内存量。 单位:字节。 默认内存限制为 2 GB。
starrocks.deserialize.arrow.asyncfalse指定是否支持异步转换 Arrow 内存格式到 Spark Connector 迭代所需的 RowBatches。
starrocks.deserialize.queue.size64保存用于异步转换 Arrow 内存格式到 RowBatches 的任务的内部队列的大小。 当 starrocks.deserialize.arrow.async 设置为 true 时,此参数有效。
starrocks.filter.queryNone (无)您要在 StarRocks 上过滤数据所依据的条件。 您可以指定多个过滤条件,这些过滤条件必须由 and 连接。 StarRocks 会根据指定的过滤条件过滤 StarRocks 表中的数据,然后再由 Spark 读取数据。
starrocks.timezoneJVM 的默认时区自 1.1.1 起支持。 用于将 StarRocks DATETIME 转换为 Spark TimestampType 的时区。 默认为 ZoneId#systemDefault() 返回的 JVM 时区。 格式可以是时区名称,例如 Asia/Shanghai,也可以是区域偏移量,例如 +08:00

Spark SQL 和 Spark DataFrame 的参数

以下参数仅适用于 Spark SQL 和 Spark DataFrame 读取方法。

参数默认值描述
starrocks.fe.http.urlNone (无)FE 的 HTTP IP 地址。 从 Spark Connector 1.1.0 起支持此参数。 此参数等效于 starrocks.fenodes。 您只需要配置其中一个。 在 Spark Connector 1.1.0 及更高版本中,我们建议您使用 starrocks.fe.http.url,因为 starrocks.fenodes 可能会被弃用。
starrocks.fe.jdbc.urlNone (无)用于连接到 FE 的 MySQL 服务器的地址。 格式:jdbc:mysql://<fe_host>:<fe_query_port>
注意
在 Spark Connector 1.1.0 及更高版本中,此参数是必需的。
userNone (无)您的 StarRocks 集群帐户的用户名。 该用户需要 StarRocks 表上的SELECT 权限
starrocks.userNone (无)您的 StarRocks 集群帐户的用户名。 从 Spark Connector 1.1.0 起支持此参数。 此参数等效于 user。 您只需要配置其中一个。 在 Spark Connector 1.1.0 及更高版本中,我们建议您使用 starrocks.user,因为 user 可能会被弃用。
passwordNone (无)您的 StarRocks 集群帐户的密码。
starrocks.passwordNone (无)您的 StarRocks 集群帐户的密码。 从 Spark Connector 1.1.0 起支持此参数。 此参数等效于 password。 您只需要配置其中一个。 在 Spark Connector 1.1.0 及更高版本中,我们建议您使用 starrocks.password,因为 password 可能会被弃用。
starrocks.filter.query.in.max.count100谓词下推期间 IN 表达式支持的最大值数。 如果 IN 表达式中指定的值数超过此限制,则 IN 表达式中指定的过滤条件将在 Spark 上处理。

Spark RDD 的参数

以下参数仅适用于 Spark RDD 读取方法。

参数默认值描述
starrocks.request.auth.userNone (无)您的 StarRocks 集群帐户的用户名。
starrocks.request.auth.passwordNone (无)您的 StarRocks 集群帐户的密码。
starrocks.read.fieldNone (无)您要从中读取数据的 StarRocks 表列。 您可以指定多个列,这些列必须用逗号 (,) 分隔。

StarRocks 和 Spark 之间的数据类型映射

Spark Connector 1.1.0 及更高版本

StarRocks 数据类型Spark 数据类型
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
LARGEINTDataTypes.StringType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DECIMALDecimalType
CHARDataTypes.StringType
VARCHARDataTypes.StringType
STRINGDataTypes.StringType
DATEDataTypes.DateType
DATETIMEDataTypes.TimestampType
JSONDataTypes.StringType
注意
此数据类型映射自 Spark Connector v1.1.2 起支持,并且需要至少 2.5.13、3.0.3、3.1.0 或更高版本的 StarRocks。
ARRAY不支持的数据类型
HLL不支持的数据类型
BITMAP不支持的数据类型

Spark Connector 1.0.0

StarRocks 数据类型Spark 数据类型
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
LARGEINTDataTypes.StringType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DECIMALDecimalType
CHARDataTypes.StringType
VARCHARDataTypes.StringType
DATEDataTypes.StringType
DATETIMEDataTypes.StringType
ARRAY不支持的数据类型
HLL不支持的数据类型
BITMAP不支持的数据类型

当直接使用 DATE 和 DATETIME 数据类型时,StarRocks 使用的底层存储引擎的处理逻辑无法覆盖预期的时间范围。 因此,Spark Connector 将 StarRocks 中的 DATE 和 DATETIME 数据类型映射到 Spark 中的 STRING 数据类型,并生成与从 StarRocks 读取的日期和时间数据匹配的可读字符串文本。

升级 Spark Connector

从 1.0.0 版本升级到 1.1.0 版本

  • 自 1.1.1 起,由于 mysql-connector-java 使用的 GPL 许可的限制,Spark Connector 不提供 mysql-connector-java,它是 MySQL 的官方 JDBC 驱动程序。 但是,Spark Connector 仍然需要 mysql-connector-java 来连接到 StarRocks 以获取表元数据,因此您需要手动将驱动程序添加到 Spark 类路径。 您可以在 MySQL 站点Maven Central 上找到该驱动程序。

  • 在 1.1.0 版本中,Spark Connector 使用 JDBC 访问 StarRocks 以获取更详细的表信息。 因此,您必须配置 starrocks.fe.jdbc.url

  • 在 1.1.0 版本中,某些参数已重命名。 旧参数和新参数现在都保留。 对于每对等效参数,您只需要配置其中一个,但我们建议您使用新参数,因为旧参数可能会被弃用。

    • starrocks.fenodes 重命名为 starrocks.fe.http.url
    • user 重命名为 starrocks.user
    • password 重命名为 starrocks.password
  • 在 1.1.0 版本中,某些数据类型的映射根据 Spark 3.x 进行了调整

    • StarRocks 中的 DATE 映射到 Spark 中的 DataTypes.DateType(最初为 DataTypes.StringType)。
    • StarRocks 中的 DATETIME 映射到 Spark 中的 DataTypes.TimestampType(最初为 DataTypes.StringType)。

示例

以下示例假设您已在 StarRocks 集群中创建了一个名为 test 的数据库,并且您具有用户 root 的权限。 示例中的参数设置基于 Spark Connector 1.1.0。

网络配置

确保 Spark 所在的机器可以通过 http_port (默认: 8030) 和 query_port (默认: 9030) 访问 StarRocks 集群的 FE 节点,并通过 be_port (默认: 9060) 访问 BE 节点。

数据示例

执行以下操作以准备示例表

  1. 转到 test 数据库并创建一个名为 score_board 的表。

    MySQL [test]> CREATE TABLE `score_board`
    (
    `id` int(11) NOT NULL COMMENT "",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`)
    PROPERTIES (
    "replication_num" = "3"
    );
  2. 将数据插入到 score_board 表中。

    MySQL [test]> INSERT INTO score_board
    VALUES
    (1, 'Bob', 21),
    (2, 'Stan', 21),
    (3, 'Sam', 22),
    (4, 'Tony', 22),
    (5, 'Alice', 22),
    (6, 'Lucy', 23),
    (7, 'Polly', 23),
    (8, 'Tom', 23),
    (9, 'Rose', 24),
    (10, 'Jerry', 24),
    (11, 'Jason', 24),
    (12, 'Lily', 25),
    (13, 'Stephen', 25),
    (14, 'David', 25),
    (15, 'Eddie', 26),
    (16, 'Kate', 27),
    (17, 'Cathy', 27),
    (18, 'Judy', 27),
    (19, 'Julia', 28),
    (20, 'Robert', 28),
    (21, 'Jack', 29);
  3. 查询 score_board 表。

    MySQL [test]> SELECT * FROM score_board;
    +------+---------+-------+
    | id | name | score |
    +------+---------+-------+
    | 1 | Bob | 21 |
    | 2 | Stan | 21 |
    | 3 | Sam | 22 |
    | 4 | Tony | 22 |
    | 5 | Alice | 22 |
    | 6 | Lucy | 23 |
    | 7 | Polly | 23 |
    | 8 | Tom | 23 |
    | 9 | Rose | 24 |
    | 10 | Jerry | 24 |
    | 11 | Jason | 24 |
    | 12 | Lily | 25 |
    | 13 | Stephen | 25 |
    | 14 | David | 25 |
    | 15 | Eddie | 26 |
    | 16 | Kate | 27 |
    | 17 | Cathy | 27 |
    | 18 | Judy | 27 |
    | 19 | Julia | 28 |
    | 20 | Robert | 28 |
    | 21 | Jack | 29 |
    +------+---------+-------+
    21 rows in set (0.01 sec)

使用 Spark SQL 读取数据

  1. 在 Spark 目录中运行以下命令以启动 Spark SQL

    sh spark-sql
  2. 运行以下命令以在属于 test 数据库的 score_board 表上创建一个名为 spark_starrocks 的临时视图

    spark-sql> CREATE TEMPORARY VIEW spark_starrocks
    USING starrocks
    OPTIONS
    (
    "starrocks.table.identifier" = "test.score_board",
    "starrocks.fe.http.url" = "<fe_host>:<fe_http_port>",
    "starrocks.fe.jdbc.url" = "jdbc:mysql://<fe_host>:<fe_query_port>",
    "starrocks.user" = "root",
    "starrocks.password" = ""
    );
  3. 运行以下命令以从临时视图读取数据

    spark-sql> SELECT * FROM spark_starrocks;

    Spark 返回以下数据

    1        Bob        21
    2 Stan 21
    3 Sam 22
    4 Tony 22
    5 Alice 22
    6 Lucy 23
    7 Polly 23
    8 Tom 23
    9 Rose 24
    10 Jerry 24
    11 Jason 24
    12 Lily 25
    13 Stephen 25
    14 David 25
    15 Eddie 26
    16 Kate 27
    17 Cathy 27
    18 Judy 27
    19 Julia 28
    20 Robert 28
    21 Jack 29
    Time taken: 1.883 seconds, Fetched 21 row(s)
    22/08/09 15:29:36 INFO thriftserver.SparkSQLCLIDriver: Time taken: 1.883 seconds, Fetched 21 row(s)

使用 Spark DataFrame 读取数据

  1. 在 Spark 目录中运行以下命令以启动 Spark Shell

    sh spark-shell
  2. 运行以下命令以在属于 test 数据库的 score_board 表上创建一个名为 starrocksSparkDF 的 DataFrame

    scala> val starrocksSparkDF = spark.read.format("starrocks")
    .option("starrocks.table.identifier", s"test.score_board")
    .option("starrocks.fe.http.url", s"<fe_host>:<fe_http_port>")
    .option("starrocks.fe.jdbc.url", s"jdbc:mysql://<fe_host>:<fe_query_port>")
    .option("starrocks.user", s"root")
    .option("starrocks.password", s"")
    .load()
  3. 从 DataFrame 读取数据。 例如,如果要读取前 10 行,请运行以下命令

    scala> starrocksSparkDF.show(10)

    Spark 返回以下数据

    +---+-----+-----+
    | id| name|score|
    +---+-----+-----+
    | 1| Bob| 21|
    | 2| Stan| 21|
    | 3| Sam| 22|
    | 4| Tony| 22|
    | 5|Alice| 22|
    | 6| Lucy| 23|
    | 7|Polly| 23|
    | 8| Tom| 23|
    | 9| Rose| 24|
    | 10|Jerry| 24|
    +---+-----+-----+
    only showing top 10 rows

    注意

    默认情况下,如果您不指定要读取的行数,Spark 将返回前 20 行。

使用 Spark RDD 读取数据

  1. 在 Spark 目录中运行以下命令以启动 Spark Shell

    sh spark-shell
  2. 运行以下命令以在属于 test 数据库的 score_board 表上创建一个名为 starrocksSparkRDD 的 RDD。

    scala> import com.starrocks.connector.spark._
    scala> val starrocksSparkRDD = sc.starrocksRDD
    (
    tableIdentifier = Some("test.score_board"),
    cfg = Some(Map(
    "starrocks.fenodes" -> "<fe_host>:<fe_http_port>",
    "starrocks.request.auth.user" -> "root",
    "starrocks.request.auth.password" -> ""
    ))
    )
  3. 从 RDD 读取数据。 例如,如果要读取前 10 个元素,请运行以下命令

    scala> starrocksSparkRDD.take(10)

    Spark 返回以下数据

    res0: Array[AnyRef] = Array([1, Bob, 21], [2, Stan, 21], [3, Sam, 22], [4, Tony, 22], [5, Alice, 22], [6, Lucy, 23], [7, Polly, 23], [8, Tom, 23], [9, Rose, 24], [10, Jerry, 24])

    要读取整个 RDD,请运行以下命令

    scala> starrocksSparkRDD.collect()

    Spark 返回以下数据

    res1: Array[AnyRef] = Array([1, Bob, 21], [2, Stan, 21], [3, Sam, 22], [4, Tony, 22], [5, Alice, 22], [6, Lucy, 23], [7, Polly, 23], [8, Tom, 23], [9, Rose, 24], [10, Jerry, 24], [11, Jason, 24], [12, Lily, 25], [13, Stephen, 25], [14, David, 25], [15, Eddie, 26], [16, Kate, 27], [17, Cathy, 27], [18, Judy, 27], [19, Julia, 28], [20, Robert, 28], [21, Jack, 29])

最佳实践

当您使用 Spark Connector 从 StarRocks 读取数据时,可以使用 starrocks.filter.query 参数指定过滤条件,Spark 根据该过滤条件修剪分区、存储桶和前缀索引,以降低数据拉取的成本。 本节以 Spark DataFrame 为例,说明如何实现此目的。

环境设置

组件版本
SparkSpark 2.4.4 和 Scala 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_302)
StarRocks2.2.0
Spark Connectorstarrocks-spark2_2.11-1.0.0.jar

数据示例

执行以下操作以准备示例表

  1. 转到 test 数据库并创建一个名为 mytable 的表。

    MySQL [test]> CREATE TABLE `mytable`
    (
    `k` int(11) NULL COMMENT "bucket",
    `b` int(11) NULL COMMENT "",
    `dt` datetime NULL COMMENT "",
    `v` int(11) NULL COMMENT ""
    )
    ENGINE=OLAP
    DUPLICATE KEY(`k`,`b`, `dt`)
    COMMENT "OLAP"
    PARTITION BY RANGE(`dt`)
    (
    PARTITION p202201 VALUES [('2022-01-01 00:00:00'), ('2022-02-01 00:00:00')),
    PARTITION p202202 VALUES [('2022-02-01 00:00:00'), ('2022-03-01 00:00:00')),
    PARTITION p202203 VALUES [('2022-03-01 00:00:00'), ('2022-04-01 00:00:00'))
    )
    DISTRIBUTED BY HASH(`k`)
    PROPERTIES (
    "replication_num" = "3"
    );
  2. 将数据插入到 mytable

    MySQL [test]> INSERT INTO mytable
    VALUES
    (1, 11, '2022-01-02 08:00:00', 111),
    (2, 22, '2022-02-02 08:00:00', 222),
    (3, 33, '2022-03-02 08:00:00', 333);
  3. 查询 mytable 表。

    MySQL [test]> select * from mytable;
    +------+------+---------------------+------+
    | k | b | dt | v |
    +------+------+---------------------+------+
    | 1 | 11 | 2022-01-02 08:00:00 | 111 |
    | 2 | 22 | 2022-02-02 08:00:00 | 222 |
    | 3 | 33 | 2022-03-02 08:00:00 | 333 |
    +------+------+---------------------+------+
    3 rows in set (0.01 sec)

全表扫描

  1. 在 Spark 目录中运行以下命令,以在属于 test 数据库的 mytable 表上创建一个名为 df 的 DataFrame

    scala>  val df = spark.read.format("starrocks")
    .option("starrocks.table.identifier", s"test.mytable")
    .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"root")
    .option("password", s"")
    .load()
  2. 查看 StarRocks 集群的 FE 日志文件 fe.log,并找到执行的用于读取数据的 SQL 语句。 示例

    2022-08-09 18:57:38,091 INFO (nioEventLoopGroup-3-10|196) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable`] from external service [ user ['root'@'%']] for database [test] table [mytable]
  3. test 数据库中,使用 EXPLAIN 获取 SELECT k,b,dt,v from test.mytable 语句的执行计划

    MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable`;
    +-----------------------------------------------------------------------+
    | Explain String |
    +-----------------------------------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
    | PARTITION: UNPARTITIONED |
    | |
    | RESULT SINK |
    | |
    | 1:EXCHANGE |
    | |
    | PLAN FRAGMENT 1 |
    | OUTPUT EXPRS: |
    | PARTITION: RANDOM |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 01 |
    | UNPARTITIONED |
    | |
    | 0:OlapScanNode |
    | TABLE: mytable |
    | PREAGGREGATION: ON |
    | partitions=3/3 |
    | rollup: mytable |
    | tabletRatio=9/9 |
    | tabletList=41297,41299,41301,41303,41305,41307,41309,41311,41313 |
    | cardinality=3 |
    | avgRowSize=4.0 |
    | numNodes=0 |
    +-----------------------------------------------------------------------+
    26 rows in set (0.00 sec)

在此示例中,未执行任何修剪。 因此,Spark 扫描保存数据的三个分区中的所有分区(如 partitions=3/3 所示),并扫描这三个分区中的所有 9 个 Tablet(如 tabletRatio=9/9 所示)。

分区修剪

  1. 运行以下命令,其中您使用 starrocks.filter.query 参数指定过滤条件 dt='2022-01-02 08:00:00 用于分区修剪,在 Spark 目录中以在属于 test 数据库的 mytable 表上创建一个名为 df 的 DataFrame

    scala> val df = spark.read.format("starrocks")
    .option("starrocks.table.identifier", s"test.mytable")
    .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"root")
    .option("password", s"")
    .option("starrocks.filter.query", "dt='2022-01-02 08:00:00'")
    .load()
  2. 查看 StarRocks 集群的 FE 日志文件 fe.log,并找到执行的用于读取数据的 SQL 语句。 示例

    2022-08-09 19:02:31,253 INFO (nioEventLoopGroup-3-14|204) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where dt='2022-01-02 08:00:00'] from external service [ user ['root'@'%']] for database [test] table [mytable]
  3. test 数据库中,使用 EXPLAIN 获取 SELECT k,b,dt,v from test.mytable where dt='2022-01-02 08:00:00' 语句的执行计划

    MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where dt='2022-01-02 08:00:00';
    +------------------------------------------------+
    | Explain String |
    +------------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
    | PARTITION: UNPARTITIONED |
    | |
    | RESULT SINK |
    | |
    | 1:EXCHANGE |
    | |
    | PLAN FRAGMENT 1 |
    | OUTPUT EXPRS: |
    | PARTITION: RANDOM |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 01 |
    | UNPARTITIONED |
    | |
    | 0:OlapScanNode |
    | TABLE: mytable |
    | PREAGGREGATION: ON |
    | PREDICATES: 3: dt = '2022-01-02 08:00:00' |
    | partitions=1/3 |
    | rollup: mytable |
    | tabletRatio=3/3 |
    | tabletList=41297,41299,41301 |
    | cardinality=1 |
    | avgRowSize=20.0 |
    | numNodes=0 |
    +------------------------------------------------+
    27 rows in set (0.01 sec)

在此示例中,仅执行分区修剪,而未执行存储桶修剪。 因此,Spark 扫描三个分区中的一个分区(如 partitions=1/3 所示)以及该分区中的所有 Tablet(如 tabletRatio=3/3 所示)。

存储桶修剪

  1. 运行以下命令,其中您使用 starrocks.filter.query 参数指定过滤条件 k=1 用于存储桶修剪,在 Spark 目录中以在属于 test 数据库的 mytable 表上创建一个名为 df 的 DataFrame

    scala> val df = spark.read.format("starrocks")
    .option("starrocks.table.identifier", s"test.mytable")
    .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"root")
    .option("password", s"")
    .option("starrocks.filter.query", "k=1")
    .load()
  2. 查看 StarRocks 集群的 FE 日志文件 fe.log,并找到执行的用于读取数据的 SQL 语句。 示例

    2022-08-09 19:04:44,479 INFO (nioEventLoopGroup-3-16|208) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=1] from external service [ user ['root'@'%']] for database [test] table [mytable]
  3. test 数据库中,使用 EXPLAIN 获取 SELECT k,b,dt,v from test.mytable where k=1 语句的执行计划

    MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=1;
    +------------------------------------------+
    | Explain String |
    +------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
    | PARTITION: UNPARTITIONED |
    | |
    | RESULT SINK |
    | |
    | 1:EXCHANGE |
    | |
    | PLAN FRAGMENT 1 |
    | OUTPUT EXPRS: |
    | PARTITION: RANDOM |
    | |
    | STREAM DATA SINK |
    | EXCHANGE ID: 01 |
    | UNPARTITIONED |
    | |
    | 0:OlapScanNode |
    | TABLE: mytable |
    | PREAGGREGATION: ON |
    | PREDICATES: 1: k = 1 |
    | partitions=3/3 |
    | rollup: mytable |
    | tabletRatio=3/9 |
    | tabletList=41299,41305,41311 |
    | cardinality=1 |
    | avgRowSize=20.0 |
    | numNodes=0 |
    +------------------------------------------+
    27 rows in set (0.01 sec)

在此示例中,仅执行存储桶修剪,而未执行分区修剪。 因此,Spark 扫描保存数据的三个分区中的所有分区(如 partitions=3/3 所示),并扫描这三个分区中的所有三个 Tablet(如 tabletRatio=3/9 所示),以检索满足 k = 1 过滤条件的哈希值。

分区修剪和存储桶修剪

  1. 运行以下命令,其中您使用 starrocks.filter.query 参数指定两个过滤条件 k=7dt='2022-01-02 08:00:00' 用于存储桶修剪和分区修剪,在 Spark 目录中以在 test 数据库的 mytable 表上创建一个名为 df 的 DataFrame

    scala> val df = spark.read.format("starrocks")
    .option("starrocks.table.identifier", s"test.mytable")
    .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"")
    .option("password", s"")
    .option("starrocks.filter.query", "k=7 and dt='2022-01-02 08:00:00'")
    .load()
  2. 查看 StarRocks 集群的 FE 日志文件 fe.log,并找到执行的用于读取数据的 SQL 语句。 示例

    2022-08-09 19:06:34,939 INFO (nioEventLoopGroup-3-18|212) [TableQueryPlanAction.executeWithoutPassword():126] receive SQL statement [select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=7 and dt='2022-01-02 08:00:00'] from external service [ user ['root'@'%']] for database [test] t
    able [mytable]
  3. test 数据库中,使用 EXPLAIN 获取 SELECT k,b,dt,v from test.mytable where k=7 and dt='2022-01-02 08:00:00' 语句的执行计划

    MySQL [test]> EXPLAIN select `k`,`b`,`dt`,`v` from `test`.`mytable` where k=7 and dt='2022-01-02 08:00:00';
    +----------------------------------------------------------+
    | Explain String |
    +----------------------------------------------------------+
    | PLAN FRAGMENT 0 |
    | OUTPUT EXPRS:1: k | 2: b | 3: dt | 4: v |
    | PARTITION: RANDOM |
    | |
    | RESULT SINK |
    | |
    | 0:OlapScanNode |
    | TABLE: mytable |
    | PREAGGREGATION: ON |
    | PREDICATES: 1: k = 7, 3: dt = '2022-01-02 08:00:00' |
    | partitions=1/3 |
    | rollup: mytable |
    | tabletRatio=1/3 |
    | tabletList=41301 |
    | cardinality=1 |
    | avgRowSize=20.0 |
    | numNodes=0 |
    +----------------------------------------------------------+
    17 rows in set (0.00 sec)

在此示例中,执行分区修剪和存储桶修剪。 因此,Spark 仅扫描三个分区中的一个分区(如 partitions=1/3 所示)以及该分区中的一个 Tablet(如 tabletRatio=1/3 所示)。

前缀索引过滤

  1. 将更多数据记录插入到属于 test 数据库的 mytable 表的分区中

    MySQL [test]> INSERT INTO mytable
    VALUES
    (1, 11, "2022-01-02 08:00:00", 111),
    (3, 33, "2022-01-02 08:00:00", 333),
    (3, 33, "2022-01-02 08:00:00", 333),
    (3, 33, "2022-01-02 08:00:00", 333);
  2. 查询 mytable

    MySQL [test]> SELECT * FROM mytable;
    +------+------+---------------------+------+
    | k | b | dt | v |
    +------+------+---------------------+------+
    | 1 | 11 | 2022-01-02 08:00:00 | 111 |
    | 3 | 33 | 2022-01-02 08:00:00 | 333 |
    | 3 | 33 | 2022-01-02 08:00:00 | 333 |
    | 3 | 33 | 2022-01-02 08:00:00 | 333 |
    +------+------+---------------------+------+
    4 rows in set (0.01 sec)
  3. 运行以下命令,其中您使用 starrocks.filter.query 参数指定过滤条件 k=1 用于前缀索引过滤,在 Spark 目录中以在属于 test 数据库的 mytable 表上创建一个名为 df 的 DataFrame

    scala> val df = spark.read.format("starrocks")
    .option("starrocks.table.identifier", s"test.mytable")
    .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
    .option("user", s"root")
    .option("password", s"")
    .option("starrocks.filter.query", "k=1")
    .load()
  4. test 数据库中,设置 is_report_successtrue 以启用配置文件报告

    MySQL [test]> SET is_report_success = true;
    Query OK, 0 rows affected (0.00 sec)
  5. 使用浏览器打开 http://<fe_host>:<http_http_port>/query 页面,并查看 SELECT * FROM mytable where k=1 语句的配置文件。 示例

    OLAP_SCAN (plan_node_id=0):
    CommonMetrics:
    - CloseTime: 1.255ms
    - OperatorTotalTime: 1.404ms
    - PeakMemoryUsage: 0.00
    - PullChunkNum: 8
    - PullRowNum: 2
    - __MAX_OF_PullRowNum: 2
    - __MIN_OF_PullRowNum: 0
    - PullTotalTime: 148.60us
    - PushChunkNum: 0
    - PushRowNum: 0
    - PushTotalTime: 0ns
    - SetFinishedTime: 136ns
    - SetFinishingTime: 129ns
    UniqueMetrics:
    - Predicates: 1: k = 1
    - Rollup: mytable
    - Table: mytable
    - BytesRead: 88.00 B
    - __MAX_OF_BytesRead: 88.00 B
    - __MIN_OF_BytesRead: 0.00
    - CachedPagesNum: 0
    - CompressedBytesRead: 844.00 B
    - __MAX_OF_CompressedBytesRead: 844.00 B
    - __MIN_OF_CompressedBytesRead: 0.00
    - CreateSegmentIter: 18.582us
    - IOTime: 4.425us
    - LateMaterialize: 17.385us
    - PushdownPredicates: 3
    - RawRowsRead: 2
    - __MAX_OF_RawRowsRead: 2
    - __MIN_OF_RawRowsRead: 0
    - ReadPagesNum: 12
    - __MAX_OF_ReadPagesNum: 12
    - __MIN_OF_ReadPagesNum: 0
    - RowsRead: 2
    - __MAX_OF_RowsRead: 2
    - __MIN_OF_RowsRead: 0
    - ScanTime: 154.367us
    - SegmentInit: 95.903us
    - BitmapIndexFilter: 0ns
    - BitmapIndexFilterRows: 0
    - BloomFilterFilterRows: 0
    - ShortKeyFilterRows: 3
    - __MAX_OF_ShortKeyFilterRows: 3
    - __MIN_OF_ShortKeyFilterRows: 0
    - ZoneMapIndexFilterRows: 0
    - SegmentRead: 2.559us
    - BlockFetch: 2.187us
    - BlockFetchCount: 2
    - __MAX_OF_BlockFetchCount: 2
    - __MIN_OF_BlockFetchCount: 0
    - BlockSeek: 7.789us
    - BlockSeekCount: 2
    - __MAX_OF_BlockSeekCount: 2
    - __MIN_OF_BlockSeekCount: 0
    - ChunkCopy: 25ns
    - DecompressT: 0ns
    - DelVecFilterRows: 0
    - IndexLoad: 0ns
    - PredFilter: 353ns
    - PredFilterRows: 0
    - RowsetsReadCount: 7
    - SegmentsReadCount: 3
    - __MAX_OF_SegmentsReadCount: 2
    - __MIN_OF_SegmentsReadCount: 0
    - TotalColumnsDataPageCount: 8
    - __MAX_OF_TotalColumnsDataPageCount: 8
    - __MIN_OF_TotalColumnsDataPageCount: 0
    - UncompressedBytesRead: 508.00 B
    - __MAX_OF_UncompressedBytesRead: 508.00 B
    - __MIN_OF_UncompressedBytesRead: 0.00

在此示例中,过滤条件 k = 1 可以命中前缀索引。 因此,Spark 可以过滤掉三行(如 ShortKeyFilterRows: 3 所示)。