使用 Spark Connector 加载数据(推荐)
StarRocks 提供了一个自主开发的连接器,名为 StarRocks Connector for Apache Spark™ (简称 Spark Connector),用于帮助您通过 Spark 加载数据到 StarRocks 表中。其基本原理是先累积数据,然后通过 STREAM LOAD 一次性加载到 StarRocks 中。Spark Connector 基于 Spark DataSource V2 实现。可以通过 Spark DataFrames 或 Spark SQL 创建 DataSource。同时支持批量和结构化流模式。
注意
只有对 StarRocks 表具有 SELECT 和 INSERT 权限的用户才能将数据加载到该表中。您可以按照 GRANT 中提供的说明授予用户这些权限。
版本要求
Spark Connector | Spark | StarRocks | Java | Scala |
---|---|---|---|---|
1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 及更高版本 | 8 | 2.12 |
1.1.1 | 3.2、3.3 或 3.4 | 2.5 及更高版本 | 8 | 2.12 |
1.1.0 | 3.2、3.3 或 3.4 | 2.5 及更高版本 | 8 | 2.12 |
注意
- 有关不同版本的 Spark Connector 之间的行为更改,请参阅 升级 Spark Connector。
- 从 1.1.1 版本开始,Spark Connector 不再提供 MySQL JDBC 驱动程序,您需要手动将驱动程序导入到 Spark 类路径中。您可以在 MySQL 网站 或 Maven Central 上找到该驱动程序。
获取 Spark Connector
您可以通过以下方式获取 Spark Connector JAR 文件
- 直接下载已编译的 Spark Connector JAR 文件。
- 将 Spark Connector 作为依赖项添加到您的 Maven 项目中,然后下载 JAR 文件。
- 自己将 Spark Connector 的源代码编译为 JAR 文件。
Spark Connector JAR 文件的命名格式为 starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar
。
例如,如果您在您的环境中安装了 Spark 3.2 和 Scala 2.12,并且您想使用 Spark Connector 1.1.0,您可以使用 starrocks-spark-connector-3.2_2.12-1.1.0.jar
。
注意
一般来说,最新版本的 Spark Connector 仅保持与 Spark 的三个最新版本的兼容性。
下载已编译的 Jar 文件
直接从 Maven Central Repository 下载相应版本的 Spark Connector JAR。
Maven 依赖
-
在您的 Maven 项目的
pom.xml
文件中,按照以下格式添加 Spark Connector 作为依赖项。将spark_version
、scala_version
和connector_version
替换为相应的版本。<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency> -
例如,如果您的环境中的 Spark 版本是 3.2,Scala 版本是 2.12,并且您选择 Spark Connector 1.1.0,您需要添加以下依赖项
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
自行编译
-
执行以下命令将 Spark Connector 的源代码编译为 JAR 文件。请注意,
spark_version
将被替换为相应的 Spark 版本。sh build.sh <spark_version>
例如,如果您的环境中的 Spark 版本是 3.2,您需要执行以下命令
sh build.sh 3.2
-
转到
target/
目录以查找编译后生成的 Spark Connector JAR 文件,例如starrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar
。
注意
未正式发布的 Spark Connector 的名称包含
SNAPSHOT
后缀。
参数
starrocks.fe.http.url
必需:是
默认值:无
描述:您的 StarRocks 集群中 FE 的 HTTP URL。您可以指定多个 URL,必须用逗号 (,) 分隔。格式:<fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>
。从 1.1.1 版本开始,您还可以向 URL 添加 http://
前缀,例如 http://<fe_host1>:<fe_http_port1>,http://<fe_host2>:<fe_http_port2>
。
starrocks.fe.jdbc.url
必需:是
默认值:无
描述:用于连接 FE 的 MySQL 服务器的地址。格式:jdbc:mysql://<fe_host>:<fe_query_port>
。
starrocks.table.identifier
必需:是
默认值:无
描述:StarRocks 表的名称。格式:<database_name>.<table_name>
。
starrocks.user
必需:是
默认值:无
描述:您的 StarRocks 集群帐户的用户名。用户需要在 StarRocks 表上具有 SELECT 和 INSERT 权限。
starrocks.password
必需:是
默认值:无
描述:您的 StarRocks 集群帐户的密码。
starrocks.write.label.prefix
必需:否
默认值:spark-
描述:Stream Load 使用的标签前缀。
starrocks.write.enable.transaction-stream-load
必需:否
默认值:TRUE
描述:是否使用 Stream Load 事务接口 加载数据。需要 StarRocks v2.5 或更高版本。此功能可以在事务中加载更多数据,减少内存使用,并提高性能。
注意:从 1.1.1 版本开始,此参数仅当 starrocks.write.max.retries
的值是非正数时才生效,因为 Stream Load 事务接口不支持重试。
starrocks.write.buffer.size
必需:否
默认值: 104857600
描述:在一次发送到 StarRocks 之前可以在内存中累积的最大数据量。将此参数设置为更大的值可以提高加载性能,但可能会增加加载延迟。
starrocks.write.buffer.rows
必需:否
默认值:Integer.MAX_VALUE
描述:自 1.1.1 版本起支持。在一次发送到 StarRocks 之前可以在内存中累积的最大行数。
starrocks.write.flush.interval.ms
必需:否
默认值: 300000
描述:将数据发送到 StarRocks 的间隔。此参数用于控制加载延迟。
starrocks.write.max.retries
必需:否
默认值: 3
描述:自 1.1.1 版本起支持。如果加载失败,连接器重试对同一批数据执行 Stream Load 的次数。
注意:由于 Stream Load 事务接口不支持重试。如果此参数为正数,则连接器始终使用 Stream Load 接口,并忽略 starrocks.write.enable.transaction-stream-load
的值。
starrocks.write.retry.interval.ms
必需:否
默认值: 10000
描述:自 1.1.1 版本起支持。如果加载失败,重试对同一批数据执行 Stream Load 的间隔。
starrocks.columns
必需:否
默认值:无
描述:您要将数据加载到的 StarRocks 表列。您可以指定多个列,必须用逗号 (,) 分隔,例如 "col0,col1,col2"
。
starrocks.column.types
必需:否
默认值:无
描述:自 1.1.1 版本起支持。自定义 Spark 的列数据类型,而不是使用从 StarRocks 表推断出的默认值和 默认映射。参数值是一个 DDL 格式的模式,与 Spark StructType#toDDL 的输出相同,例如 col0 INT, col1 STRING, col2 BIGINT
。请注意,您只需要指定需要自定义的列。一个用例是将数据加载到 BITMAP 或 HLL 类型的列中。
starrocks.write.properties.*
必需:否
默认值:无
描述:用于控制 Stream Load 行为的参数。例如,参数 starrocks.write.properties.format
指定要加载的数据的格式,例如 CSV 或 JSON。有关支持的参数及其描述的列表,请参阅 STREAM LOAD。
starrocks.write.properties.format
必需:否
默认值:CSV
描述:Spark Connector 在将每批数据发送到 StarRocks 之前转换数据的基于的文件格式。有效值:CSV 和 JSON。
starrocks.write.properties.row_delimiter
必需:否
默认值:\n
描述:CSV 格式数据的行分隔符。
starrocks.write.properties.column_separator
必需:否
默认值:\t
描述:CSV 格式数据的列分隔符。
starrocks.write.properties.partial_update
必需:否
默认值:FALSE
描述:是否使用部分更新。有效值:TRUE
和 FALSE
。默认值:FALSE
,表示禁用此功能。
starrocks.write.properties.partial_update_mode
必需:否
默认值:row
描述:指定部分更新的模式。有效值:row
和 column
。
- 值
row
(默认)表示行模式下的部分更新,更适合具有许多列和小批量的实时更新。 - 值
column
表示列模式下的部分更新,更适合于具有少量列和多行的批量更新。在这种情况下,启用列模式可以提供更快的更新速度。例如,在一个有 100 列的表中,如果只更新所有行的 10 列(总数的 10%),则列模式的更新速度是 10 倍。
starrocks.write.num.partitions
必需:否
默认值:无
描述:Spark 可以并行写入数据的分区数。当数据量较小时,您可以减少分区数以降低加载并发和频率。此参数的默认值由 Spark 确定。但是,此方法可能会导致 Spark Shuffle 成本。
starrocks.write.partition.columns
必需:否
默认值:无
描述:Spark 中的分区列。该参数仅当指定了 starrocks.write.num.partitions
时才生效。如果未指定此参数,则将所有正在写入的列用于分区。
starrocks.timezone
必需:否
默认值:JVM 的默认时区
描述:自 1.1.1 起支持。用于将 Spark TimestampType
转换为 StarRocks DATETIME
的时区。默认值是由 ZoneId#systemDefault()
返回的 JVM 时区。格式可以是时区名称(如 Asia/Shanghai
),也可以是区域偏移量(如 +08:00
)。
Spark 和 StarRocks 之间的数据类型映射
-
默认数据类型映射如下
Spark 数据类型 StarRocks 数据类型 BooleanType BOOLEAN ByteType TINYINT ShortType SMALLINT IntegerType INT LongType BIGINT StringType LARGEINT FloatType FLOAT DoubleType DOUBLE DecimalType DECIMAL StringType CHAR StringType VARCHAR StringType STRING StringType JSON DateType DATE TimestampType DATETIME ArrayType ARRAY
注意
自 1.1.1 版本起支持。有关详细步骤,请参阅 将数据加载到 ARRAY 类型的列中。 -
您还可以自定义数据类型映射。
例如,StarRocks 表包含 BITMAP 和 HLL 列,但 Spark 不支持这两种数据类型。您需要在 Spark 中自定义相应的数据类型。有关详细步骤,请参阅将数据加载到 BITMAP 和 HLL 列中。自 1.1.1 版本起支持 BITMAP 和 HLL。
升级 Spark Connector
从 1.1.0 版本升级到 1.1.1 版本
- 由于
mysql-connector-java
使用 GPL 许可的限制,从 1.1.1 版本开始,Spark Connector 不再提供mysql-connector-java
,它是 MySQL 的官方 JDBC 驱动程序。但是,Spark Connector 仍然需要 MySQL JDBC 驱动程序来连接 StarRocks 以获取表元数据,因此您需要手动将驱动程序添加到 Spark 类路径中。您可以在 MySQL 网站 或 Maven Central 上找到该驱动程序。 - 从 1.1.1 版本开始,连接器默认使用 Stream Load 接口,而不是 1.1.0 版本中的 Stream Load 事务接口。如果您仍然想使用 Stream Load 事务接口,您可以将选项
starrocks.write.max.retries
设置为0
。有关详细信息,请参阅starrocks.write.enable.transaction-stream-load
和starrocks.write.max.retries
的描述。
示例
以下示例说明了如何使用 Spark Connector 通过 Spark DataFrames 或 Spark SQL 将数据加载到 StarRocks 表中。Spark DataFrames 支持批量和结构化流模式。
有关更多示例,请参阅 Spark Connector 示例。
准备工作
创建 StarRocks 表
创建一个数据库 test
并创建一个 Primary Key 表 score_board
。
CREATE DATABASE `test`;
CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
网络配置
确保 Spark 所在的机器可以通过 http_port
(默认:8030
)和 query_port
(默认:9030
)访问 StarRocks 集群的 FE 节点,并通过 be_http_port
(默认:8040
)访问 BE 节点。
设置您的 Spark 环境
请注意,以下示例在 Spark 3.2.4 中运行,并使用 spark-shell
、pyspark
和 spark-sql
。在运行示例之前,请确保将 Spark Connector JAR 文件放在 $SPARK_HOME/jars
目录中。
使用 Spark DataFrames 加载数据
以下两个示例说明如何使用 Spark DataFrames 批量或结构化流模式加载数据。
批量
在内存中构造数据并将数据加载到 StarRocks 表中。
- 您可以使用 Scala 或 Python 编写 Spark 应用程序。
对于 Scala,在 spark-shell
中运行以下代码片段
// 1. Create a DataFrame from a sequence.
val data = Seq((1, "starrocks", 100), (2, "spark", 100))
val df = data.toDF("id", "name", "score")
// 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
// You need to modify the options according your own environment.
df.write.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
.mode("append")
.save()
对于 Python,在 pyspark
中运行以下代码片段
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("StarRocks Example") \
.getOrCreate()
# 1. Create a DataFrame from a sequence.
data = [(1, "starrocks", 100), (2, "spark", 100)]
df = spark.sparkContext.parallelize(data) \
.toDF(["id", "name", "score"])
# 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
# You need to modify the options according your own environment.
df.write.format("starrocks") \
.option("starrocks.fe.http.url", "127.0.0.1:8030") \
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030") \
.option("starrocks.table.identifier", "test.score_board") \
.option("starrocks.user", "root") \
.option("starrocks.password", "") \
.mode("append") \
.save()
-
查询 StarRocks 表中的数据。
MySQL [test]> SELECT * FROM `score_board`;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 1 | starrocks | 100 |
| 2 | spark | 100 |
+------+-----------+-------+
2 rows in set (0.00 sec)
结构化流
构造从 CSV 文件流式读取数据并将数据加载到 StarRocks 表中。
-
在目录
csv-data
中,创建一个包含以下数据的 CSV 文件test.csv
3,starrocks,100
4,spark,100 -
您可以使用 Scala 或 Python 编写 Spark 应用程序。
对于 Scala,在 spark-shell
中运行以下代码片段
import org.apache.spark.sql.types.StructType
// 1. Create a DataFrame from CSV.
val schema = (new StructType()
.add("id", "integer")
.add("name", "string")
.add("score", "integer")
)
val df = (spark.readStream
.option("sep", ",")
.schema(schema)
.format("csv")
// Replace it with your path to the directory "csv-data".
.load("/path/to/csv-data")
)
// 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
// You need to modify the options according your own environment.
val query = (df.writeStream.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
// replace it with your checkpoint directory
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("append")
.start()
)
对于 Python,在 pyspark
中运行以下代码片段
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
spark = SparkSession \
.builder \
.appName("StarRocks SS Example") \
.getOrCreate()
# 1. Create a DataFrame from CSV.
schema = StructType([
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("score", IntegerType())
])
df = (
spark.readStream
.option("sep", ",")
.schema(schema)
.format("csv")
# Replace it with your path to the directory "csv-data".
.load("/path/to/csv-data")
)
# 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
# You need to modify the options according your own environment.
query = (
df.writeStream.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
# replace it with your checkpoint directory
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("append")
.start()
)
-
查询 StarRocks 表中的数据。
MySQL [test]> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 4 | spark | 100 |
| 3 | starrocks | 100 |
+------+-----------+-------+
2 rows in set (0.67 sec)
使用 Spark SQL 加载数据
以下示例说明了如何通过使用 Spark SQL CLI 中的 INSERT INTO
语句来使用 Spark SQL 加载数据。
-
在
spark-sql
中执行以下 SQL 语句-- 1. Create a table by configuring the data source as `starrocks` and the following options.
-- You need to modify the options according your own environment.
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.score_board",
"starrocks.user"="root",
"starrocks.password"=""
);
-- 2. Insert two rows into the table.
INSERT INTO `score_board` VALUES (5, "starrocks", 100), (6, "spark", 100); -
查询 StarRocks 表中的数据。
MySQL [test]> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 6 | spark | 100 |
| 5 | starrocks | 100 |
+------+-----------+-------+
2 rows in set (0.00 sec)
最佳实践
将数据加载到 Primary Key 表
本节将展示如何将数据加载到 StarRocks Primary Key 表以实现部分更新和条件更新。您可以参阅 通过加载更改数据,了解这些功能的详细介绍。这些示例使用 Spark SQL。
准备工作
创建一个数据库 test
并在 StarRocks 中创建一个 Primary Key 表 score_board
。
CREATE DATABASE `test`;
CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
部分更新
本示例将展示如何仅通过加载来更新列 name
中的数据
-
在 MySQL 客户端中将初始数据插入到 StarRocks 表中。
mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'spark', 100);
mysql> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 1 | starrocks | 100 |
| 2 | spark | 100 |
+------+-----------+-------+
2 rows in set (0.02 sec) -
在 Spark SQL 客户端中创建一个 Spark 表
score_board
。- 将选项
starrocks.write.properties.partial_update
设置为true
,该选项告诉连接器执行部分更新。 - 将选项
starrocks.columns
设置为"id,name"
,以告诉连接器要写入哪些列。
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.score_board",
"starrocks.user"="root",
"starrocks.password"="",
"starrocks.write.properties.partial_update"="true",
"starrocks.columns"="id,name"
); - 将选项
-
将数据插入到 Spark SQL 客户端中的表中,并且仅更新列
name
。INSERT INTO `score_board` VALUES (1, 'starrocks-update'), (2, 'spark-update');
-
在 MySQL 客户端中查询 StarRocks 表。
您可以看到只有
name
的值发生了变化,而score
的值没有变化。mysql> select * from score_board;
+------+------------------+-------+
| id | name | score |
+------+------------------+-------+
| 1 | starrocks-update | 100 |
| 2 | spark-update | 100 |
+------+------------------+-------+
2 rows in set (0.02 sec)
条件更新
本示例将展示如何根据列 score
的值执行条件更新。仅当 score
的新值大于或等于旧值时,对 id
的更新才会生效。
-
在 MySQL 客户端中将初始数据插入到 StarRocks 表中。
mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'spark', 100);
mysql> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 1 | starrocks | 100 |
| 2 | spark | 100 |
+------+-----------+-------+
2 rows in set (0.02 sec) -
通过以下方式创建一个 Spark 表
score_board
。- 将选项
starrocks.write.properties.merge_condition
设置为score
,该选项告诉连接器使用列score
作为条件。 - 确保 Spark Connector 使用 Stream Load 接口加载数据,而不是 Stream Load 事务接口,因为后者不支持此功能。
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.score_board",
"starrocks.user"="root",
"starrocks.password"="",
"starrocks.write.properties.merge_condition"="score"
); - 将选项
-
将数据插入到 Spark SQL 客户端中的表中,并更新
id
为 1 的行,该行的 score 值较小,以及更新id
为 2 的行,该行的 score 值较大。INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'spark-update', 101);
-
在 MySQL 客户端中查询 StarRocks 表。
您可以看到只有
id
为 2 的行发生了变化,而id
为 1 的行没有变化。mysql> select * from score_board;
+------+--------------+-------+
| id | name | score |
+------+--------------+-------+
| 1 | starrocks | 100 |
| 2 | spark-update | 101 |
+------+--------------+-------+
2 rows in set (0.03 sec)
将数据加载到 BITMAP 类型的列中
BITMAP
通常用于加速 Count Distinct,例如计算 UV,请参阅 使用 Bitmap 进行精确 Count Distinct。在这里,我们以 UV 计数为例,展示如何将数据加载到 BITMAP
类型的列中。自 1.1.1 版本起支持 BITMAP
。
-
创建一个 StarRocks Aggregate 表。
在数据库
test
中,创建一个 Aggregate 表page_uv
,其中列visit_users
定义为BITMAP
类型,并配置了聚合函数BITMAP_UNION
。CREATE TABLE `test`.`page_uv` (
`page_id` INT NOT NULL COMMENT 'page ID',
`visit_date` datetime NOT NULL COMMENT 'access time',
`visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
) ENGINE=OLAP
AGGREGATE KEY(`page_id`, `visit_date`)
DISTRIBUTED BY HASH(`page_id`); -
创建一个 Spark 表。
Spark 表的模式是从 StarRocks 表推断出来的,并且 Spark 不支持
BITMAP
类型。因此,您需要在 Spark 中自定义相应的列数据类型,例如BIGINT
,通过配置选项"starrocks.column.types"="visit_users BIGINT"
。当使用 Stream Load 摄取数据时,连接器使用to_bitmap
函数将BIGINT
类型的数据转换为BITMAP
类型。在
spark-sql
中运行以下 DDLCREATE TABLE `page_uv`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.page_uv",
"starrocks.user"="root",
"starrocks.password"="",
"starrocks.column.types"="visit_users BIGINT"
); -
将数据加载到 StarRocks 表中。
在
spark-sql
中运行以下 DMLINSERT INTO `page_uv` VALUES
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
(1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
(2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23); -
从 StarRocks 表中计算页面 UV。
MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `page_uv` GROUP BY `page_id`;
+---------+-----------------------------+
| page_id | count(DISTINCT visit_users) |
+---------+-----------------------------+
| 2 | 1 |
| 1 | 3 |
+---------+-----------------------------+
2 rows in set (0.01 sec)
注意
连接器使用
to_bitmap
函数将 Spark 中TINYINT
、SMALLINT
、INTEGER
和BIGINT
类型的数据转换为 StarRocks 中的BITMAP
类型,并对其他 Spark 数据类型使用bitmap_hash
函数。
将数据加载到 HLL 类型的列中
HLL
可用于近似 Count Distinct,请参阅 使用 HLL 进行近似 Count Distinct。
在这里,我们以 UV 计数为例,展示如何将数据加载到 HLL
类型的列中。自 1.1.1 版本起支持 HLL
。
-
创建一个 StarRocks Aggregate 表。
在数据库
test
中,创建一个 Aggregate 表hll_uv
,其中列visit_users
定义为HLL
类型,并配置了聚合函数HLL_UNION
。CREATE TABLE `hll_uv` (
`page_id` INT NOT NULL COMMENT 'page ID',
`visit_date` datetime NOT NULL COMMENT 'access time',
`visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
) ENGINE=OLAP
AGGREGATE KEY(`page_id`, `visit_date`)
DISTRIBUTED BY HASH(`page_id`); -
创建一个 Spark 表。
Spark 表的模式是从 StarRocks 表推断出来的,并且 Spark 不支持
HLL
类型。因此,您需要在 Spark 中自定义相应的列数据类型,例如BIGINT
,通过配置选项"starrocks.column.types"="visit_users BIGINT"
。当使用 Stream Load 摄取数据时,连接器使用hll_hash
函数将BIGINT
类型的数据转换为HLL
类型。在
spark-sql
中运行以下 DDLCREATE TABLE `hll_uv`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.hll_uv",
"starrocks.user"="root",
"starrocks.password"="",
"starrocks.column.types"="visit_users BIGINT"
); -
将数据加载到 StarRocks 表中。
在
spark-sql
中运行以下 DMLINSERT INTO `hll_uv` VALUES
(3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
(4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
(3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674); -
从 StarRocks 表中计算页面 UV。
MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;
+---------+-----------------------------+
| page_id | count(DISTINCT visit_users) |
+---------+-----------------------------+
| 4 | 1 |
| 3 | 2 |
+---------+-----------------------------+
2 rows in set (0.01 sec)
将数据加载到 ARRAY 类型的列中
以下示例说明了如何将数据加载到 ARRAY
类型的列中。
-
创建一个 StarRocks 表。
在数据库
test
中,创建一个 Primary Key 表array_tbl
,其中包含一个INT
列和两个ARRAY
列。CREATE TABLE `array_tbl` (
`id` INT NOT NULL,
`a0` ARRAY<STRING>,
`a1` ARRAY<ARRAY<INT>>
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
; -
将数据写入 StarRocks。
由于某些 StarRocks 版本不提供
ARRAY
列的元数据,因此连接器无法推断此列的相应 Spark 数据类型。但是,您可以在选项starrocks.column.types
中显式指定该列的相应 Spark 数据类型。在本示例中,您可以将该选项配置为a0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>
。在
spark-shell
中运行以下代码val data = Seq(
| (1, Seq("hello", "starrocks"), Seq(Seq(1, 2), Seq(3, 4))),
| (2, Seq("hello", "spark"), Seq(Seq(5, 6, 7), Seq(8, 9, 10)))
| )
val df = data.toDF("id", "a0", "a1")
df.write
.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.array_tbl")
.option("starrocks.user", "root")
.option("starrocks.password", "")
.option("starrocks.column.types", "a0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>")
.mode("append")
.save() -
查询 StarRocks 表中的数据。
MySQL [test]> SELECT * FROM `array_tbl`;
+------+-----------------------+--------------------+
| id | a0 | a1 |
+------+-----------------------+--------------------+
| 1 | ["hello","starrocks"] | [[1,2],[3,4]] |
| 2 | ["hello","spark"] | [[5,6,7],[8,9,10]] |
+------+-----------------------+--------------------+
2 rows in set (0.01 sec)