Apache Spark Load
使用 Spark Load 批量加载数据
这种导入方式使用外部 Apache Spark™ 资源预处理导入数据,从而提高导入性能并节省计算资源。它主要用于 初始迁移 和 大数据导入 到 StarRocks(数据量高达 TB 级别)。
Spark Load 是一种 异步 导入方法,需要用户通过 MySQL 协议创建 Spark 类型的导入作业,并使用 SHOW LOAD
查看导入结果。
注意
- 只有对 StarRocks 表具有 INSERT 权限的用户才能将数据加载到该表中。您可以按照 GRANT 中提供的说明授予所需的权限。
- Spark Load 不能用于将数据加载到主键表中。
术语解释
- Spark ETL:主要负责导入过程中的数据 ETL,包括全局字典构建(BITMAP 类型)、分区、排序、聚合等。
- Broker:Broker 是一个独立的无状态进程。它封装了文件系统接口,并为 StarRocks 提供了从远程存储系统读取文件的能力。
- 全局字典:保存将原始值映射到编码值的数据结构。原始值可以是任何数据类型,而编码值是一个整数。全局字典主要用于精确计数 distinct 预计算的场景。
原理
用户通过 MySQL 客户端提交 Spark 类型导入作业;FE 记录元数据并返回提交结果。
spark load 任务的执行分为以下主要阶段。
- 用户将 spark load 作业提交到 FE。
- FE 调度 ETL 任务的提交到 Apache Spark™ 集群以供执行。
- Apache Spark™ 集群执行 ETL 任务,包括全局字典构建(BITMAP 类型)、分区、排序、聚合等。
- ETL 任务完成后,FE 获取每个预处理切片的数据路径,并调度相关 BE 执行 Push 任务。
- BE 通过 Broker 进程从 HDFS 读取数据,并将其转换为 StarRocks 存储格式。
如果您选择不使用 Broker 进程,BE 将直接从 HDFS 读取数据。
- FE 调度有效版本并完成导入作业。
下图说明了 spark load 的主要流程。
全局字典
适用场景
目前,StarRocks 中的 BITMAP 列是使用 Roaringbitmap 实现的,它只有整数作为输入数据类型。因此,如果您想在导入过程中实现 BITMAP 列的预计算,则需要将输入数据类型转换为整数。
在 StarRocks 的现有导入过程中,全局字典的数据结构是基于 Hive 表实现的,它保存了从原始值到编码值的映射。
构建过程
- 从上游数据源读取数据并生成一个临时 Hive 表,命名为
hive-table
。 - 提取
hive-table
的去重字段的值以生成一个新的 Hive 表,命名为distinct-value-table
。 - 创建一个新的全局字典表,命名为
dict-table
,其中一列用于原始值,一列用于编码值。 - 在
distinct-value-table
和dict-table
之间进行左连接,然后使用窗口函数对该集合进行编码。最后,去重列的原始值和编码值都写回到dict-table
。 - 在
dict-table
和hive-table
之间进行连接,以完成将hive-table
中的原始值替换为整数编码值的工作。 hive-table
将在下次数据预处理时被读取,然后在计算后导入到 StarRocks。
数据预处理
数据预处理的基本过程如下
- 从上游数据源(HDFS 文件或 Hive 表)读取数据。
- 完成读取数据的字段映射和计算,然后根据分区信息生成
bucket-id
。 - 基于 StarRocks 表的 Rollup 元数据生成 RollupTree。
- 迭代 RollupTree 并执行分层聚合操作。下一层级的 Rollup 可以从上一层级的 Rollup 计算出来。
- 每次完成聚合计算时,都会根据
bucket-id
对数据进行分桶,然后写入 HDFS。 - 随后的 Broker 进程将从 HDFS 拉取文件并将其导入到 StarRocks BE 节点。
基本操作
配置 ETL 集群
Apache Spark™ 在 StarRocks 中用作 ETL 工作的外部计算资源。可能还有其他外部资源添加到 StarRocks,例如用于查询的 Spark/GPU,用于外部存储的 HDFS/S3,用于 ETL 的 MapReduce 等。因此,我们引入 Resource Management
来管理 StarRocks 使用的这些外部资源。
在提交 Apache Spark™ 导入作业之前,请配置 Apache Spark™ 集群以执行 ETL 任务。操作语法如下
-- create Apache Spark™ resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
type = spark,
spark_conf_key = spark_conf_value,
working_dir = path,
broker = broker_name,
broker.property_key = property_value
);
-- drop Apache Spark™ resource
DROP RESOURCE resource_name;
-- show resources
SHOW RESOURCES
SHOW PROC "/resources";
-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identityGRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name;
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identityREVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name;
- 创建资源
例如:
-- yarn cluster mode
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/starrocks",
"broker" = "broker0",
"broker.username" = "user0",
"broker.password" = "password0"
);
-- yarn HA cluster mode
CREATE EXTERNAL RESOURCE "spark1"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
"spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
"spark.hadoop.yarn.resourcemanager.hostname.rm1" = "host1",
"spark.hadoop.yarn.resourcemanager.hostname.rm2" = "host2",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/starrocks",
"broker" = "broker1"
);
resource-name
是在 StarRocks 中配置的 Apache Spark™ 资源的名称。
PROPERTIES
包括与 Apache Spark™ 资源相关的参数,如下所示
注意
有关 Apache Spark™ 资源 PROPERTIES 的详细描述,请参阅 CREATE RESOURCE
-
Spark 相关参数
type
:资源类型,必填,目前仅支持spark
。spark.master
:必填,目前仅支持yarn
。spark.submit.deployMode
:Apache Spark™ 程序的部署模式,必填,目前支持cluster
和client
。spark.hadoop.fs.defaultFS
:如果 master 是 yarn 则为必填。- 与 yarn 资源管理器相关的参数,必填。
- 单节点上的一个 ResourceManager
spark.hadoop.yarn.resourcemanager.address
:单点资源管理器的地址。 - ResourceManager HA
您可以选择指定 ResourceManager 的主机名或地址。
spark.hadoop.yarn.resourcemanager.ha.enabled
:启用资源管理器 HA,设置为true
。spark.hadoop.yarn.resourcemanager.ha.rm-ids
:资源管理器逻辑 id 列表。spark.hadoop.yarn.resourcemanager.hostname.rm-id
:对于每个 rm-id,指定与资源管理器对应的主机名。spark.hadoop.yarn.resourcemanager.address.rm-id
:对于每个 rm-id,指定客户端提交作业的host:port
。
- 单节点上的一个 ResourceManager
-
*working_dir
:ETL 使用的目录。如果 Apache Spark™ 用作 ETL 资源,则为必填项。例如:hdfs://host:port/tmp/starrocks
。 -
Broker 相关参数
broker
:Broker 名称。如果 Apache Spark™ 用作 ETL 资源,则为必填项。您需要使用ALTER SYSTEM ADD BROKER
命令提前完成配置。broker.property_key
:Broker 进程读取 ETL 生成的中间文件时要指定的信息(例如,身份验证信息)。
注意事项:
以上是通过 Broker 进程加载的参数说明。如果您打算不通过 Broker 进程加载数据,则应注意以下几点。
- 您不需要指定
broker
。 - 如果您需要配置用户身份验证和 NameNode 节点的 HA,则需要在 HDFS 集群中的 hdfs-site.xml 文件中配置参数,有关参数说明,请参阅 broker_properties。并且您需要将 hdfs-site.xml 文件移动到每个 FE 的 $FE_HOME/conf 下和每个 BE 的 $BE_HOME/conf 下。
注意
如果 HDFS 文件只能由特定用户访问,您仍然需要在
broker.name
中指定 HDFS 用户名,并在broker.password
中指定用户密码。
- 查看资源
普通帐户只能查看他们具有 USAGE-PRIV
访问权限的资源。root 和 admin 帐户可以查看所有资源。
- 资源权限
资源权限通过 GRANT REVOKE
进行管理,目前仅支持 USAGE-PRIV
权限。您可以将 USAGE-PRIV
权限授予用户或角色。
-- Grant access to spark0 resources to user0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";
-- Grant access to spark0 resources to role0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";
-- Grant access to all resources to user0
GRANT USAGE_PRIV ON RESOURCE* TO "user0"@"%";
-- Grant access to all resources to role0
GRANT USAGE_PRIV ON RESOURCE* TO ROLE "role0";
-- Revoke the use privileges of spark0 resources from user user0
REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";
配置 Spark Client
为 FE 配置 Spark 客户端,以便后者可以通过执行 spark-submit
命令提交 Spark 任务。建议使用官方版本的 Spark2 2.4.5 或更高版本 spark 下载地址。下载后,请使用以下步骤完成配置。
- 配置
SPARK-HOME
将 Spark 客户端放置在与 FE 相同的机器上的一个目录中,并在 FE 配置文件中将 spark_home_default_dir
配置到此目录,默认情况下是 FE 根目录中的 lib/spark2x
路径,不能为空。
- 配置 SPARK 依赖包
要配置依赖包,请压缩并归档 Spark 客户端下的 jars 文件夹中的所有 jar 文件,并在 FE 配置中将 spark_resource_path
项配置到此 zip 文件。如果此配置为空,FE 将尝试在 FE 根目录中查找 lib/spark2x/jars/spark-2x.zip
文件。如果 FE 找不到它,它将报告错误。
提交 spark load 作业时,归档的依赖文件将被上传到远程存储库。默认存储库路径位于 working_dir/{cluster_id}
目录下,命名为 --spark-repository--{resource-name}
,这意味着集群中的一个资源对应于一个远程存储库。目录结构参考如下
---spark-repository--spark0/
|---archive-1.0.0/
| |\---lib-990325d2c0d1d5e45bf675e54e44fb16-spark-dpp-1.0.0\-jar-with-dependencies.jar
| |\---lib-7670c29daf535efe3c9b923f778f61fc-spark-2x.zip
|---archive-1.1.0/
| |\---lib-64d5696f99c379af2bee28c1c84271d5-spark-dpp-1.1.0\-jar-with-dependencies.jar
| |\---lib-1bbb74bb6b264a270bc7fca3e964160f-spark-2x.zip
|---archive-1.2.0/
| |-...
除了 spark 依赖项(默认命名为 spark-2x.zip
)之外,FE 还会将 DPP 依赖项上传到远程存储库。如果 spark load 提交的所有依赖项已经存在于远程存储库中,则无需再次上传依赖项,从而节省了每次重复上传大量文件的时间。
配置 YARN Client
为 FE 配置 yarn 客户端,以便 FE 可以执行 yarn 命令来获取正在运行的应用程序的状态或终止它。建议使用官方版本的 Hadoop2 2.5.2 或更高版本(hadoop 下载地址)。下载后,请使用以下步骤完成配置
- 配置 YARN 可执行路径
将下载的 yarn 客户端放置在与 FE 相同的机器上的一个目录中,并在 FE 配置文件中将 yarn_client_path
项配置为 yarn 的二进制可执行文件,默认情况下是 FE 根目录中的 lib/yarn-client/hadoop/bin/yarn
路径。
- 配置生成 YARN 所需的配置文件的路径(可选)
当 FE 通过 yarn 客户端获取应用程序的状态或终止应用程序时,默认情况下,StarRocks 会在 FE 根目录的 lib/yarn-config
路径下生成执行 yarn 命令所需的配置文件。此路径可以通过配置 FE 配置文件中的 yarn_config_dir
条目进行修改,目前包括 core-site.xml
和 yarn-site.xml
。
创建导入作业
语法
LOAD LABEL load_label
(data_desc, ...)
WITH RESOURCE resource_name
[resource_properties]
[PROPERTIES (key1=value1, ... )]
* load_label:
db_name.label_name
* data_desc:
DATA INFILE ('file_path', ...)
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY separator ]
[(col1, ...)]
[COLUMNS FROM PATH AS (col2, ...)]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]
DATA FROM TABLE hive_external_tbl
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]
* resource_properties:
(key2=value2, ...)
示例 1:上游数据源是 HDFS 的情况
LOAD LABEL db1.label1
(
DATA INFILE("hdfs://abc.com:8888/user/starrocks/test/ml/file1")
INTO TABLE tbl1
COLUMNS TERMINATED BY ","
(tmp_c1,tmp_c2)
SET
(
id=tmp_c2,
name=tmp_c1
),
DATA INFILE("hdfs://abc.com:8888/user/starrocks/test/ml/file2")
INTO TABLE tbl2
COLUMNS TERMINATED BY ","
(col1, col2)
where col1 > 1
)
WITH RESOURCE 'spark0'
(
"spark.executor.memory" = "2g",
"spark.shuffle.compress" = "true"
)
PROPERTIES
(
"timeout" = "3600"
);
示例 2:上游数据源是 Hive 的情况。
- 步骤 1:创建一个新的 hive 资源
CREATE EXTERNAL RESOURCE hive0
PROPERTIES
(
"type" = "hive",
"hive.metastore.uris" = "thrift://xx.xx.xx.xx:8080"
);
- 步骤 2:创建一个新的 hive 外部表
CREATE EXTERNAL TABLE hive_t1
(
k1 INT,
K2 SMALLINT,
k3 varchar(50),
uuid varchar(100)
)
ENGINE=hive
PROPERTIES
(
"resource" = "hive0",
"database" = "tmp",
"table" = "t1"
);
- 步骤 3:提交加载命令,要求导入的 StarRocks 表中的列存在于 hive 外部表中。
LOAD LABEL db1.label1
(
DATA FROM TABLE hive_t1
INTO TABLE tbl1
SET
(
uuid=bitmap_dict(uuid)
)
)
WITH RESOURCE 'spark0'
(
"spark.executor.memory" = "2g",
"spark.shuffle.compress" = "true"
)
PROPERTIES
(
"timeout" = "3600"
);
Spark load 中的参数介绍
- Label
导入作业的 Label。每个导入作业都有一个 Label,该 Label 在数据库中是唯一的,遵循与 broker load 相同的规则。
- 数据描述类参数
目前,支持的数据源是 CSV 和 Hive 表。其他规则与 broker load 相同。
- 导入作业参数
导入作业参数是指属于导入语句的 opt_properties
部分的参数。这些参数适用于整个导入作业。规则与 broker load 相同。
- Spark 资源参数
Spark 资源需要提前配置到 StarRocks 中,并且用户需要被授予 USAGE-PRIV 权限才能将资源应用于 Spark load。Spark 资源参数可以在用户有临时需要时设置,例如为作业添加资源和修改 Spark 配置。该设置仅对该作业生效,不影响 StarRocks 集群中的现有配置。
WITH RESOURCE 'spark0'
(
"spark.driver.memory" = "1g",
"spark.executor.memory" = "3g"
)
- 当数据源是 Hive 时导入
目前,要在导入过程中使用 Hive 表,您需要创建一个 Hive
类型的外部表,然后在提交导入命令时指定其名称。
- 构建全局字典的导入过程
在加载命令中,您可以按以下格式指定构建全局字典所需的字段:StarRocks 字段名=bitmap_dict(hive 表字段名)
请注意,目前 仅当上游数据源是 Hive 表时才支持全局字典。
- 加载二进制类型数据
从 v2.5.17 开始,Spark Load 支持 bitmap_from_binary 函数,它可以将二进制数据转换为 bitmap 数据。如果 Hive 表或 HDFS 文件的列类型是二进制,并且 StarRocks 表中的相应列是 bitmap 类型的聚合列,则可以在加载命令中按以下格式指定字段,StarRocks 字段名=bitmap_from_binary(Hive 表字段名)
。这样就不需要构建全局字典。
查看导入作业
Spark load 导入是异步的,broker load 也是如此。用户必须记录导入作业的 label,并在 SHOW LOAD
命令中使用它来查看导入结果。查看导入的命令对于所有导入方法都是通用的。示例如下。
有关返回参数的详细说明,请参阅 Broker Load。不同之处如下。
mysql> show load order by createtime desc limit 1\G
*************************** 1. row ***************************
JobId: 76391
Label: label1
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: SPARK
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:49:44
LoadStartTime: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://1.1.1.1:8089/proxy/application_1586619723848_0035/
JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}
- 状态
导入作业的当前阶段。PENDING:已提交作业。ETL:已提交 Spark ETL。LOADING:FE 调度 BE 执行 push 操作。FINISHED:push 已完成且版本有效。
导入作业有两个最终阶段 – CANCELLED
和 FINISHED
,都表示加载作业已完成。CANCELLED
表示导入失败,FINISHED
表示导入成功。
- 进度
导入作业进度的说明。有两种类型的进度 – ETL 和 LOAD,它们对应于导入过程的两个阶段,ETL 和 LOADING。
- LOAD 的进度范围是 0~100%。
LOAD 进度 = 当前已完成的所有副本导入的 tablet 数 / 此导入作业的 tablet 总数 * 100%
.
-
如果所有表都已导入,则 LOAD 进度为 99%,当导入进入最终验证阶段时,将更改为 100%。
-
导入进度不是线性的。如果一段时间内进度没有变化,并不意味着导入没有执行。
-
类型
导入作业的类型。SPARK 用于 spark load。
- CreateTime/EtlStartTime/EtlFinishTime/LoadStartTime/LoadFinishTime
这些值表示创建导入的时间、ETL 阶段开始的时间、ETL 阶段完成的时间、LOADING 阶段开始的时间以及整个导入作业完成的时间。
- JobDetails
显示作业的详细运行状态,包括导入的文件数、总大小(以字节为单位)、子任务数、正在处理的原始行数等。例如
{"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}
- URL
您可以将输入复制到浏览器以访问相应应用程序的 Web 界面。
查看 Apache Spark™ Launcher 提交日志
有时用户需要查看在 Apache Spark™ 作业提交期间生成的详细日志。默认情况下,日志保存在 FE 根目录中的 log/spark_launcher_log
路径下,命名为 spark-launcher-{load-job-id}-{label}.log
。日志在此目录中保存一段时间,并在清除 FE 元数据中的导入信息时被擦除。默认保留时间为 3 天。
取消导入
当 Spark load 作业状态不是 CANCELLED
或 FINISHED
时,用户可以通过指定导入作业的 Label 手动取消它。
相关系统配置
FE 配置: 以下配置是 Spark load 的系统级配置,适用于所有 Spark load 导入作业。配置值主要可以通过修改 fe.conf
来调整。
- enable-spark-load:启用 Spark load 和资源创建,默认值为 false。
- spark-load-default-timeout-second:作业的默认超时时间为 259200 秒(3 天)。
- spark-home-default-dir:Spark 客户端路径 (
fe/lib/spark2x
)。 - spark-resource-path:打包的 S park 依赖文件路径(默认为空)。
- spark-launcher-log-dir:Spark 客户端的提交日志存储的目录 (
fe/log/spark-launcher-log
)。 - yarn-client-path:yarn 二进制可执行文件的路径 (
fe/lib/yarn-client/hadoop/bin/yarn
)。 - yarn-config-dir:Yarn 的配置文件路径 (
fe/lib/yarn-config
)。
最佳实践
使用 Spark load 最合适的场景是原始数据在文件系统 (HDFS) 中,并且数据量在数十 GB 到 TB 级别。对于较小的数据量,请使用 Stream Load 或 Broker Load。
有关完整的 spark load 导入示例,请参阅 github 上的演示:https://github.com/StarRocks/demo/blob/master/docs/03_sparkLoad2StarRocks.md
常见问题解答
错误:当使用 master 'yarn' 运行时,必须在环境中设置 HADOOP-CONF-DIR 或 YARN-CONF-DIR。
使用 Spark Load 而未在 Spark 客户端的 spark-env.sh
中配置 HADOOP-CONF-DIR
环境变量。
错误:无法运行程序 "xxx/bin/spark-submit":error=2, 没有这样的文件或目录
使用 Spark Load 时,spark_home_default_dir
配置项未指定 Spark 客户端根目录。
错误:文件 xxx/jars/spark-2x.zip 不存在。
使用 Spark load 时,spark-resource-path
配置项未指向打包的 zip 文件。
错误:yarn 客户端在路径中不存在:xxx/yarn-client/hadoop/bin/yarn
使用 Spark load 时,yarn-client-path 配置项未指定 yarn 可执行文件。
错误:无法执行 hadoop-yarn/bin/... /libexec/yarn-config.sh
当使用带有 CDH 的 Hadoop 时,您需要配置 HADOOP_LIBEXEC_DIR
环境变量。由于 hadoop-yarn
和 hadoop 目录不同,默认 libexec
目录将查找 hadoop-yarn/bin/... /libexec
,而 libexec
位于 hadoop 目录中。获取 Spark 任务状态的 ```yarn application status`` 命令报告错误导致导入作业失败。