从 MinIO 加载数据
StarRocks 提供了以下选项,用于从 MinIO 加载数据
- 使用 INSERT+
FILES()
进行同步加载 - 使用 Broker Load 进行异步加载
这些选项各有优势,以下各节将详细介绍。
在大多数情况下,我们建议您使用 INSERT+FILES()
方法,该方法更易于使用。
但是,INSERT+FILES()
方法目前仅支持 Parquet、ORC 和 CSV 文件格式。因此,如果您需要加载其他文件格式的数据(如 JSON),或者在数据加载期间执行数据更改(如 DELETE),您可以求助于 Broker Load。
准备工作
准备源数据
确保要加载到 StarRocks 中的源数据正确存储在 MinIO Bucket 中。您还可以考虑数据和数据库的位置,因为当您的 Bucket 和 StarRocks 集群位于同一区域时,数据传输成本要低得多。
在本主题中,我们为您提供了一个示例数据集。您可以使用 curl
下载此数据集
curl -O https://starrocks-examples.s3.amazonaws.com/user_behavior_ten_million_rows.parquet
将 Parquet 文件加载到您的 MinIO 系统中,并记下 Bucket 名称。本指南中的示例使用 Bucket 名称 /starrocks
。
检查权限
只有对这些 StarRocks 表具有 INSERT 权限的用户才能将数据加载到 StarRocks 表中。如果您没有 INSERT 权限,请按照 GRANT 中提供的说明,将 INSERT 权限授予用于连接到 StarRocks 集群的用户。语法为 GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}
。
收集连接详细信息
简而言之,要使用 MinIO 访问密钥身份验证,您需要收集以下信息
- 存储数据的 Bucket
- 对象键(对象名称),如果访问 Bucket 中的特定对象
- MinIO 端点
- 用作访问凭据的访问密钥和密钥。
使用 INSERT+FILES()
此方法从 v3.1 开始可用,目前仅支持 Parquet、ORC 和 CSV(从 v3.3.0 开始)文件格式。
INSERT+FILES() 的优点
FILES()
可以读取存储在云存储中的文件,基于您指定的路径相关属性,推断文件中数据的表模式,然后将文件中的数据作为数据行返回。
使用 FILES()
,您可以
- 使用 SELECT 直接从 MinIO 查询数据。
- 使用 CREATE TABLE AS SELECT (CTAS) 创建和加载表。
- 使用 INSERT 将数据加载到现有表中。
典型示例
使用 SELECT 直接从 MinIO 查询
使用 SELECT+FILES()
直接从 MinIO 查询可以在创建表之前很好地预览数据集的内容。例如
- 在不存储数据的情况下预览数据集。
- 查询最小值和最大值,并确定要使用的数据类型。
- 检查
NULL
值。
以下示例查询之前添加到您的 MinIO 系统的示例数据集。
命令的突出显示部分包含您可能需要更改的设置
- 设置
endpoint
和path
以匹配您的 MinIO 系统。 - 如果您的 MinIO 系统使用 SSL,请将
enable_ssl
设置为true
。 - 将您的 MinIO 访问密钥和密钥替换为
AAA
和BBB
。
SELECT * FROM FILES
(
"aws.s3.endpoint" = "http://minio:9000",
"path" = "s3://starrocks/user_behavior_ten_million_rows.parquet",
"aws.s3.enable_ssl" = "false",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB",
"format" = "parquet",
"aws.s3.use_aws_sdk_default_behavior" = "false",
"aws.s3.use_instance_profile" = "false",
"aws.s3.enable_path_style_access" = "true"
)
LIMIT 3;
系统返回以下查询结果
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 543711 | 829192 | 2355072 | pv | 2017-11-27 08:22:37 |
| 543711 | 2056618 | 3645362 | pv | 2017-11-27 10:16:46 |
| 543711 | 1165492 | 3645362 | pv | 2017-11-27 10:17:00 |
+--------+---------+------------+--------------+---------------------+
3 rows in set (0.41 sec)
请注意,上面返回的列名由 Parquet 文件提供。
使用 CTAS 创建和加载表
这是前一个示例的延续。之前的查询包含在 CREATE TABLE AS SELECT (CTAS) 中,以使用模式推断自动执行表创建。这意味着 StarRocks 将推断表模式,创建您想要的表,然后将数据加载到表中。当将 FILES()
表函数与 Parquet 文件一起使用时,Parquet 格式包含列名,因此创建表时不需要列名和类型。
使用模式推断时,CREATE TABLE 的语法不允许设置副本数,因此请在创建表之前进行设置。以下示例适用于具有单个副本的系统
ADMIN SET FRONTEND CONFIG ('default_replication_num' = '1');
创建一个数据库并切换到它
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
使用 CTAS 创建表并加载先前添加到您的 MinIO 系统的示例数据集的数据。
命令的突出显示部分包含您可能需要更改的设置
- 设置
endpoint
和path
以匹配您的 MinIO 系统。 - 如果您的 MinIO 系统使用 SSL,请将
enable_ssl
设置为true
。 - 将您的 MinIO 访问密钥和密钥替换为
AAA
和BBB
。
CREATE TABLE user_behavior_inferred AS
SELECT * FROM FILES
(
"aws.s3.endpoint" = "http://minio:9000",
"path" = "s3://starrocks/user_behavior_ten_million_rows.parquet",
"aws.s3.enable_ssl" = "false",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB",
"format" = "parquet",
"aws.s3.use_aws_sdk_default_behavior" = "false",
"aws.s3.use_instance_profile" = "false",
"aws.s3.enable_path_style_access" = "true"
);
Query OK, 10000000 rows affected (3.17 sec)
{'label':'insert_a5da3ff5-9ee4-11ee-90b0-02420a060004', 'status':'VISIBLE', 'txnId':'17'}
创建表后,您可以使用 DESCRIBE 查看其架构
DESCRIBE user_behavior_inferred;
系统返回以下查询结果
+--------------+------------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+------------------+------+-------+---------+-------+
| UserID | bigint | YES | true | NULL | |
| ItemID | bigint | YES | true | NULL | |
| CategoryID | bigint | YES | true | NULL | |
| BehaviorType | varchar(1048576) | YES | false | NULL | |
| Timestamp | varchar(1048576) | YES | false | NULL | |
+--------------+------------------+------+-------+---------+-------+
查询表以验证数据是否已加载到其中。 例子
SELECT * from user_behavior_inferred LIMIT 3;
返回以下查询结果,表明数据已成功加载
+--------+--------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+--------+------------+--------------+---------------------+
| 58 | 158350 | 2355072 | pv | 2017-11-27 13:06:51 |
| 58 | 158590 | 3194735 | pv | 2017-11-27 02:21:04 |
| 58 | 215073 | 3002561 | pv | 2017-11-30 10:55:42 |
+--------+--------+------------+--------------+---------------------+
使用 INSERT 加载到现有表中
您可能想要自定义要插入的表,例如
- 列数据类型、可为空设置或默认值
- 键类型和列
- 数据分区和分桶
创建最有效的表结构需要了解数据的使用方式和列的内容。本主题不涉及表设计。有关表设计的信息,请参阅表类型。
在此示例中,我们基于对表的查询方式和 Parquet 文件中的数据的了解来创建表。可以通过直接在 MinIO 中查询文件来获得对 Parquet 文件中数据的了解。
- 由于 MinIO 中数据集的查询表明
Timestamp
列包含与datetime
数据类型匹配的数据,因此在以下 DDL 中指定了列类型。 - 通过查询 MinIO 中的数据,您可以发现数据集中没有
NULL
值,因此 DDL 未将任何列设置为可为空。 - 基于对预期查询类型的了解,排序键和分桶列设置为列
UserID
。您的用例可能与此数据不同,因此您可能会决定除了UserID
之外或代替UserID
使用ItemID
作为排序键。
创建一个数据库并切换到它
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
手动创建一个表(我们建议该表具有与您要从 MinIO 加载的 Parquet 文件相同的模式)
CREATE TABLE user_behavior_declared
(
UserID int(11) NOT NULL,
ItemID int(11) NOT NULL,
CategoryID int(11) NOT NULL,
BehaviorType varchar(65533) NOT NULL,
Timestamp datetime NOT NULL
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID)
PROPERTIES
(
'replication_num' = '1'
);
显示架构,以便您可以将其与 FILES()
表函数生成的推断架构进行比较
DESCRIBE user_behavior_declared;
+--------------+----------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+----------------+------+-------+---------+-------+
| UserID | int | NO | true | NULL | |
| ItemID | int | NO | false | NULL | |
| CategoryID | int | NO | false | NULL | |
| BehaviorType | varchar(65533) | NO | false | NULL | |
| Timestamp | datetime | NO | false | NULL | |
+--------------+----------------+------+-------+---------+-------+
5 rows in set (0.00 sec)
将您刚刚创建的架构与之前使用 FILES()
表函数推断的架构进行比较。 查看
- 数据类型
- 可为空
- 键字段
为了更好地控制目标表的模式并获得更好的查询性能,我们建议您在生产环境中手动指定表模式。对于时间戳字段,使用 datetime
数据类型比使用 varchar
更有效。
创建表后,您可以使用 INSERT INTO SELECT FROM FILES() 加载它
命令的突出显示部分包含您可能需要更改的设置
- 设置
endpoint
和path
以匹配您的 MinIO 系统。 - 如果您的 MinIO 系统使用 SSL,请将
enable_ssl
设置为true
。 - 将您的 MinIO 访问密钥和密钥替换为
AAA
和BBB
。
INSERT INTO user_behavior_declared
SELECT * FROM FILES
(
"aws.s3.endpoint" = "http://minio:9000",
"path" = "s3://starrocks/user_behavior_ten_million_rows.parquet",
"aws.s3.enable_ssl" = "false",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB",
"format" = "parquet",
"aws.s3.use_aws_sdk_default_behavior" = "false",
"aws.s3.use_instance_profile" = "false",
"aws.s3.enable_path_style_access" = "true"
);
加载完成后,您可以查询表以验证数据是否已加载到其中。 例子
SELECT * from user_behavior_declared LIMIT 3;
返回以下查询结果,表明数据已成功加载
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 58 | 4309692 | 1165503 | pv | 2017-11-25 14:06:52 |
| 58 | 181489 | 1165503 | pv | 2017-11-25 14:07:22 |
| 58 | 3722956 | 1165503 | pv | 2017-11-25 14:09:28 |
+--------+---------+------------+--------------+---------------------+
检查加载进度
您可以从 StarRocks Information Schema 中的 loads
视图查询 INSERT 作业的进度。 从 v3.1 开始支持此功能。 例子
SELECT * FROM information_schema.loads ORDER BY JOB_ID DESC;
有关 loads
视图中提供的字段的信息,请参阅 loads
。
如果您提交了多个加载作业,您可以按与作业关联的 LABEL
进行过滤。 例子
SELECT * FROM information_schema.loads WHERE LABEL = 'insert_e3b882f5-7eb3-11ee-ae77-00163e267b60' \G
*************************** 1. row ***************************
JOB_ID: 10243
LABEL: insert_e3b882f5-7eb3-11ee-ae77-00163e267b60
DATABASE_NAME: mydatabase
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: INSERT
PRIORITY: NORMAL
SCAN_ROWS: 10000000
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 10000000
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):300; max_filter_ratio:0.0
CREATE_TIME: 2023-11-09 11:56:01
ETL_START_TIME: 2023-11-09 11:56:01
ETL_FINISH_TIME: 2023-11-09 11:56:01
LOAD_START_TIME: 2023-11-09 11:56:01
LOAD_FINISH_TIME: 2023-11-09 11:56:44
JOB_DETAILS: {"All backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[10142]},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":311710786,"InternalTableLoadRows":10000000,"ScanBytes":581574034,"ScanRows":10000000,"TaskNumber":1,"Unfinished backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL
INSERT 是一个同步命令。 如果 INSERT 作业仍在运行,您需要打开另一个会话来检查其执行状态。
比较磁盘上的表大小
此查询比较了具有推断模式的表和声明了模式的表。因为推断模式具有可为空的列和时间戳的 varchar,所以数据长度更大
SELECT TABLE_NAME,
TABLE_ROWS,
AVG_ROW_LENGTH,
DATA_LENGTH
FROM information_schema.tables
WHERE TABLE_NAME like 'user_behavior%'\G
*************************** 1. row ***************************
TABLE_NAME: user_behavior_declared
TABLE_ROWS: 10000000
AVG_ROW_LENGTH: 10
DATA_LENGTH: 102562516
*************************** 2. row ***************************
TABLE_NAME: user_behavior_inferred
TABLE_ROWS: 10000000
AVG_ROW_LENGTH: 17
DATA_LENGTH: 176803880
2 rows in set (0.04 sec)
使用 Broker Load
异步 Broker Load 进程处理与 MinIO 的连接、提取数据并将数据存储在 StarRocks 中。
此方法支持以下文件格式
- Parquet
- ORC
- CSV
- JSON(从 v3.2.3 开始支持)
Broker Load 的优点
- Broker Load 在后台运行,客户端无需保持连接即可继续作业。
- Broker Load 首选用于长时间运行的作业,默认超时时间为 4 小时。
- 除了 Parquet 和 ORC 文件格式外,Broker Load 还支持 CSV 文件格式和 JSON 文件格式(JSON 文件格式从 v3.2.3 开始支持)。
数据流
- 用户创建一个加载作业。
- 前端 (FE) 创建一个查询计划,并将该计划分发到后端节点 (BE) 或计算节点 (CN)。
- BE 或 CN 从源中提取数据并将数据加载到 StarRocks 中。
典型示例
创建一个表,启动一个加载进程,该进程提取先前加载到您的 MinIO 系统的示例数据集。
创建一个数据库和一个表
创建一个数据库并切换到它
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
手动创建一个表(我们建议该表具有与您要从 MinIO 加载的 Parquet 文件相同的模式)
CREATE TABLE user_behavior
(
UserID int(11) NOT NULL,
ItemID int(11) NOT NULL,
CategoryID int(11) NOT NULL,
BehaviorType varchar(65533) NOT NULL,
Timestamp datetime NOT NULL
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID)
PROPERTIES
(
'replication_num' = '1'
);
启动 Broker Load
运行以下命令以启动 Broker Load 作业,该作业将数据从示例数据集 user_behavior_ten_million_rows.parquet
加载到 user_behavior
表
命令的突出显示部分包含您可能需要更改的设置
- 设置
endpoint
和DATA INFILE
以匹配您的 MinIO 系统。 - 如果您的 MinIO 系统使用 SSL,请将
enable_ssl
设置为true
。 - 将您的 MinIO 访问密钥和密钥替换为
AAA
和BBB
。
LOAD LABEL UserBehavior
(
DATA INFILE("s3://starrocks/user_behavior_ten_million_rows.parquet")
INTO TABLE user_behavior
)
WITH BROKER
(
"aws.s3.endpoint" = "http://minio:9000",
"aws.s3.enable_ssl" = "false",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB",
"aws.s3.use_aws_sdk_default_behavior" = "false",
"aws.s3.use_instance_profile" = "false",
"aws.s3.enable_path_style_access" = "true"
)
PROPERTIES
(
"timeout" = "72000"
);
此作业有四个主要部分
LABEL
:一个字符串,用于查询加载作业的状态。LOAD
声明:源 URI、源数据格式和目标表名。BROKER
:源的连接详细信息。PROPERTIES
:超时值和任何其他要应用于加载作业的属性。
有关详细语法和参数说明,请参阅 BROKER LOAD。
检查加载进度
您可以从 StarRocks Information Schema 中的 loads
视图查询 Broker Load 作业的进度。此功能从 v3.1 开始支持。
SELECT * FROM information_schema.loads;
有关 loads
视图中提供的字段的信息,请参阅 loads
。
如果您提交了多个加载作业,您可以按与作业关联的 LABEL
进行过滤。 例子
SELECT * FROM information_schema.loads
WHERE LABEL = 'UserBehavior'\G
*************************** 1. row ***************************
JOB_ID: 10176
LABEL: userbehavior
DATABASE_NAME: mydatabase
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: BROKER
PRIORITY: NORMAL
SCAN_ROWS: 10000000
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 10000000
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):72000; max_filter_ratio:0.0
CREATE_TIME: 2023-12-19 23:02:41
ETL_START_TIME: 2023-12-19 23:02:44
ETL_FINISH_TIME: 2023-12-19 23:02:44
LOAD_START_TIME: 2023-12-19 23:02:44
LOAD_FINISH_TIME: 2023-12-19 23:02:46
JOB_DETAILS: {"All backends":{"4aeec563-a91e-4c1e-b169-977b660950d1":[10004]},"FileNumber":1,"FileSize":132251298,"InternalTableLoadBytes":311710786,"InternalTableLoadRows":10000000,"ScanBytes":132251298,"ScanRows":10000000,"TaskNumber":1,"Unfinished backends":{"4aeec563-a91e-4c1e-b169-977b660950d1":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL
1 row in set (0.02 sec)
在确认加载作业已完成后,您可以检查目标表的子集,以查看数据是否已成功加载。 例子
SELECT * from user_behavior LIMIT 3;
返回以下查询结果,表明数据已成功加载
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 142 | 2869980 | 2939262 | pv | 2017-11-25 03:43:22 |
| 142 | 2522236 | 1669167 | pv | 2017-11-25 15:14:12 |
| 142 | 3031639 | 3607361 | pv | 2017-11-25 15:19:25 |
+--------+---------+------------+--------------+---------------------+