跳到主要内容
版本: 最新版本-3.5

从 HDFS 或云存储加载数据

StarRocks 提供了基于 MySQL 的 Broker Load 加载方法,可帮助您从 HDFS 或云存储中加载大量数据到 StarRocks。

Broker Load 以异步加载模式运行。提交加载作业后,StarRocks 会异步运行该作业。您需要使用 SHOW LOAD 语句或 curl 命令来检查作业结果。

Broker Load 支持单表加载和多表加载。您可以通过运行一个 Broker Load 作业将一个或多个数据文件加载到一个或多个目标表中。Broker Load 确保每个运行的用于加载多个数据文件的加载作业的事务原子性。原子性意味着在一个加载作业中加载多个数据文件必须全部成功或全部失败。永远不会发生某些数据文件的加载成功,而其他文件的加载失败的情况。

Broker Load 支持在数据加载时进行数据转换,并支持在数据加载期间通过 UPSERT 和 DELETE 操作进行数据更改。有关详细信息,请参阅 加载时转换数据通过加载更改数据

您只能以对这些 StarRocks 表具有 INSERT 权限的用户的身份将数据加载到 StarRocks 表中。如果您没有 INSERT 权限,请按照 GRANT 中提供的说明,将 INSERT 权限授予您用于连接到 StarRocks 集群的用户。语法为 GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}

背景信息

在 v2.4 及更早版本中,StarRocks 依赖 Broker 在 StarRocks 集群和外部存储系统之间建立连接,以运行 Broker Load 作业。因此,您需要在 load 语句中输入 WITH BROKER "<broker_name>" 以指定要使用的 Broker。这称为“基于 Broker 的加载”。Broker 是一种独立的、无状态的服务,与文件系统接口集成。通过 Broker,StarRocks 可以访问和读取存储在外部存储系统中的数据文件,并可以使用自己的计算资源来预处理和加载这些数据文件的数据。

从 v2.5 开始,StarRocks 在运行 Broker Load 作业时不再依赖 Broker 在 StarRocks 集群和外部存储系统之间建立连接。因此,您不再需要在 load 语句中指定 Broker,但仍需要保留 WITH BROKER 关键字。这称为“无 Broker 加载”。

当您的数据存储在 HDFS 中时,您可能会遇到无 Broker 加载不起作用的情况。当您的数据存储在多个 HDFS 集群中或配置了多个 Kerberos 用户时,可能会发生这种情况。在这些情况下,您可以改为使用基于 Broker 的加载。要成功执行此操作,请确保至少部署了一个独立的 Broker 组。有关如何在这些情况下指定身份验证配置和 HA 配置的信息,请参阅 HDFS

支持的数据文件格式

Broker Load 支持以下数据文件格式

  • CSV

  • Parquet

  • ORC

注意

对于 CSV 数据,请注意以下几点

  • 您可以使用 UTF-8 字符串(例如逗号 (,)、制表符或管道 (|))作为文本分隔符,其长度不超过 50 字节。
  • 空值用 \N 表示。例如,一个数据文件由三列组成,该数据文件中的一条记录在第一列和第三列中保存数据,但在第二列中没有数据。在这种情况下,您需要在第二列中使用 \N 来表示空值。这意味着记录必须编译为 a,\N,b 而不是 a,,ba,,b 表示记录的第二列包含一个空字符串。

支持的存储系统

Broker Load 支持以下存储系统

  • HDFS

  • AWS S3

  • Google GCS

  • 其他 S3 兼容的存储系统,例如 MinIO

  • Microsoft Azure Storage

工作原理

在您将加载作业提交到 FE 后,FE 会生成一个查询计划,根据可用的 BE 数量和要加载的数据文件的大小将查询计划拆分为多个部分,然后将查询计划的每个部分分配给可用的 BE。在加载期间,每个涉及的 BE 从您的 HDFS 或云存储系统中提取数据文件的数据,预处理数据,然后将数据加载到您的 StarRocks 集群中。在所有 BE 完成其查询计划部分后,FE 确定加载作业是否成功。

下图显示了 Broker Load 作业的工作流程。

Workflow of Broker Load

基本操作

创建多表加载作业

本主题以 CSV 为例,描述如何将多个数据文件加载到多个表中。有关如何加载其他文件格式的数据以及有关 Broker Load 的语法和参数描述的信息,请参阅 BROKER LOAD

请注意,在 StarRocks 中,某些文字被 SQL 语言用作保留关键字。不要在 SQL 语句中直接使用这些关键字。如果您想在 SQL 语句中使用此类关键字,请将其括在一对反引号 (`) 中。请参阅 关键字

数据示例

  1. 在本地文件系统中创建 CSV 文件。

    a. 创建一个名为 file1.csv 的 CSV 文件。该文件由三列组成,按顺序表示用户 ID、用户名和用户分数。

    1,Lily,23
    2,Rose,23
    3,Alice,24
    4,Julia,25

    b. 创建一个名为 file2.csv 的 CSV 文件。该文件由两列组成,按顺序表示城市 ID 和城市名称。

    200,'Beijing'
  2. 在 StarRocks 数据库 test_db 中创建 StarRocks 表。

    注意

    从 v2.5.7 开始,StarRocks 可以在创建表或添加分区时自动设置存储桶数 (BUCKETS)。您不再需要手动设置存储桶数。有关详细信息,请参阅 设置存储桶数

    a. 创建一个名为 table1 的 Primary Key 表。该表由三列组成:idnamescore,其中 id 是主键。

    CREATE TABLE `table1`
    (
    `id` int(11) NOT NULL COMMENT "user ID",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "user name",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT "user score"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`);

    b. 创建一个名为 table2 的 Primary Key 表。该表由两列组成:idcity,其中 id 是主键。

    CREATE TABLE `table2`
    (
    `id` int(11) NOT NULL COMMENT "city ID",
    `city` varchar(65533) NULL DEFAULT "" COMMENT "city name"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`);
  3. file1.csvfile2.csv 上传到 HDFS 集群的 /user/starrocks/ 路径,到 AWS S3 存储桶 bucket_s3input 文件夹,到 Google GCS 存储桶 bucket_gcsinput 文件夹,到 MinIO 存储桶 bucket_minioinput 文件夹,以及到 Azure Storage 的指定路径。

从 HDFS 加载数据

执行以下语句,将 file1.csvfile2.csv 从 HDFS 集群的 /user/starrocks 路径加载到 table1table2 中,分别

LOAD LABEL test_db.label1
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
,
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/file2.csv")
INTO TABLE table2
COLUMNS TERMINATED BY ","
(id, city)
)
WITH BROKER
(
StorageCredentialParams
)
PROPERTIES
(
"timeout" = "3600"
);

在上面的示例中,StorageCredentialParams 表示一组身份验证参数,这些参数根据您选择的身份验证方法而有所不同。有关详细信息,请参阅 BROKER LOAD

从 AWS S3 加载数据

执行以下语句,将 file1.csvfile2.csv 从 AWS S3 存储桶 bucket_s3input 文件夹加载到 table1table2 中,分别

LOAD LABEL test_db.label2
(
DATA INFILE("s3a://bucket_s3/input/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
,
DATA INFILE("s3a://bucket_s3/input/file2.csv")
INTO TABLE table2
COLUMNS TERMINATED BY ","
(id, city)
)
WITH BROKER
(
StorageCredentialParams
);

注意

Broker Load 仅支持根据 S3A 协议访问 AWS S3。因此,当您从 AWS S3 加载数据时,必须将作为文件路径传递的 S3 URI 中的 s3:// 替换为 s3a://

在上面的示例中,StorageCredentialParams 表示一组身份验证参数,这些参数根据您选择的身份验证方法而有所不同。有关详细信息,请参阅 BROKER LOAD

从 v3.1 开始,StarRocks 支持通过使用 INSERT 命令和 TABLE 关键字直接从 AWS S3 加载 Parquet 格式或 ORC 格式文件的数据,从而省去了首先创建外部表的麻烦。有关详细信息,请参阅 使用 INSERT 加载数据 > 使用 TABLE 关键字直接从外部源的文件中插入数据

从 Google GCS 加载数据

执行以下语句,将 file1.csvfile2.csv 从 Google GCS 存储桶 bucket_gcsinput 文件夹加载到 table1table2 中,分别

LOAD LABEL test_db.label3
(
DATA INFILE("gs://bucket_gcs/input/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
,
DATA INFILE("gs://bucket_gcs/input/file2.csv")
INTO TABLE table2
COLUMNS TERMINATED BY ","
(id, city)
)
WITH BROKER
(
StorageCredentialParams
);

注意

Broker Load 仅支持根据 gs 协议访问 Google GCS。因此,当您从 Google GCS 加载数据时,必须在作为文件路径传递的 GCS URI 中包含 gs:// 作为前缀。

在上面的示例中,StorageCredentialParams 表示一组身份验证参数,这些参数根据您选择的身份验证方法而有所不同。有关详细信息,请参阅 BROKER LOAD

从其他 S3 兼容的存储系统加载数据

以 MinIO 为例。您可以执行以下语句,将 file1.csvfile2.csv 从 MinIO 存储桶 bucket_minioinput 文件夹加载到 table1table2 中,分别

LOAD LABEL test_db.label7
(
DATA INFILE("s3://bucket_minio/input/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
,
DATA INFILE("s3://bucket_minio/input/file2.csv")
INTO TABLE table2
COLUMNS TERMINATED BY ","
(id, city)
)
WITH BROKER
(
StorageCredentialParams
);

在上面的示例中,StorageCredentialParams 表示一组身份验证参数,这些参数根据您选择的身份验证方法而有所不同。有关详细信息,请参阅 BROKER LOAD

从 Microsoft Azure Storage 加载数据

执行以下语句,从 Azure Storage 的指定路径加载 file1.csvfile2.csv

LOAD LABEL test_db.label8
(
DATA INFILE("wasb[s]://<container>@<storage_account>.blob.core.windows.net/<path>/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
,
DATA INFILE("wasb[s]://<container>@<storage_account>.blob.core.windows.net/<path>/file2.csv")
INTO TABLE table2
COLUMNS TERMINATED BY ","
(id, city)
)
WITH BROKER
(
StorageCredentialParams
);

注意

当您从 Azure Storage 加载数据时,您需要根据您使用的访问协议和特定存储服务确定要使用的前缀。前面的示例以 Blob Storage 为例。

  • 当您从 Blob Storage 加载数据时,您必须根据用于访问存储帐户的协议在文件路径中包含 wasb://wasbs:// 作为前缀
    • 如果您的 Blob Storage 仅允许通过 HTTP 访问,请使用 wasb:// 作为前缀,例如,wasb://<container>@<storage_account>.blob.core.windows.net/<path>/<file_name>/*
    • 如果您的 Blob Storage 仅允许通过 HTTPS 访问,请使用 wasbs:// 作为前缀,例如,wasbs://<container>@<storage_account>.blob.core.windows.net/<path>/<file_name>/*
  • 当您从 Data Lake Storage Gen1 加载数据时,您必须在文件路径中包含 adl:// 作为前缀,例如,adl://<data_lake_storage_gen1_name>.azuredatalakestore.net/<path>/<file_name>
  • 当您从 Data Lake Storage Gen2 加载数据时,您必须根据用于访问存储帐户的协议在文件路径中包含 abfs://abfss:// 作为前缀
    • 如果您的 Data Lake Storage Gen2 仅允许通过 HTTP 访问,请使用 abfs:// 作为前缀,例如,abfs://<container>@<storage_account>.dfs.core.windows.net/<file_name>
    • 如果您的 Data Lake Storage Gen2 仅允许通过 HTTPS 访问,请使用 abfss:// 作为前缀,例如,abfss://<container>@<storage_account>.dfs.core.windows.net/<file_name>

在上面的示例中,StorageCredentialParams 表示一组身份验证参数,这些参数根据您选择的身份验证方法而有所不同。有关详细信息,请参阅 BROKER LOAD

查询数据

从 HDFS 集群、AWS S3 存储桶或 Google GCS 存储桶加载数据完成后,您可以使用 SELECT 语句查询 StarRocks 表的数据,以验证加载是否成功。

  1. 执行以下语句以查询 table1 的数据

    MySQL [test_db]> SELECT * FROM table1;
    +------+-------+-------+
    | id | name | score |
    +------+-------+-------+
    | 1 | Lily | 23 |
    | 2 | Rose | 23 |
    | 3 | Alice | 24 |
    | 4 | Julia | 25 |
    +------+-------+-------+
    4 rows in set (0.00 sec)
  2. 执行以下语句以查询 table2 的数据

    MySQL [test_db]> SELECT * FROM table2;
    +------+--------+
    | id | city |
    +------+--------+
    | 200 | Beijing|
    +------+--------+
    4 rows in set (0.01 sec)

创建单表加载作业

您还可以将单个数据文件或指定路径中的所有数据文件加载到单个目标表中。假设您的 AWS S3 存储桶 bucket_s3 包含一个名为 input 的文件夹。 input 文件夹包含多个数据文件,其中一个名为 file1.csv。这些数据文件包含与 table1 相同的列数,并且这些数据文件中的列可以按顺序一一映射到 table1 中的列。

要将 file1.csv 加载到 table1 中,请执行以下语句

LOAD LABEL test_db.label_7
(
DATA INFILE("s3a://bucket_s3/input/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
FORMAT AS "CSV"
)
WITH BROKER
(
StorageCredentialParams
);

要将 input 文件夹中的所有数据文件加载到 table1 中,请执行以下语句

LOAD LABEL test_db.label_8
(
DATA INFILE("s3a://bucket_s3/input/*")
INTO TABLE table1
COLUMNS TERMINATED BY ","
FORMAT AS "CSV"
)
WITH BROKER
(
StorageCredentialParams
);

在上面的示例中,StorageCredentialParams 表示一组身份验证参数,这些参数根据您选择的身份验证方法而有所不同。有关详细信息,请参阅 BROKER LOAD

查看加载作业

Broker Load 允许您使用 SHOW LOAD 语句或 curl 命令查看 lob 作业。

使用 SHOW LOAD

有关详细信息,请参阅 SHOW LOAD

使用 curl

语法如下

curl --location-trusted -u <username>:<password> \
'http://<fe_host>:<fe_http_port>/api/<database_name>/_load_info?label=<label_name>'

注意

如果您使用未设置密码的帐户,则只需输入 <username>:

例如,您可以运行以下命令来查看 test_db 数据库中标签为 label1 的加载作业的信息

curl --location-trusted -u <username>:<password> \
'http://<fe_host>:<fe_http_port>/api/test_db/_load_info?label=label1'

curl 命令将以 JSON 对象 jobInfo 的形式返回有关具有指定标签的最新执行的加载作业的信息

{"jobInfo":{"dbName":"default_cluster:test_db","tblNames":["table1_simple"],"label":"label1","state":"FINISHED","failMsg":"","trackingUrl":""},"status":"OK","msg":"Success"}%

下表描述了 jobInfo 中的参数。

参数描述
dbName加载数据的数据库的名称
tblNames加载数据的表的名称。
label导入作业的标签。
state加载作业的状态。有效值
  • PENDING:加载作业在队列中等待调度。
  • QUEUEING:加载作业在队列中等待调度。
  • LOADING:加载作业正在运行。
  • PREPARED:事务已提交。
  • FINISHED:加载作业成功。
  • CANCELLED:加载作业失败。
有关详细信息,请参阅 加载概念 中的“异步加载”部分。
failMsg加载作业失败的原因。如果加载作业的 state 值为 PENDINGLOADINGFINISHED,则 failMsg 参数将返回 NULL。如果加载作业的 state 值为 CANCELLED,则为 failMsg 参数返回的值由两部分组成:typemsg
  • type 部分可以是以下任意值
    • USER_CANCEL:加载作业已手动取消。
    • ETL_SUBMIT_FAIL:加载作业提交失败。
    • ETL-QUALITY-UNSATISFIED:加载作业失败,因为不合格数据的百分比超过了 max-filter-ratio 参数的值。
    • LOAD-RUN-FAIL:加载作业在 LOADING 阶段失败。
    • TIMEOUT:加载作业未在指定的超时期限内完成。
    • UNKNOWN:加载作业因未知错误而失败。
  • msg 部分提供了加载失败的详细原因。
trackingUrl用于访问在加载作业中检测到的不合格数据的 URL。您可以使用 curlwget 命令来访问 URL 并获取不合格数据。如果未检测到不合格数据,则 trackingUrl 参数将返回 NULL
status加载作业的 HTTP 请求的状态。有效值:OKFail
msg加载作业的 HTTP 请求的错误信息。

取消加载作业

当加载作业未处于 CANCELLEDFINISHED 阶段时,您可以使用 CANCEL LOAD 语句来取消作业。

例如,您可以执行以下语句来取消数据库 test_db 中标签为 label1 的加载作业

CANCEL LOAD
FROM test_db
WHERE LABEL = "label";

作业拆分和并发运行

Broker Load 作业可以拆分为一个或多个并发运行的任务。加载作业中的任务在单个事务中运行。它们必须全部成功或全部失败。StarRocks 根据您在 LOAD 语句中声明 data_desc 的方式拆分每个加载作业

  • 如果您声明多个 data_desc 参数,每个参数指定一个不同的表,则会生成一个任务来加载每个表的数据。

  • 如果您声明多个 data_desc 参数,每个参数指定同一表的不同分区,则会生成一个任务来加载每个分区的数据。

此外,每个任务可以进一步拆分为一个或多个实例,这些实例均匀分布在 StarRocks 集群的 BE 上并并发运行。StarRocks 根据以下 FE 配置 拆分每个任务

  • min_bytes_per_broker_scanner:每个实例处理的最小数据量。默认量为 64 MB。

  • load_parallel_instance_num:每个 BE 上的每个加载作业允许的并发实例数。默认数为 1。

    您可以使用以下公式来计算单个任务中的实例数

    单个任务中的实例数 = min(单个任务要加载的数据量/min_bytes_per_broker_scanner,load_parallel_instance_num x BE 数)

在大多数情况下,每个加载作业仅声明一个 data_desc,每个加载作业仅拆分为一个任务,并且该任务拆分为与 BE 数相同的实例数。

FE 配置项 max_broker_load_job_concurrency 指定了 StarRocks 集群中可以并发运行的最大 Broker Load 作业数。

在 StarRocks v2.4 及更早版本中,如果在特定时间段内提交的 Broker Load 作业总数超过最大数量,则会根据其提交时间对过多的作业进行排队和调度。

自 StarRocks v2.5 起,如果在特定时间段内提交的 Broker Load 作业总数超过最大数量,则会根据其优先级对过多的作业进行排队和调度。您可以在作业创建时使用 priority 参数为作业指定优先级。请参阅 BROKER LOAD。您还可以使用 ALTER LOAD 修改处于 QUEUEINGLOADING 状态的现有作业的优先级。