跳到主要内容
版本: 最新版本-3.5

S3 加载

从 AWS S3 加载数据

StarRocks 提供了以下从 AWS S3 加载数据的选项

这些选项各有优势,以下各节将详细介绍。

在大多数情况下,我们建议您使用 INSERT+FILES() 方法,该方法更易于使用。

但是,INSERT+FILES() 方法目前仅支持 Parquet、ORC 和 CSV 文件格式。 因此,如果您需要加载其他文件格式的数据(例如 JSON),或在数据加载期间执行数据更改(例如 DELETE),您可以求助于 Broker Load。

如果您需要加载大量数据文件,总数据量很大(例如,超过 100 GB 甚至 1 TB),我们建议您使用 Pipe 方法。 Pipe 可以根据文件的数量或大小拆分文件,将加载作业分解为更小的顺序任务。 这种方法确保一个文件中的错误不会影响整个加载作业,并最大限度地减少因数据错误而导致的重试需求。

准备工作

准备源数据

确保要加载到 StarRocks 中的源数据正确存储在 S3 Bucket 中。 您还可以考虑数据和数据库的位置,因为当您的 Bucket 和 StarRocks 集群位于同一区域时,数据传输成本会低得多。

在本主题中,我们为您提供 S3 Bucket 中的示例数据集,s3://starrocks-examples/user-behavior-10-million-rows.parquet。 您可以使用任何有效的凭据访问该数据集,因为任何经过 AWS 身份验证的用户都可以读取该对象。

检查权限

您只能以对这些 StarRocks 表具有 INSERT 权限的用户的身份将数据加载到 StarRocks 表中。 如果您没有 INSERT 权限,请按照 GRANT 中提供的说明,将 INSERT 权限授予用于连接到 StarRocks 集群的用户。 语法为 GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}

收集身份验证详细信息

本主题中的示例使用基于 IAM 用户的身份验证。 为了确保您有权从 AWS S3 读取数据,我们建议您阅读 基于 IAM 用户的身份验证准备 并按照说明创建一个配置了正确的 IAM 策略 的 IAM 用户。

简而言之,如果您实践基于 IAM 用户的身份验证,则需要收集有关以下 AWS 资源的信息

  • 存储您的数据的 S3 Bucket。
  • S3 对象键(对象名称),如果访问 Bucket 中的特定对象。 请注意,如果您的 S3 对象存储在子文件夹中,则对象键可以包括前缀。
  • S3 Bucket 所属的 AWS 区域。
  • 用作访问凭据的访问密钥和密钥。

有关所有可用的身份验证方法的信息,请参阅 对 AWS 资源进行身份验证

使用 INSERT+FILES()

此方法从 v3.1 开始可用,目前仅支持 Parquet、ORC 和 CSV(从 v3.3.0 开始)文件格式。

INSERT+FILES() 的优点

FILES() 可以读取存储在云存储中的文件,基于您指定的路径相关属性,推断文件中数据的表模式,然后从文件中返回数据作为数据行。

使用 FILES(),您可以

典型示例

使用 SELECT 直接从 S3 查询

使用 SELECT+FILES() 直接从 S3 查询可以在创建表之前很好地预览数据集的内容。 例如

  • 在不存储数据的情况下预览数据集。
  • 查询最小值和最大值,并确定要使用的数据类型。
  • 检查 NULL 值。

以下示例查询示例数据集 s3://starrocks-examples/user-behavior-10-million-rows.parquet

SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
)
LIMIT 3;

注意

在上面的命令中,将您的凭据替换为 AAABBB。可以使用任何有效的 aws.s3.access_keyaws.s3.secret_key,因为任何通过 AWS 身份验证的用户都可以读取该对象。

系统返回以下查询结果

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 1 | 2576651 | 149192 | pv | 2017-11-25 01:21:25 |
| 1 | 3830808 | 4181361 | pv | 2017-11-25 07:04:53 |
| 1 | 4365585 | 2520377 | pv | 2017-11-25 07:49:06 |
+--------+---------+------------+--------------+---------------------+

注意

请注意,上面返回的列名由 Parquet 文件提供。

使用 CTAS 创建和加载表

这是前一个示例的延续。 前一个查询包含在 CREATE TABLE AS SELECT (CTAS) 中,以使用模式推断自动创建表。 这意味着 StarRocks 将推断表模式,创建您想要的表,然后将数据加载到表中。 使用 FILES() 表函数和 Parquet 文件时,由于 Parquet 格式包含列名,因此无需创建表时指定列名和类型。

注意

使用模式推断时,CREATE TABLE 的语法不允许设置副本数,因此请在创建表之前设置它。 下面的示例适用于具有一个副本的系统

ADMIN SET FRONTEND CONFIG ('default_replication_num' = "1");

创建一个数据库并切换到它

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

使用 CTAS 创建表并将示例数据集 s3://starrocks-examples/user-behavior-10-million-rows.parquet 的数据加载到表中

CREATE TABLE user_behavior_inferred AS
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);

注意

在上面的命令中,将您的凭据替换为 AAABBB。可以使用任何有效的 aws.s3.access_keyaws.s3.secret_key,因为任何通过 AWS 身份验证的用户都可以读取该对象。

创建表后,您可以使用 DESCRIBE 查看其架构

DESCRIBE user_behavior_inferred;

系统返回以下查询结果

+--------------+------------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+------------------+------+-------+---------+-------+
| UserID | bigint | YES | true | NULL | |
| ItemID | bigint | YES | true | NULL | |
| CategoryID | bigint | YES | true | NULL | |
| BehaviorType | varchar(1048576) | YES | false | NULL | |
| Timestamp | varchar(1048576) | YES | false | NULL | |
+--------------+------------------+------+-------+---------+-------+

查询表以验证数据是否已加载到其中。 例子

SELECT * from user_behavior_inferred LIMIT 3;

返回以下查询结果,表明数据已成功加载

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 225586 | 3694958 | 1040727 | pv | 2017-12-01 00:58:40 |
| 225586 | 3726324 | 965809 | pv | 2017-12-01 02:16:02 |
| 225586 | 3732495 | 1488813 | pv | 2017-12-01 00:59:46 |
+--------+---------+------------+--------------+---------------------+

使用 INSERT 加载到现有表

您可能想要自定义要插入的表,例如

  • 列数据类型、可为空设置或默认值
  • 键类型和列
  • 数据分区和分桶

注意

创建最有效的表结构需要了解如何使用数据和列的内容。 本主题不涵盖表设计。 有关表设计的信息,请参阅 表类型

在本示例中,我们基于对表查询方式和 Parquet 文件中数据的了解来创建表。 对 Parquet 文件中数据的了解可以通过直接在 S3 中查询文件来获得。

  • 由于对 S3 中数据集的查询表明 Timestamp 列包含与 VARCHAR 数据类型匹配的数据,并且 StarRocks 可以从 VARCHAR 转换为 DATETIME,因此在以下 DDL 中将数据类型更改为 DATETIME。
  • 通过查询 S3 中的数据,您可以发现数据集中没有 NULL 值,因此 DDL 也可以将所有列设置为不可为空。
  • 根据对预期查询类型的了解,排序键和存储桶列设置为列 UserID。 您的用例可能与此数据不同,因此您可能会决定将 ItemID 添加到或代替 UserID 用于排序键。

创建一个数据库并切换到它

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

手动创建表

CREATE TABLE user_behavior_declared
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);

显示架构,以便您可以将其与 FILES() 表函数生成的推断架构进行比较

DESCRIBE user_behavior_declared;
+--------------+----------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+----------------+------+-------+---------+-------+
| UserID | int | YES | true | NULL | |
| ItemID | int | YES | false | NULL | |
| CategoryID | int | YES | false | NULL | |
| BehaviorType | varchar(65533) | YES | false | NULL | |
| Timestamp | datetime | YES | false | NULL | |
+--------------+----------------+------+-------+---------+-------+
提示

将您刚刚创建的架构与之前使用 FILES() 表函数推断的架构进行比较。 查看

  • 数据类型
  • 可为空
  • 键字段

为了更好地控制目标表的架构并获得更好的查询性能,我们建议您在生产环境中手动指定表架构。

创建表后,您可以使用 INSERT INTO SELECT FROM FILES() 加载它

INSERT INTO user_behavior_declared
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);

注意

在上面的命令中,将您的凭据替换为 AAABBB。可以使用任何有效的 aws.s3.access_keyaws.s3.secret_key,因为任何通过 AWS 身份验证的用户都可以读取该对象。

加载完成后,您可以查询表以验证数据是否已加载到其中。 例子

SELECT * from user_behavior_declared LIMIT 3;

返回以下查询结果,表明数据已成功加载

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 393529 | 3715112 | 883960 | pv | 2017-12-02 02:45:44 |
| 393529 | 2650583 | 883960 | pv | 2017-12-02 02:45:59 |
| 393529 | 3715112 | 883960 | pv | 2017-12-02 03:00:56 |
+--------+---------+------------+--------------+---------------------+

检查加载进度

您可以从 StarRocks Information Schema 中的 loads 视图查询 INSERT 作业的进度。 从 v3.1 开始支持此功能。 例子

SELECT * FROM information_schema.loads ORDER BY JOB_ID DESC;

有关 loads 视图中提供的字段的信息,请参阅 loads

如果您提交了多个加载作业,您可以按与作业关联的 LABEL 进行过滤。 例子

SELECT * FROM information_schema.loads WHERE LABEL = 'insert_e3b882f5-7eb3-11ee-ae77-00163e267b60' \G
*************************** 1. row ***************************
JOB_ID: 10243
LABEL: insert_e3b882f5-7eb3-11ee-ae77-00163e267b60
DATABASE_NAME: mydatabase
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: INSERT
PRIORITY: NORMAL
SCAN_ROWS: 10000000
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 10000000
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):300; max_filter_ratio:0.0
CREATE_TIME: 2023-11-09 11:56:01
ETL_START_TIME: 2023-11-09 11:56:01
ETL_FINISH_TIME: 2023-11-09 11:56:01
LOAD_START_TIME: 2023-11-09 11:56:01
LOAD_FINISH_TIME: 2023-11-09 11:56:44
JOB_DETAILS: {"All backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[10142]},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":311710786,"InternalTableLoadRows":10000000,"ScanBytes":581574034,"ScanRows":10000000,"TaskNumber":1,"Unfinished backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL

注意

INSERT 是一个同步命令。 如果 INSERT 作业仍在运行,您需要打开另一个会话来检查其执行状态。

使用 Broker Load

异步 Broker Load 进程处理与 S3 的连接、提取数据以及将数据存储在 StarRocks 中。

此方法支持以下文件格式

  • Parquet
  • ORC
  • CSV
  • JSON(从 v3.2.3 开始支持)

Broker Load 的优点

  • Broker Load 在后台运行,客户端无需保持连接即可继续作业。
  • Broker Load 首选用于长时间运行的作业,默认超时时间为 4 小时。
  • 除了 Parquet 和 ORC 文件格式外,Broker Load 还支持 CSV 文件格式和 JSON 文件格式(JSON 文件格式从 v3.2.3 开始支持)。

数据流

Workflow of Broker Load

  1. 用户创建一个加载作业。
  2. 前端 (FE) 创建一个查询计划,并将该计划分发到后端节点 (BE) 或计算节点 (CN)。
  3. BE 或 CN 从源中提取数据并将数据加载到 StarRocks 中。

典型示例

创建一个表,启动一个从 S3 提取示例数据集 s3://starrocks-examples/user-behavior-10-million-rows.parquet 的加载进程,并验证数据加载的进度和成功。

创建一个数据库和一个表

创建一个数据库并切换到它

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

手动创建一个表(我们建议该表具有与要从 AWS S3 加载的 Parquet 文件相同的模式)

CREATE TABLE user_behavior
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);

启动 Broker Load

运行以下命令启动一个 Broker Load 作业,该作业将数据从示例数据集 s3://starrocks-examples/user-behavior-10-million-rows.parquet 加载到 user_behavior

LOAD LABEL user_behavior
(
DATA INFILE("s3://starrocks-examples/user-behavior-10-million-rows.parquet")
INTO TABLE user_behavior
FORMAT AS "parquet"
)
WITH BROKER
(
"aws.s3.enable_ssl" = "true",
"aws.s3.use_instance_profile" = "false",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
)
PROPERTIES
(
"timeout" = "72000"
);

注意

在上面的命令中,将您的凭据替换为 AAABBB。可以使用任何有效的 aws.s3.access_keyaws.s3.secret_key,因为任何通过 AWS 身份验证的用户都可以读取该对象。

此作业有四个主要部分

  • LABEL:一个字符串,用于查询加载作业的状态。
  • LOAD 声明:源 URI、源数据格式和目标表名。
  • BROKER:源的连接详细信息。
  • PROPERTIES:超时值和任何其他要应用于加载作业的属性。

有关详细语法和参数说明,请参阅 BROKER LOAD

检查加载进度

您可以从 StarRocks Information Schema 中的 loads 视图查询 Broker Load 作业的进度。 此功能从 v3.1 开始支持。

SELECT * FROM information_schema.loads WHERE LABEL = 'user_behavior';

有关 loads 视图中提供的字段的信息,请参阅 loads

此记录显示 LOADING 状态,进度为 39%。 如果您看到类似的内容,请再次运行该命令,直到看到 FINISHED 状态。

              JOB_ID: 10466
LABEL: user_behavior
DATABASE_NAME: mydatabase
STATE: LOADING
PROGRESS: ETL:100%; LOAD:39%
TYPE: BROKER
PRIORITY: NORMAL
SCAN_ROWS: 4620288
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 4620288
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):72000; max_filter_ratio:0.0
CREATE_TIME: 2024-02-28 22:11:36
ETL_START_TIME: 2024-02-28 22:11:41
ETL_FINISH_TIME: 2024-02-28 22:11:41
LOAD_START_TIME: 2024-02-28 22:11:41
LOAD_FINISH_TIME: NULL
JOB_DETAILS: {"All backends":{"2fb97223-b14c-404b-9be1-83aa9b3a7715":[10004]},"FileNumber":1,"FileSize":136901706,"InternalTableLoadBytes":144032784,"InternalTableLoadRows":4620288,"ScanBytes":143969616,"ScanRows":4620288,"TaskNumber":1,"Unfinished backends":{"2fb97223-b14c-404b-9be1-83aa9b3a7715":[10004]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL

在确认加载作业已完成后,您可以检查目标表的子集,以查看数据是否已成功加载。 例子

SELECT * from user_behavior LIMIT 3;

返回以下查询结果,表明数据已成功加载

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 34 | 856384 | 1029459 | pv | 2017-11-27 14:43:27 |
| 34 | 5079705 | 1029459 | pv | 2017-11-27 14:44:13 |
| 34 | 4451615 | 1029459 | pv | 2017-11-27 14:45:52 |
+--------+---------+------------+--------------+---------------------+

使用 Pipe

从 v3.2 开始,StarRocks 提供了 Pipe 加载方法,该方法目前仅支持 Parquet 和 ORC 文件格式。

Pipe 的优势

Pipe 非常适合连续数据加载和大规模数据加载

  • 微批量的大规模数据加载有助于降低因数据错误而导致的重试成本。

    借助 Pipe,StarRocks 能够有效地加载大量数据文件,总数据量很大。 Pipe 自动根据文件的数量或大小拆分文件,将加载作业分解为更小的顺序任务。 这种方法确保一个文件中的错误不会影响整个加载作业。 每个文件的加载状态都由 Pipe 记录,使您可以轻松识别和修复包含错误的文件。 通过最大限度地减少因数据错误而导致的重试需求,此方法有助于降低成本。

  • 连续数据加载有助于减少人力。

    Pipe 帮助您将新的或更新的数据文件写入特定位置,并不断将这些文件中的新数据加载到 StarRocks 中。 使用指定的 "AUTO_INGEST" = "TRUE" 创建 Pipe 作业后,它将不断监视存储在指定路径中的数据文件的更改,并自动将新数据或更新的数据从数据文件加载到目标 StarRocks 表中。

此外,Pipe 执行文件唯一性检查以帮助防止重复数据加载。在加载过程中,Pipe 根据文件名和摘要检查每个数据文件的唯一性。 如果具有特定文件名和摘要的文件已经由 Pipe 作业处理过,则 Pipe 作业将跳过所有后续具有相同文件名和摘要的文件。 请注意像 AWS S3 这样的对象存储使用 ETag作为文件摘要。

每个数据文件的加载状态都会被记录并保存到 information_schema.pipe_files 视图中。在删除与该视图关联的 Pipe 作业后,有关在该作业中加载的文件的记录也将被删除。

数据流

Pipe data flow

Pipe 非常适合连续数据加载和大规模数据加载

  • 微批量的大规模数据加载有助于降低因数据错误而导致的重试成本。

    借助 Pipe,StarRocks 能够有效地加载大量数据文件,总数据量很大。 Pipe 自动根据文件的数量或大小拆分文件,将加载作业分解为更小的顺序任务。 这种方法确保一个文件中的错误不会影响整个加载作业。 每个文件的加载状态都由 Pipe 记录,使您可以轻松识别和修复包含错误的文件。 通过最大限度地减少因数据错误而导致的重试需求,此方法有助于降低成本。

  • 连续数据加载有助于减少人力。

    Pipe 帮助您将新的或更新的数据文件写入特定位置,并不断将这些文件中的新数据加载到 StarRocks 中。 使用指定的 "AUTO_INGEST" = "TRUE" 创建 Pipe 作业后,它将不断监视存储在指定路径中的数据文件的更改,并自动将新数据或更新的数据从数据文件加载到目标 StarRocks 表中。

此外,Pipe 执行文件唯一性检查以帮助防止重复数据加载。在加载过程中,Pipe 根据文件名和摘要检查每个数据文件的唯一性。 如果具有特定文件名和摘要的文件已经由 Pipe 作业处理过,则 Pipe 作业将跳过所有后续具有相同文件名和摘要的文件。 请注意,像 AWS S3 这样的对象存储使用 ETag 作为文件摘要。

每个数据文件的加载状态都会被记录并保存到 information_schema.pipe_files 视图中。在删除与该视图关联的 Pipe 作业后,有关在该作业中加载的文件的记录也将被删除。

Pipe 和 INSERT+FILES() 之间的差异

Pipe 作业根据每个数据文件的大小和行数拆分为一个或多个事务。 用户可以在加载过程中查询中间结果。 相比之下,INSERT+FILES() 作业作为单个事务处理,用户无法在加载过程中查看数据。

文件加载顺序

对于每个 Pipe 作业,StarRocks 维护一个文件队列,从中以微批处理方式提取和加载数据文件。 Pipe 不保证数据文件以与上传顺序相同的顺序加载。 因此,较新的数据可能在较旧的数据之前加载。

典型示例

创建一个数据库和一个表

创建一个数据库并切换到它

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

手动创建一个表(我们建议该表具有与要从 AWS S3 加载的 Parquet 文件相同的模式)

CREATE TABLE user_behavior_from_pipe
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);

启动一个 Pipe 作业

运行以下命令启动一个 Pipe 作业,该作业将数据从示例数据集 s3://starrocks-examples/user-behavior-10-million-rows/ 加载到 user_behavior_from_pipe 表。 此 Pipe 作业使用微批处理和连续加载(如上所述)pipe 特定功能。

本指南中的其他示例加载包含 1000 万行的单个 Parquet 文件。 对于 Pipe 示例,相同的数据集被拆分为 57 个单独的文件,这些文件都存储在一个 S3 文件夹中。 请注意,在下面的 CREATE PIPE 命令中,path 是 S3 文件夹的 URI,而不是提供文件名,URI 以 /* 结尾。 通过设置 AUTO_INGEST 并指定一个文件夹而不是单个文件,pipe 作业将轮询 S3 文件夹中的新文件,并在文件添加到文件夹时摄取它们。

CREATE PIPE user_behavior_pipe
PROPERTIES
(
"AUTO_INGEST" = "TRUE"
)
AS
INSERT INTO user_behavior_from_pipe
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows/*",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);

注意

在上面的命令中,将您的凭据替换为 AAABBB。可以使用任何有效的 aws.s3.access_keyaws.s3.secret_key,因为任何通过 AWS 身份验证的用户都可以读取该对象。

此作业有四个主要部分

  • pipe_name: Pipe 的名称。Pipe 名称在 Pipe 所属的数据库中必须是唯一的。
  • INSERT_SQL: 用于将数据从指定的源数据文件加载到目标表的 INSERT INTO SELECT FROM FILES 语句。
  • PROPERTIES:一组可选参数,用于指定如何执行 Pipe。 这些参数包括 AUTO_INGESTPOLL_INTERVALBATCH_SIZEBATCH_FILES。 以 "key" = "value" 格式指定这些属性。

有关详细的语法和参数说明,请参阅 CREATE PIPE

检查加载进度

  • 通过在 Pipe 作业所属的当前数据库中使用 SHOW PIPES 查询 Pipe 作业的进度。

    SHOW PIPES WHERE NAME = 'user_behavior_pipe' \G

    返回以下结果

    提示

    在下面显示的输出中,pipe 处于 RUNNING 状态。 Pipe 将保持 RUNNING 状态,直到您手动停止它。 输出还显示了已加载的文件数 (57) 以及上次加载文件的时间。

    *************************** 1. row ***************************
    DATABASE_NAME: mydatabase
    PIPE_ID: 10476
    PIPE_NAME: user_behavior_pipe
    STATE: RUNNING
    TABLE_NAME: mydatabase.user_behavior_from_pipe
    LOAD_STATUS: {"loadedFiles":57,"loadedBytes":295345637,"loadingFiles":0,"lastLoadedTime":"2024-02-28 22:14:19"}
    LAST_ERROR: NULL
    CREATED_TIME: 2024-02-28 22:13:41
    1 row in set (0.02 sec)
  • 从 StarRocks Information Schema 中的 pipes 视图查询 Pipe 作业的进度。

    SELECT * FROM information_schema.pipes WHERE pipe_name = 'user_behavior_replica' \G

    返回以下结果

    提示

    本指南中的一些查询以 \G 而不是分号 (;) 结尾。 这会导致 MySQL 客户端以垂直格式输出结果。 如果您正在使用 DBeaver 或其他客户端,则可能需要使用分号 (;) 而不是 \G

    *************************** 1. row ***************************
    DATABASE_NAME: mydatabase
    PIPE_ID: 10217
    PIPE_NAME: user_behavior_replica
    STATE: RUNNING
    TABLE_NAME: mydatabase.user_behavior_replica
    LOAD_STATUS: {"loadedFiles":1,"loadedBytes":132251298,"loadingFiles":0,"lastLoadedTime":"2023-11-09 15:35:42"}
    LAST_ERROR:
    CREATED_TIME: 9891-01-15 07:51:45
    1 row in set (0.01 sec)

检查文件状态

您可以从 StarRocks 信息模式中的 pipe_files 视图查询已加载文件的加载状态。

SELECT * FROM information_schema.pipe_files WHERE pipe_name = 'user_behavior_replica' \G

返回以下结果

*************************** 1. row ***************************
DATABASE_NAME: mydatabase
PIPE_ID: 10217
PIPE_NAME: user_behavior_replica
FILE_NAME: s3://starrocks-examples/user-behavior-10-million-rows.parquet
FILE_VERSION: e29daa86b1120fea58ad0d047e671787-8
FILE_SIZE: 132251298
LAST_MODIFIED: 2023-11-06 13:25:17
LOAD_STATE: FINISHED
STAGED_TIME: 2023-11-09 15:35:02
START_LOAD_TIME: 2023-11-09 15:35:03
FINISH_LOAD_TIME: 2023-11-09 15:35:42
ERROR_MSG:
1 row in set (0.03 sec)

管理 Pipe 作业

您可以更改、暂停或恢复、删除或查询您创建的 Pipe,并重试加载特定的数据文件。 有关更多信息,请参阅 ALTER PIPESUSPEND or RESUME PIPEDROP PIPESHOW PIPESRETRY FILE