从 AWS S3 加载数据
StarRocks 提供了以下从 AWS S3 加载数据的选项
- 使用 INSERT+
FILES()
进行同步加载 - 使用 Broker Load 进行异步加载
- 使用 Pipe 进行持续异步加载
这些选项各有优势,以下各节将详细介绍。
在大多数情况下,我们建议您使用 INSERT+FILES()
方法,该方法更易于使用。
但是,INSERT+FILES()
方法目前仅支持 Parquet、ORC 和 CSV 文件格式。因此,如果您需要加载其他文件格式(例如 JSON)的数据,或在数据加载期间执行数据更改(例如 DELETE),您可以求助于 Broker Load。
如果您需要加载大量数据文件,且总数据量很大(例如,超过 100 GB 甚至 1 TB),我们建议您使用 Pipe 方法。Pipe 可以根据文件数量或大小拆分文件,将加载作业分解为更小的顺序任务。这种方法确保一个文件中的错误不会影响整个加载作业,并最大限度地减少因数据错误而需要重试的次数。
准备工作
准备源数据
确保您要加载到 StarRocks 中的源数据已正确存储在 S3 存储桶中。您还可以考虑数据和数据库的位置,因为当您的存储桶和 StarRocks 集群位于同一区域时,数据传输成本要低得多。
在本主题中,我们为您提供 S3 存储桶中的示例数据集 s3://starrocks-examples/user-behavior-10-million-rows.parquet
。您可以使用任何有效的凭据访问该数据集,因为任何 AWS 身份验证用户都可以读取该对象。
检查权限
您只能以对这些 StarRocks 表具有 INSERT 权限的用户身份将数据加载到 StarRocks 表中。如果您没有 INSERT 权限,请按照 GRANT 中提供的说明,将 INSERT 权限授予用于连接到 StarRocks 集群的用户。语法为 GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}
。
收集身份验证详细信息
本主题中的示例使用基于 IAM 用户的身份验证。为确保您有权从 AWS S3 读取数据,我们建议您阅读 IAM 用户身份验证准备工作 并按照说明创建一个 IAM 用户,并配置正确的 IAM 策略。
简而言之,如果您实践基于 IAM 用户的身份验证,您需要收集有关以下 AWS 资源的信息
- 存储数据的 S3 存储桶。
- 如果要访问存储桶中的特定对象,则为 S3 对象键(对象名称)。请注意,如果您的 S3 对象存储在子文件夹中,则对象键可以包含前缀。
- S3 存储桶所属的 AWS 区域。
- 用作访问凭据的访问密钥和密钥。
有关所有可用身份验证方法的信息,请参阅 对 AWS 资源进行身份验证。
使用 INSERT+FILES()
此方法从 v3.1 开始可用,目前仅支持 Parquet、ORC 和 CSV(从 v3.3.0 开始)文件格式。
INSERT+FILES() 的优点
FILES()
可以根据您指定的与路径相关的属性读取存储在云存储中的文件,推断文件中数据的表模式,然后将文件中的数据作为数据行返回。
使用 FILES()
,您可以
- 使用 SELECT 直接从 S3 查询数据。
- 使用 CREATE TABLE AS SELECT (CTAS) 创建和加载表。
- 使用 INSERT 将数据加载到现有表中。
典型示例
使用 SELECT 直接从 S3 查询
使用 SELECT+FILES()
直接从 S3 查询可以在创建表之前很好地预览数据集的内容。例如
- 在不存储数据的情况下预览数据集。
- 查询最小值和最大值,并确定要使用的数据类型。
- 检查
NULL
值。
以下示例查询示例数据集 s3://starrocks-examples/user-behavior-10-million-rows.parquet
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
)
LIMIT 3;
注意
在上面的命令中,将您的凭据替换为
AAA
和BBB
。可以使用任何有效的aws.s3.access_key
和aws.s3.secret_key
,因为任何通过 AWS 身份验证的用户都可以读取该对象。
系统返回以下查询结果
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 1 | 2576651 | 149192 | pv | 2017-11-25 01:21:25 |
| 1 | 3830808 | 4181361 | pv | 2017-11-25 07:04:53 |
| 1 | 4365585 | 2520377 | pv | 2017-11-25 07:49:06 |
+--------+---------+------------+--------------+---------------------+
注意
请注意,上面返回的列名由 Parquet 文件提供。
使用 CTAS 创建和加载表
这是前一个示例的延续。前一个查询包装在 CREATE TABLE AS SELECT (CTAS) 中,以使用模式推断自动创建表。这意味着 StarRocks 将推断表模式,创建您想要的表,然后将数据加载到表中。当使用带有 Parquet 文件的 FILES()
表函数时,由于 Parquet 格式包含列名,因此不需要创建表时的列名和类型。
注意
使用模式推断时,CREATE TABLE 的语法不允许设置副本数,因此请在创建表之前设置它。以下示例适用于具有一个副本的系统
ADMIN SET FRONTEND CONFIG ('default_replication_num' = "1");
创建一个数据库并切换到它
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
使用 CTAS 创建表并将示例数据集 s3://starrocks-examples/user-behavior-10-million-rows.parquet
的数据加载到表中
CREATE TABLE user_behavior_inferred AS
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);
注意
在上面的命令中,将您的凭据替换为
AAA
和BBB
。可以使用任何有效的aws.s3.access_key
和aws.s3.secret_key
,因为任何通过 AWS 身份验证的用户都可以读取该对象。
创建表后,您可以使用 DESCRIBE 查看其架构
DESCRIBE user_behavior_inferred;
系统返回以下查询结果
+--------------+------------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+------------------+------+-------+---------+-------+
| UserID | bigint | YES | true | NULL | |
| ItemID | bigint | YES | true | NULL | |
| CategoryID | bigint | YES | true | NULL | |
| BehaviorType | varchar(1048576) | YES | false | NULL | |
| Timestamp | varchar(1048576) | YES | false | NULL | |
+--------------+------------------+------+-------+---------+-------+
查询表以验证数据是否已加载到其中。 例子
SELECT * from user_behavior_inferred LIMIT 3;
返回以下查询结果,表明数据已成功加载
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 225586 | 3694958 | 1040727 | pv | 2017-12-01 00:58:40 |
| 225586 | 3726324 | 965809 | pv | 2017-12-01 02:16:02 |
| 225586 | 3732495 | 1488813 | pv | 2017-12-01 00:59:46 |
+--------+---------+------------+--------------+---------------------+
使用 INSERT 加载到现有表
您可能想要自定义要插入的表,例如
- 列数据类型、可为空设置或默认值
- 键类型和列
- 数据分区和分桶
注意
创建最有效的表结构需要了解数据的使用方式和列的内容。本主题不涵盖表设计。有关表设计的信息,请参阅 表类型。
在此示例中,我们基于对表查询方式和 Parquet 文件中数据的了解来创建表。可以通过直接在 S3 中查询文件来了解 Parquet 文件中的数据。
- 由于 S3 中的数据集查询表明
Timestamp
列包含与 VARCHAR 数据类型匹配的数据,并且 StarRocks 可以从 VARCHAR 转换为 DATETIME,因此在以下 DDL 中将数据类型更改为 DATETIME。 - 通过查询 S3 中的数据,您可以发现数据集中没有
NULL
值,因此 DDL 也可以将所有列设置为非空。 - 根据对预期查询类型的了解,排序键和分桶列设置为
UserID
列。您的用例对于此数据可能有所不同,因此您可以决定在排序键中使用ItemID
以及UserID
或代替UserID
。
创建一个数据库并切换到它
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
手动创建表
CREATE TABLE user_behavior_declared
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);
显示架构,以便您可以将其与 FILES()
表函数生成的推断架构进行比较
DESCRIBE user_behavior_declared;
+--------------+----------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+----------------+------+-------+---------+-------+
| UserID | int | YES | true | NULL | |
| ItemID | int | YES | false | NULL | |
| CategoryID | int | YES | false | NULL | |
| BehaviorType | varchar(65533) | YES | false | NULL | |
| Timestamp | datetime | YES | false | NULL | |
+--------------+----------------+------+-------+---------+-------+
将您刚刚创建的架构与之前使用 FILES()
表函数推断的架构进行比较。 查看
- 数据类型
- 可为空
- 键字段
为了更好地控制目标表的架构并获得更好的查询性能,我们建议您在生产环境中手动指定表架构。
创建表后,您可以使用 INSERT INTO SELECT FROM FILES() 加载它
INSERT INTO user_behavior_declared
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);
注意
在上面的命令中,将您的凭据替换为
AAA
和BBB
。可以使用任何有效的aws.s3.access_key
和aws.s3.secret_key
,因为任何通过 AWS 身份验证的用户都可以读取该对象。
加载完成后,您可以查询表以验证数据是否已加载到其中。 例子
SELECT * from user_behavior_declared LIMIT 3;
返回以下查询结果,表明数据已成功加载
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 393529 | 3715112 | 883960 | pv | 2017-12-02 02:45:44 |
| 393529 | 2650583 | 883960 | pv | 2017-12-02 02:45:59 |
| 393529 | 3715112 | 883960 | pv | 2017-12-02 03:00:56 |
+--------+---------+------------+--------------+---------------------+
检查加载进度
您可以从 StarRocks Information Schema 中的 loads
视图查询 INSERT 作业的进度。 从 v3.1 开始支持此功能。 例子
SELECT * FROM information_schema.loads ORDER BY JOB_ID DESC;
有关 loads
视图中提供的字段的信息,请参阅 loads
。
如果您提交了多个加载作业,您可以按与作业关联的 LABEL
进行过滤。 例子
SELECT * FROM information_schema.loads WHERE LABEL = 'insert_e3b882f5-7eb3-11ee-ae77-00163e267b60' \G
*************************** 1. row ***************************
JOB_ID: 10243
LABEL: insert_e3b882f5-7eb3-11ee-ae77-00163e267b60
DATABASE_NAME: mydatabase
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: INSERT
PRIORITY: NORMAL
SCAN_ROWS: 10000000
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 10000000
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):300; max_filter_ratio:0.0
CREATE_TIME: 2023-11-09 11:56:01
ETL_START_TIME: 2023-11-09 11:56:01
ETL_FINISH_TIME: 2023-11-09 11:56:01
LOAD_START_TIME: 2023-11-09 11:56:01
LOAD_FINISH_TIME: 2023-11-09 11:56:44
JOB_DETAILS: {"All backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[10142]},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":311710786,"InternalTableLoadRows":10000000,"ScanBytes":581574034,"ScanRows":10000000,"TaskNumber":1,"Unfinished backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL
注意
INSERT 是一个同步命令。 如果 INSERT 作业仍在运行,您需要打开另一个会话来检查其执行状态。
使用 Broker Load
异步 Broker Load 进程处理与 S3 的连接、提取数据并将数据存储在 StarRocks 中。
此方法支持以下文件格式
- Parquet
- ORC
- CSV
- JSON(从 v3.2.3 开始支持)
Broker Load 的优点
- Broker Load 在后台运行,客户端无需保持连接即可继续作业。
- Broker Load 首选用于长时间运行的作业,默认超时时间为 4 小时。
- 除了 Parquet 和 ORC 文件格式外,Broker Load 还支持 CSV 文件格式和 JSON 文件格式(JSON 文件格式从 v3.2.3 开始支持)。
数据流
- 用户创建一个加载作业。
- 前端 (FE) 创建一个查询计划,并将该计划分发到后端节点 (BE) 或计算节点 (CN)。
- BE 或 CN 从源中提取数据并将数据加载到 StarRocks 中。
典型示例
创建一个表,启动一个加载进程,从 S3 中提取示例数据集 s3://starrocks-examples/user-behavior-10-million-rows.parquet
,并验证数据加载的进度和成功情况。
创建一个数据库和一个表
创建一个数据库并切换到它
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
手动创建表(我们建议该表具有与您要从 AWS S3 加载的 Parquet 文件相同的架构)
CREATE TABLE user_behavior
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);
启动 Broker Load
运行以下命令以启动一个 Broker Load 作业,该作业将数据从示例数据集 s3://starrocks-examples/user-behavior-10-million-rows.parquet
加载到 user_behavior
表中
LOAD LABEL user_behavior
(
DATA INFILE("s3://starrocks-examples/user-behavior-10-million-rows.parquet")
INTO TABLE user_behavior
FORMAT AS "parquet"
)
WITH BROKER
(
"aws.s3.enable_ssl" = "true",
"aws.s3.use_instance_profile" = "false",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
)
PROPERTIES
(
"timeout" = "72000"
);
注意
在上面的命令中,将您的凭据替换为
AAA
和BBB
。可以使用任何有效的aws.s3.access_key
和aws.s3.secret_key
,因为任何通过 AWS 身份验证的用户都可以读取该对象。
此作业有四个主要部分
LABEL
:一个字符串,用于查询加载作业的状态。LOAD
声明:源 URI、源数据格式和目标表名。BROKER
:源的连接详细信息。PROPERTIES
:超时值和任何其他要应用于加载作业的属性。
有关详细语法和参数说明,请参阅 BROKER LOAD。
检查加载进度
您可以从 StarRocks Information Schema 中的 loads
视图中查询 Broker Load 作业的进度。此功能从 v3.1 开始支持。
SELECT * FROM information_schema.loads WHERE LABEL = 'user_behavior';
有关 loads
视图中提供的字段的信息,请参阅 loads
。
此记录显示 LOADING
状态,进度为 39%。如果您看到类似的内容,请再次运行该命令,直到您看到 FINISHED
状态。
JOB_ID: 10466
LABEL: user_behavior
DATABASE_NAME: mydatabase
STATE: LOADING
PROGRESS: ETL:100%; LOAD:39%
TYPE: BROKER
PRIORITY: NORMAL
SCAN_ROWS: 4620288
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 4620288
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):72000; max_filter_ratio:0.0
CREATE_TIME: 2024-02-28 22:11:36
ETL_START_TIME: 2024-02-28 22:11:41
ETL_FINISH_TIME: 2024-02-28 22:11:41
LOAD_START_TIME: 2024-02-28 22:11:41
LOAD_FINISH_TIME: NULL
JOB_DETAILS: {"All backends":{"2fb97223-b14c-404b-9be1-83aa9b3a7715":[10004]},"FileNumber":1,"FileSize":136901706,"InternalTableLoadBytes":144032784,"InternalTableLoadRows":4620288,"ScanBytes":143969616,"ScanRows":4620288,"TaskNumber":1,"Unfinished backends":{"2fb97223-b14c-404b-9be1-83aa9b3a7715":[10004]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL
在确认加载作业已完成后,您可以检查目标表的子集,以查看数据是否已成功加载。 例子
SELECT * from user_behavior LIMIT 3;
返回以下查询结果,表明数据已成功加载
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 34 | 856384 | 1029459 | pv | 2017-11-27 14:43:27 |
| 34 | 5079705 | 1029459 | pv | 2017-11-27 14:44:13 |
| 34 | 4451615 | 1029459 | pv | 2017-11-27 14:45:52 |
+--------+---------+------------+--------------+---------------------+
使用 Pipe
从 v3.2 开始,StarRocks 提供了 Pipe 加载方法,该方法目前仅支持 Parquet 和 ORC 文件格式。
Pipe 的优势
Pipe 非常适合连续数据加载和大规模数据加载
-
微批量的大规模数据加载有助于降低因数据错误而导致的重试成本。
借助 Pipe,StarRocks 能够高效地加载大量数据文件,且总数据量很大。Pipe 会根据文件数量或大小自动拆分文件,将加载作业分解为更小的顺序任务。这种方法确保一个文件中的错误不会影响整个加载作业。Pipe 会记录每个文件的加载状态,使您可以轻松识别和修复包含错误的文件。通过最大限度地减少因数据错误而需要重试的次数,这种方法有助于降低成本。
-
连续数据加载有助于减少人力。
Pipe 可帮助您将新的或更新的数据文件写入特定位置,并不断将这些文件中的新数据加载到 StarRocks 中。在使用指定的
"AUTO_INGEST" = "TRUE"
创建 Pipe 作业后,它将不断监视存储在指定路径中的数据文件的更改,并将数据文件中的新数据或更新的数据自动加载到目标 StarRocks 表中。
此外,Pipe 执行文件唯一性检查以帮助防止重复数据加载。在加载过程中,Pipe 会根据文件名和摘要检查每个数据文件的唯一性。如果具有特定文件名和摘要的文件已被 Pipe 作业处理,则 Pipe 作业将跳过所有后续具有相同文件名和摘要的文件。请注意像 AWS S3 这样的对象存储使用 ETag作为文件摘要。
每个数据文件的加载状态都会被记录并保存到 information_schema.pipe_files
视图中。在删除与该视图关联的 Pipe 作业后,有关在该作业中加载的文件的记录也将被删除。
数据流
Pipe 非常适合连续数据加载和大规模数据加载
-
微批量的大规模数据加载有助于降低因数据错误而导致的重试成本。
借助 Pipe,StarRocks 能够高效地加载大量数据文件,且总数据量很大。Pipe 会根据文件数量或大小自动拆分文件,将加载作业分解为更小的顺序任务。这种方法确保一个文件中的错误不会影响整个加载作业。Pipe 会记录每个文件的加载状态,使您可以轻松识别和修复包含错误的文件。通过最大限度地减少因数据错误而需要重试的次数,这种方法有助于降低成本。
-
连续数据加载有助于减少人力。
Pipe 可帮助您将新的或更新的数据文件写入特定位置,并不断将这些文件中的新数据加载到 StarRocks 中。在使用指定的
"AUTO_INGEST" = "TRUE"
创建 Pipe 作业后,它将不断监视存储在指定路径中的数据文件的更改,并将数据文件中的新数据或更新的数据自动加载到目标 StarRocks 表中。
此外,Pipe 执行文件唯一性检查以帮助防止重复数据加载。在加载过程中,Pipe 会根据文件名和摘要检查每个数据文件的唯一性。如果具有特定文件名和摘要的文件已被 Pipe 作业处理,则 Pipe 作业将跳过所有后续具有相同文件名和摘要的文件。请注意,像 AWS S3 这样的对象存储使用 ETag
作为文件摘要。
每个数据文件的加载状态都会被记录并保存到 information_schema.pipe_files
视图中。在删除与该视图关联的 Pipe 作业后,有关在该作业中加载的文件的记录也将被删除。
Pipe 和 INSERT+FILES() 之间的差异
Pipe 作业根据每个数据文件的大小和行数拆分为一个或多个事务。用户可以在加载过程中查询中间结果。相比之下,INSERT+FILES()
作业作为单个事务处理,用户无法在加载过程中查看数据。
文件加载顺序
对于每个 Pipe 作业,StarRocks 维护一个文件队列,从中获取并加载数据文件作为微批处理。Pipe 不保证数据文件的加载顺序与上传顺序相同。因此,较新的数据可能在较旧的数据之前加载。
典型示例
创建一个数据库和一个表
创建一个数据库并切换到它
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
手动创建表(我们建议该表具有与您要从 AWS S3 加载的 Parquet 文件相同的架构)
CREATE TABLE user_behavior_from_pipe
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);
启动一个 Pipe 作业
运行以下命令以启动一个 Pipe 作业,该作业将数据从示例数据集 s3://starrocks-examples/user-behavior-10-million-rows/
加载到 user_behavior_from_pipe
表中。此 pipe 作业同时使用微批处理和连续加载(如上所述)pipe 特定功能。
本指南中的其他示例加载一个包含 1000 万行的 Parquet 文件。对于 pipe 示例,相同的数据集被拆分为 57 个单独的文件,这些文件都存储在一个 S3 文件夹中。请注意,在下面的 CREATE PIPE
命令中,path
是 S3 文件夹的 URI,而不是提供文件名,而是以 /*
结尾的 URI。通过设置 AUTO_INGEST
并指定文件夹而不是单个文件,pipe 作业将轮询 S3 文件夹中的新文件,并在将文件添加到文件夹时提取它们。
CREATE PIPE user_behavior_pipe
PROPERTIES
(
"AUTO_INGEST" = "TRUE"
)
AS
INSERT INTO user_behavior_from_pipe
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows/*",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);
注意
在上面的命令中,将您的凭据替换为
AAA
和BBB
。可以使用任何有效的aws.s3.access_key
和aws.s3.secret_key
,因为任何通过 AWS 身份验证的用户都可以读取该对象。
此作业有四个主要部分
pipe_name
: Pipe 的名称。Pipe 名称在 Pipe 所属的数据库中必须是唯一的。INSERT_SQL
: 用于将数据从指定的源数据文件加载到目标表的 INSERT INTO SELECT FROM FILES 语句。PROPERTIES
:一组可选参数,用于指定如何执行 pipe。其中包括AUTO_INGEST
、POLL_INTERVAL
、BATCH_SIZE
和BATCH_FILES
。以"key" = "value"
格式指定这些属性。
有关详细的语法和参数说明,请参阅 CREATE PIPE。
检查加载进度
-
通过在 Pipe 作业所属的当前数据库中使用 SHOW PIPES 来查询 Pipe 作业的进度。
SHOW PIPES WHERE NAME = 'user_behavior_pipe' \G
返回以下结果
提示在下面显示的输出中,pipe 处于
RUNNING
状态。pipe 将保持RUNNING
状态,直到您手动停止它。输出还显示了加载的文件数 (57) 以及上次加载文件的时间。*************************** 1. row ***************************
DATABASE_NAME: mydatabase
PIPE_ID: 10476
PIPE_NAME: user_behavior_pipe
STATE: RUNNING
TABLE_NAME: mydatabase.user_behavior_from_pipe
LOAD_STATUS: {"loadedFiles":57,"loadedBytes":295345637,"loadingFiles":0,"lastLoadedTime":"2024-02-28 22:14:19"}
LAST_ERROR: NULL
CREATED_TIME: 2024-02-28 22:13:41
1 row in set (0.02 sec) -
从 StarRocks Information Schema 中的
pipes
视图中查询 Pipe 作业的进度。SELECT * FROM information_schema.pipes WHERE pipe_name = 'user_behavior_replica' \G
返回以下结果
提示本指南中的某些查询以
\G
而不是分号 (;
) 结尾。这会导致 MySQL 客户端以垂直格式输出结果。如果您使用的是 DBeaver 或其他客户端,则可能需要使用分号 (;
) 而不是\G
。*************************** 1. row ***************************
DATABASE_NAME: mydatabase
PIPE_ID: 10217
PIPE_NAME: user_behavior_replica
STATE: RUNNING
TABLE_NAME: mydatabase.user_behavior_replica
LOAD_STATUS: {"loadedFiles":1,"loadedBytes":132251298,"loadingFiles":0,"lastLoadedTime":"2023-11-09 15:35:42"}
LAST_ERROR:
CREATED_TIME: 9891-01-15 07:51:45
1 row in set (0.01 sec)
检查文件状态
您可以从 StarRocks 信息模式中的 pipe_files
视图查询已加载文件的加载状态。
SELECT * FROM information_schema.pipe_files WHERE pipe_name = 'user_behavior_replica' \G
返回以下结果
*************************** 1. row ***************************
DATABASE_NAME: mydatabase
PIPE_ID: 10217
PIPE_NAME: user_behavior_replica
FILE_NAME: s3://starrocks-examples/user-behavior-10-million-rows.parquet
FILE_VERSION: e29daa86b1120fea58ad0d047e671787-8
FILE_SIZE: 132251298
LAST_MODIFIED: 2023-11-06 13:25:17
LOAD_STATE: FINISHED
STAGED_TIME: 2023-11-09 15:35:02
START_LOAD_TIME: 2023-11-09 15:35:03
FINISH_LOAD_TIME: 2023-11-09 15:35:42
ERROR_MSG:
1 row in set (0.03 sec)
管理 Pipe 作业
您可以更改、挂起或恢复、删除或查询您创建的管道,并重试加载特定的数据文件。有关更多信息,请参阅 ALTER PIPE、SUSPEND or RESUME PIPE、DROP PIPE、SHOW PIPES 和 RETRY FILE。