跳到主要内容
版本: 最新版本-3.5

Apache Spark Load

使用 Spark Load 批量加载数据

这种导入方式使用外部 Apache Spark™ 资源预处理导入数据,从而提高导入性能并节省计算资源。它主要用于 初始迁移大数据导入 到 StarRocks(数据量高达 TB 级别)。

Spark Load 是一种 异步 导入方法,需要用户通过 MySQL 协议创建 Spark 类型的导入作业,并使用 SHOW LOAD 查看导入结果。

注意

  • 只有对 StarRocks 表具有 INSERT 权限的用户才能将数据加载到该表中。您可以按照 GRANT 中提供的说明授予所需的权限。
  • Spark Load 不能用于将数据加载到主键表中。

术语解释

  • Spark ETL:主要负责导入过程中的数据 ETL,包括全局字典构建(BITMAP 类型)、分区、排序、聚合等。
  • Broker:Broker 是一个独立的无状态进程。它封装了文件系统接口,并为 StarRocks 提供了从远程存储系统读取文件的能力。
  • 全局字典:保存将原始值映射到编码值的数据结构。原始值可以是任何数据类型,而编码值是一个整数。全局字典主要用于精确计数 distinct 预计算的场景。

原理

用户通过 MySQL 客户端提交 Spark 类型导入作业;FE 记录元数据并返回提交结果。

spark load 任务的执行分为以下主要阶段。

  1. 用户将 spark load 作业提交到 FE。
  2. FE 调度 ETL 任务的提交到 Apache Spark™ 集群以供执行。
  3. Apache Spark™ 集群执行 ETL 任务,包括全局字典构建(BITMAP 类型)、分区、排序、聚合等。
  4. ETL 任务完成后,FE 获取每个预处理切片的数据路径,并调度相关 BE 执行 Push 任务。
  5. BE 通过 Broker 进程从 HDFS 读取数据,并将其转换为 StarRocks 存储格式。

    如果您选择不使用 Broker 进程,BE 将直接从 HDFS 读取数据。

  6. FE 调度有效版本并完成导入作业。

下图说明了 spark load 的主要流程。

Spark load


全局字典

适用场景

目前,StarRocks 中的 BITMAP 列是使用 Roaringbitmap 实现的,它只有整数作为输入数据类型。因此,如果您想在导入过程中实现 BITMAP 列的预计算,则需要将输入数据类型转换为整数。

在 StarRocks 的现有导入过程中,全局字典的数据结构是基于 Hive 表实现的,它保存了从原始值到编码值的映射。

构建过程

  1. 从上游数据源读取数据并生成一个临时 Hive 表,命名为 hive-table
  2. 提取 hive-table 的去重字段的值以生成一个新的 Hive 表,命名为 distinct-value-table
  3. 创建一个新的全局字典表,命名为 dict-table,其中一列用于原始值,一列用于编码值。
  4. distinct-value-tabledict-table 之间进行左连接,然后使用窗口函数对该集合进行编码。最后,去重列的原始值和编码值都写回到 dict-table
  5. dict-tablehive-table 之间进行连接,以完成将 hive-table 中的原始值替换为整数编码值的工作。
  6. hive-table 将在下次数据预处理时被读取,然后在计算后导入到 StarRocks。

数据预处理

数据预处理的基本过程如下

  1. 从上游数据源(HDFS 文件或 Hive 表)读取数据。
  2. 完成读取数据的字段映射和计算,然后根据分区信息生成 bucket-id
  3. 基于 StarRocks 表的 Rollup 元数据生成 RollupTree。
  4. 迭代 RollupTree 并执行分层聚合操作。下一层级的 Rollup 可以从上一层级的 Rollup 计算出来。
  5. 每次完成聚合计算时,都会根据 bucket-id 对数据进行分桶,然后写入 HDFS。
  6. 随后的 Broker 进程将从 HDFS 拉取文件并将其导入到 StarRocks BE 节点。

基本操作

配置 ETL 集群

Apache Spark™ 在 StarRocks 中用作 ETL 工作的外部计算资源。可能还有其他外部资源添加到 StarRocks,例如用于查询的 Spark/GPU,用于外部存储的 HDFS/S3,用于 ETL 的 MapReduce 等。因此,我们引入 Resource Management 来管理 StarRocks 使用的这些外部资源。

在提交 Apache Spark™ 导入作业之前,请配置 Apache Spark™ 集群以执行 ETL 任务。操作语法如下

-- create Apache Spark™ resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
type = spark,
spark_conf_key = spark_conf_value,
working_dir = path,
broker = broker_name,
broker.property_key = property_value
);

-- drop Apache Spark™ resource
DROP RESOURCE resource_name;

-- show resources
SHOW RESOURCES
SHOW PROC "/resources";

-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identityGRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name;
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identityREVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name;
  • 创建资源

例如:

-- yarn cluster mode
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/starrocks",
"broker" = "broker0",
"broker.username" = "user0",
"broker.password" = "password0"
);

-- yarn HA cluster mode
CREATE EXTERNAL RESOURCE "spark1"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
"spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
"spark.hadoop.yarn.resourcemanager.hostname.rm1" = "host1",
"spark.hadoop.yarn.resourcemanager.hostname.rm2" = "host2",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/starrocks",
"broker" = "broker1"
);

resource-name 是在 StarRocks 中配置的 Apache Spark™ 资源的名称。

PROPERTIES 包括与 Apache Spark™ 资源相关的参数,如下所示

注意

有关 Apache Spark™ 资源 PROPERTIES 的详细描述,请参阅 CREATE RESOURCE

  • Spark 相关参数

    • type:资源类型,必填,目前仅支持 spark
    • spark.master:必填,目前仅支持 yarn
      • spark.submit.deployMode:Apache Spark™ 程序的部署模式,必填,目前支持 clusterclient
      • spark.hadoop.fs.defaultFS:如果 master 是 yarn 则为必填。
      • 与 yarn 资源管理器相关的参数,必填。
        • 单节点上的一个 ResourceManager spark.hadoop.yarn.resourcemanager.address:单点资源管理器的地址。
        • ResourceManager HA

          您可以选择指定 ResourceManager 的主机名或地址。

          • spark.hadoop.yarn.resourcemanager.ha.enabled:启用资源管理器 HA,设置为 true
          • spark.hadoop.yarn.resourcemanager.ha.rm-ids:资源管理器逻辑 id 列表。
          • spark.hadoop.yarn.resourcemanager.hostname.rm-id:对于每个 rm-id,指定与资源管理器对应的主机名。
          • spark.hadoop.yarn.resourcemanager.address.rm-id:对于每个 rm-id,指定客户端提交作业的 host:port
  • *working_dir:ETL 使用的目录。如果 Apache Spark™ 用作 ETL 资源,则为必填项。例如:hdfs://host:port/tmp/starrocks

  • Broker 相关参数

    • broker:Broker 名称。如果 Apache Spark™ 用作 ETL 资源,则为必填项。您需要使用 ALTER SYSTEM ADD BROKER 命令提前完成配置。
    • broker.property_key:Broker 进程读取 ETL 生成的中间文件时要指定的信息(例如,身份验证信息)。

注意事项:

以上是通过 Broker 进程加载的参数说明。如果您打算不通过 Broker 进程加载数据,则应注意以下几点。

  • 您不需要指定 broker
  • 如果您需要配置用户身份验证和 NameNode 节点的 HA,则需要在 HDFS 集群中的 hdfs-site.xml 文件中配置参数,有关参数说明,请参阅 broker_properties。并且您需要将 hdfs-site.xml 文件移动到每个 FE 的 $FE_HOME/conf 下和每个 BE 的 $BE_HOME/conf 下。

注意

如果 HDFS 文件只能由特定用户访问,您仍然需要在 broker.name 中指定 HDFS 用户名,并在 broker.password 中指定用户密码。

  • 查看资源

普通帐户只能查看他们具有 USAGE-PRIV 访问权限的资源。root 和 admin 帐户可以查看所有资源。

  • 资源权限

资源权限通过 GRANT REVOKE 进行管理,目前仅支持 USAGE-PRIV 权限。您可以将 USAGE-PRIV 权限授予用户或角色。

-- Grant access to spark0 resources to user0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";

-- Grant access to spark0 resources to role0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";

-- Grant access to all resources to user0
GRANT USAGE_PRIV ON RESOURCE* TO "user0"@"%";

-- Grant access to all resources to role0
GRANT USAGE_PRIV ON RESOURCE* TO ROLE "role0";

-- Revoke the use privileges of spark0 resources from user user0
REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";

配置 Spark Client

为 FE 配置 Spark 客户端,以便后者可以通过执行 spark-submit 命令提交 Spark 任务。建议使用官方版本的 Spark2 2.4.5 或更高版本 spark 下载地址。下载后,请使用以下步骤完成配置。

  • 配置 SPARK-HOME

将 Spark 客户端放置在与 FE 相同的机器上的一个目录中,并在 FE 配置文件中将 spark_home_default_dir 配置到此目录,默认情况下是 FE 根目录中的 lib/spark2x 路径,不能为空。

  • 配置 SPARK 依赖包

要配置依赖包,请压缩并归档 Spark 客户端下的 jars 文件夹中的所有 jar 文件,并在 FE 配置中将 spark_resource_path 项配置到此 zip 文件。如果此配置为空,FE 将尝试在 FE 根目录中查找 lib/spark2x/jars/spark-2x.zip 文件。如果 FE 找不到它,它将报告错误。

提交 spark load 作业时,归档的依赖文件将被上传到远程存储库。默认存储库路径位于 working_dir/{cluster_id} 目录下,命名为 --spark-repository--{resource-name},这意味着集群中的一个资源对应于一个远程存储库。目录结构参考如下

---spark-repository--spark0/

|---archive-1.0.0/

| |\---lib-990325d2c0d1d5e45bf675e54e44fb16-spark-dpp-1.0.0\-jar-with-dependencies.jar

| |\---lib-7670c29daf535efe3c9b923f778f61fc-spark-2x.zip

|---archive-1.1.0/

| |\---lib-64d5696f99c379af2bee28c1c84271d5-spark-dpp-1.1.0\-jar-with-dependencies.jar

| |\---lib-1bbb74bb6b264a270bc7fca3e964160f-spark-2x.zip

|---archive-1.2.0/

| |-...

除了 spark 依赖项(默认命名为 spark-2x.zip)之外,FE 还会将 DPP 依赖项上传到远程存储库。如果 spark load 提交的所有依赖项已经存在于远程存储库中,则无需再次上传依赖项,从而节省了每次重复上传大量文件的时间。

配置 YARN Client

为 FE 配置 yarn 客户端,以便 FE 可以执行 yarn 命令来获取正在运行的应用程序的状态或终止它。建议使用官方版本的 Hadoop2 2.5.2 或更高版本(hadoop 下载地址)。下载后,请使用以下步骤完成配置

  • 配置 YARN 可执行路径

将下载的 yarn 客户端放置在与 FE 相同的机器上的一个目录中,并在 FE 配置文件中将 yarn_client_path 项配置为 yarn 的二进制可执行文件,默认情况下是 FE 根目录中的 lib/yarn-client/hadoop/bin/yarn 路径。

  • 配置生成 YARN 所需的配置文件的路径(可选)

当 FE 通过 yarn 客户端获取应用程序的状态或终止应用程序时,默认情况下,StarRocks 会在 FE 根目录的 lib/yarn-config 路径下生成执行 yarn 命令所需的配置文件。此路径可以通过配置 FE 配置文件中的 yarn_config_dir 条目进行修改,目前包括 core-site.xmlyarn-site.xml

创建导入作业

语法

LOAD LABEL load_label
(data_desc, ...)
WITH RESOURCE resource_name
[resource_properties]
[PROPERTIES (key1=value1, ... )]

* load_label:
db_name.label_name

* data_desc:
DATA INFILE ('file_path', ...)
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY separator ]
[(col1, ...)]
[COLUMNS FROM PATH AS (col2, ...)]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]

DATA FROM TABLE hive_external_tbl
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]

* resource_properties:
(key2=value2, ...)

示例 1:上游数据源是 HDFS 的情况

LOAD LABEL db1.label1
(
DATA INFILE("hdfs://abc.com:8888/user/starrocks/test/ml/file1")
INTO TABLE tbl1
COLUMNS TERMINATED BY ","
(tmp_c1,tmp_c2)
SET
(
id=tmp_c2,
name=tmp_c1
),
DATA INFILE("hdfs://abc.com:8888/user/starrocks/test/ml/file2")
INTO TABLE tbl2
COLUMNS TERMINATED BY ","
(col1, col2)
where col1 > 1
)
WITH RESOURCE 'spark0'
(
"spark.executor.memory" = "2g",
"spark.shuffle.compress" = "true"
)
PROPERTIES
(
"timeout" = "3600"
);

示例 2:上游数据源是 Hive 的情况。

  • 步骤 1:创建一个新的 hive 资源
CREATE EXTERNAL RESOURCE hive0
PROPERTIES
(
"type" = "hive",
"hive.metastore.uris" = "thrift://xx.xx.xx.xx:8080"
);
  • 步骤 2:创建一个新的 hive 外部表
CREATE EXTERNAL TABLE hive_t1
(
k1 INT,
K2 SMALLINT,
k3 varchar(50),
uuid varchar(100)
)
ENGINE=hive
PROPERTIES
(
"resource" = "hive0",
"database" = "tmp",
"table" = "t1"
);
  • 步骤 3:提交加载命令,要求导入的 StarRocks 表中的列存在于 hive 外部表中。
LOAD LABEL db1.label1
(
DATA FROM TABLE hive_t1
INTO TABLE tbl1
SET
(
uuid=bitmap_dict(uuid)
)
)
WITH RESOURCE 'spark0'
(
"spark.executor.memory" = "2g",
"spark.shuffle.compress" = "true"
)
PROPERTIES
(
"timeout" = "3600"
);

Spark load 中的参数介绍

  • Label

导入作业的 Label。每个导入作业都有一个 Label,该 Label 在数据库中是唯一的,遵循与 broker load 相同的规则。

  • 数据描述类参数

目前,支持的数据源是 CSV 和 Hive 表。其他规则与 broker load 相同。

  • 导入作业参数

导入作业参数是指属于导入语句的 opt_properties 部分的参数。这些参数适用于整个导入作业。规则与 broker load 相同。

  • Spark 资源参数

Spark 资源需要提前配置到 StarRocks 中,并且用户需要被授予 USAGE-PRIV 权限才能将资源应用于 Spark load。Spark 资源参数可以在用户有临时需要时设置,例如为作业添加资源和修改 Spark 配置。该设置仅对该作业生效,不影响 StarRocks 集群中的现有配置。

WITH RESOURCE 'spark0'
(
"spark.driver.memory" = "1g",
"spark.executor.memory" = "3g"
)
  • 当数据源是 Hive 时导入

目前,要在导入过程中使用 Hive 表,您需要创建一个 Hive 类型的外部表,然后在提交导入命令时指定其名称。

  • 构建全局字典的导入过程

在加载命令中,您可以按以下格式指定构建全局字典所需的字段:StarRocks 字段名=bitmap_dict(hive 表字段名) 请注意,目前 仅当上游数据源是 Hive 表时才支持全局字典

  • 加载二进制类型数据

从 v2.5.17 开始,Spark Load 支持 bitmap_from_binary 函数,它可以将二进制数据转换为 bitmap 数据。如果 Hive 表或 HDFS 文件的列类型是二进制,并且 StarRocks 表中的相应列是 bitmap 类型的聚合列,则可以在加载命令中按以下格式指定字段,StarRocks 字段名=bitmap_from_binary(Hive 表字段名)。这样就不需要构建全局字典。

查看导入作业

Spark load 导入是异步的,broker load 也是如此。用户必须记录导入作业的 label,并在 SHOW LOAD 命令中使用它来查看导入结果。查看导入的命令对于所有导入方法都是通用的。示例如下。

有关返回参数的详细说明,请参阅 Broker Load。不同之处如下。

mysql> show load order by createtime desc limit 1\G
*************************** 1. row ***************************
JobId: 76391
Label: label1
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: SPARK
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:49:44
LoadStartTime: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://1.1.1.1:8089/proxy/application_1586619723848_0035/
JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}
  • 状态

导入作业的当前阶段。PENDING:已提交作业。ETL:已提交 Spark ETL。LOADING:FE 调度 BE 执行 push 操作。FINISHED:push 已完成且版本有效。

导入作业有两个最终阶段 – CANCELLEDFINISHED,都表示加载作业已完成。CANCELLED 表示导入失败,FINISHED 表示导入成功。

  • 进度

导入作业进度的说明。有两种类型的进度 – ETL 和 LOAD,它们对应于导入过程的两个阶段,ETL 和 LOADING。

  • LOAD 的进度范围是 0~100%。

LOAD 进度 = 当前已完成的所有副本导入的 tablet 数 / 此导入作业的 tablet 总数 * 100%.

  • 如果所有表都已导入,则 LOAD 进度为 99%,当导入进入最终验证阶段时,将更改为 100%。

  • 导入进度不是线性的。如果一段时间内进度没有变化,并不意味着导入没有执行。

  • 类型

导入作业的类型。SPARK 用于 spark load。

  • CreateTime/EtlStartTime/EtlFinishTime/LoadStartTime/LoadFinishTime

这些值表示创建导入的时间、ETL 阶段开始的时间、ETL 阶段完成的时间、LOADING 阶段开始的时间以及整个导入作业完成的时间。

  • JobDetails

显示作业的详细运行状态,包括导入的文件数、总大小(以字节为单位)、子任务数、正在处理的原始行数等。例如

 {"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}
  • URL

您可以将输入复制到浏览器以访问相应应用程序的 Web 界面。

查看 Apache Spark™ Launcher 提交日志

有时用户需要查看在 Apache Spark™ 作业提交期间生成的详细日志。默认情况下,日志保存在 FE 根目录中的 log/spark_launcher_log 路径下,命名为 spark-launcher-{load-job-id}-{label}.log。日志在此目录中保存一段时间,并在清除 FE 元数据中的导入信息时被擦除。默认保留时间为 3 天。

取消导入

当 Spark load 作业状态不是 CANCELLEDFINISHED 时,用户可以通过指定导入作业的 Label 手动取消它。


FE 配置: 以下配置是 Spark load 的系统级配置,适用于所有 Spark load 导入作业。配置值主要可以通过修改 fe.conf 来调整。

  • enable-spark-load:启用 Spark load 和资源创建,默认值为 false。
  • spark-load-default-timeout-second:作业的默认超时时间为 259200 秒(3 天)。
  • spark-home-default-dir:Spark 客户端路径 (fe/lib/spark2x)。
  • spark-resource-path:打包的 S park 依赖文件路径(默认为空)。
  • spark-launcher-log-dir:Spark 客户端的提交日志存储的目录 (fe/log/spark-launcher-log)。
  • yarn-client-path:yarn 二进制可执行文件的路径 (fe/lib/yarn-client/hadoop/bin/yarn)。
  • yarn-config-dir:Yarn 的配置文件路径 (fe/lib/yarn-config)。

最佳实践

使用 Spark load 最合适的场景是原始数据在文件系统 (HDFS) 中,并且数据量在数十 GB 到 TB 级别。对于较小的数据量,请使用 Stream Load 或 Broker Load。

有关完整的 spark load 导入示例,请参阅 github 上的演示:https://github.com/StarRocks/demo/blob/master/docs/03_sparkLoad2StarRocks.md

常见问题解答

  • 错误:当使用 master 'yarn' 运行时,必须在环境中设置 HADOOP-CONF-DIR 或 YARN-CONF-DIR。

使用 Spark Load 而未在 Spark 客户端的 spark-env.sh 中配置 HADOOP-CONF-DIR 环境变量。

  • 错误:无法运行程序 "xxx/bin/spark-submit":error=2, 没有这样的文件或目录

使用 Spark Load 时,spark_home_default_dir 配置项未指定 Spark 客户端根目录。

  • 错误:文件 xxx/jars/spark-2x.zip 不存在。

使用 Spark load 时,spark-resource-path 配置项未指向打包的 zip 文件。

  • 错误:yarn 客户端在路径中不存在:xxx/yarn-client/hadoop/bin/yarn

使用 Spark load 时,yarn-client-path 配置项未指定 yarn 可执行文件。

  • 错误:无法执行 hadoop-yarn/bin/... /libexec/yarn-config.sh

当使用带有 CDH 的 Hadoop 时,您需要配置 HADOOP_LIBEXEC_DIR 环境变量。由于 hadoop-yarn 和 hadoop 目录不同,默认 libexec 目录将查找 hadoop-yarn/bin/... /libexec,而 libexec 位于 hadoop 目录中。获取 Spark 任务状态的 ```yarn application status`` 命令报告错误导致导入作业失败。