从 GCS 加载数据
StarRocks 提供了以下从 GCS 加载数据的选项
- 使用 INSERT+
FILES()
进行同步加载 - 使用 Broker Load 进行异步加载
这些选项各有优势,以下各节将详细介绍。
在大多数情况下,我们建议您使用 INSERT+FILES()
方法,该方法更易于使用。
但是,INSERT+FILES()
方法目前仅支持 Parquet、ORC 和 CSV 文件格式。因此,如果您需要加载其他文件格式(如 JSON)的数据,或者在数据加载期间执行数据更改(如 DELETE),您可以采用 Broker Load。
准备工作
准备源数据
请确保您要加载到 StarRocks 中的源数据已正确存储在 GCS Bucket 中。您还可以考虑数据和数据库的所在位置,因为当您的 Bucket 和 StarRocks 集群位于同一区域时,数据传输成本会低得多。
在本主题中,我们为您提供 GCS Bucket 中的示例数据集 gs://starrocks-samples/user_behavior_ten_million_rows.parquet
。您可以使用任何有效的凭据访问该数据集,因为任何 GCP 用户都可以读取该对象。
检查权限
只有对 StarRocks 表具有 INSERT 权限的用户才能将数据加载到 StarRocks 表中。如果您没有 INSERT 权限,请按照GRANT中提供的说明将 INSERT 权限授予用于连接到 StarRocks 集群的用户。语法为 GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}
。
收集身份验证详细信息
本主题中的示例使用基于服务帐户的身份验证。要练习基于 IAM 用户的身份验证,您需要收集有关以下 GCS 资源的信息
- 存储数据的 GCS Bucket。
- 如果访问 Bucket 中的特定对象,则为 GCS 对象键(对象名称)。请注意,如果您的 GCS 对象存储在子文件夹中,则对象键可以包含前缀。
- GCS Bucket 所属的 GCS 区域。
- 您的 Google Cloud 服务帐户的
private_ key_id
、private_key
和client_email
有关所有可用的身份验证方法的信息,请参见通过身份验证连接到 Google Cloud Storage。
使用 INSERT+FILES()
此方法从 v3.2 开始可用,目前仅支持 Parquet、ORC 和 CSV(从 v3.3.0 开始)文件格式。
INSERT+FILES() 的优点
FILES()
可以读取存储在云存储中的文件,基于您指定的路径相关属性,推断文件中数据的表模式,然后将文件中的数据作为数据行返回。
使用 FILES()
,您可以
- 使用 SELECT 直接从 GCS 查询数据。
- 使用 CREATE TABLE AS SELECT (CTAS) 创建和加载表。
- 使用 INSERT 将数据加载到现有表中。
典型示例
使用 SELECT 直接从 GCS 查询
在创建表之前,使用 SELECT+FILES()
直接从 GCS 查询可以很好地预览数据集的内容。例如
- 在不存储数据的情况下预览数据集。
- 查询最小值和最大值,并确定要使用的数据类型。
- 检查
NULL
值。
以下示例查询示例数据集 gs://starrocks-samples/user_behavior_ten_million_rows.parquet
SELECT * FROM FILES
(
"path" = "gs://starrocks-samples/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"gcp.gcs.service_account_email" = "sampledatareader@xxxxx-xxxxxx-000000.iam.gserviceaccount.com",
"gcp.gcs.service_account_private_key_id" = "baaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gcp.gcs.service_account_private_key" = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----"
)
LIMIT 3;
注意
将上述命令中的凭据替换为您自己的凭据。可以使用任何有效的服务帐户电子邮件、密钥和密钥,因为任何 GCP 经过身份验证的用户都可以读取该对象。
系统返回类似于以下的查询结果
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 543711 | 829192 | 2355072 | pv | 2017-11-27 08:22:37 |
| 543711 | 2056618 | 3645362 | pv | 2017-11-27 10:16:46 |
| 543711 | 1165492 | 3645362 | pv | 2017-11-27 10:17:00 |
+--------+---------+------------+--------------+---------------------+
注意
请注意,上面返回的列名由 Parquet 文件提供。
使用 CTAS 创建和加载表
这是前一个示例的延续。之前的查询包含在 CREATE TABLE AS SELECT (CTAS) 中,以使用模式推断自动创建表。这意味着 StarRocks 将推断表模式,创建您想要的表,然后将数据加载到该表中。在使用 FILES()
表函数和 Parquet 文件时,不需要创建表时指定列名和类型,因为 Parquet 格式包含列名。
注意
使用模式推断时,CREATE TABLE 的语法不允许设置副本数。如果您使用的是 StarRocks Shared-Nothing 集群,请在创建表之前设置副本数。以下示例适用于具有三个副本的系统
ADMIN SET FRONTEND CONFIG ('default_replication_num' = "3");
创建一个数据库并切换到它
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
使用 CTAS 创建一个表,并将示例数据集 gs://starrocks-samples/user_behavior_ten_million_rows.parquet
的数据加载到该表中
CREATE TABLE user_behavior_inferred AS
SELECT * FROM FILES
(
"path" = "gs://starrocks-samples/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"gcp.gcs.service_account_email" = "sampledatareader@xxxxx-xxxxxx-000000.iam.gserviceaccount.com",
"gcp.gcs.service_account_private_key_id" = "baaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gcp.gcs.service_account_private_key" = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----"
);
注意
将上述命令中的凭据替换为您自己的凭据。可以使用任何有效的服务帐户电子邮件、密钥和密钥,因为任何 GCP 经过身份验证的用户都可以读取该对象。
创建表后,您可以使用 DESCRIBE 查看其架构
DESCRIBE user_behavior_inferred;
系统返回类似于以下的查询结果
+--------------+-----------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+-----------+------+-------+---------+-------+
| UserID | bigint | YES | true | NULL | |
| ItemID | bigint | YES | true | NULL | |
| CategoryID | bigint | YES | true | NULL | |
| BehaviorType | varbinary | YES | false | NULL | |
| Timestamp | varbinary | YES | false | NULL | |
+--------------+-----------+------+-------+---------+-------+
查询表以验证数据是否已加载到其中。 例子
SELECT * from user_behavior_inferred LIMIT 3;
返回以下查询结果,表明数据已成功加载
+--------+--------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+--------+------------+--------------+---------------------+
| 84 | 162325 | 2939262 | pv | 2017-12-02 05:41:41 |
| 84 | 232622 | 4148053 | pv | 2017-11-27 04:36:10 |
| 84 | 595303 | 903809 | pv | 2017-11-26 08:03:59 |
+--------+--------+------------+--------------+---------------------+
使用 INSERT 加载到现有表
您可能想要自定义要插入的表,例如
- 列数据类型、可为空设置或默认值
- 键类型和列
- 数据分区和分桶
注意
创建最高效的表结构需要了解数据的使用方式和列的内容。本主题不涉及表设计。有关表设计的信息,请参见表类型。
在本示例中,我们基于对表查询方式和 Parquet 文件中数据的了解来创建表。对 Parquet 文件中数据的了解可以通过直接在 GCS 中查询文件来获得。
- 由于在 GCS 中对数据集的查询表明
Timestamp
列包含与 VARBINARY 数据类型匹配的数据,因此在以下 DDL 中指定了列类型。 - 通过查询 GCS 中的数据,您可以发现数据集中没有
NULL
值,因此 DDL 未将任何列设置为可为空。 - 基于对预期查询类型的了解,排序键和分桶列设置为列
UserID
。对于此数据,您的用例可能有所不同,因此您可能会决定将ItemID
添加到排序键或代替UserID
用于排序键。
创建一个数据库并切换到它
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
手动创建一个表(我们建议该表具有与要从 GCS 加载的 Parquet 文件相同的模式)
CREATE TABLE user_behavior_declared
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp varbinary
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);
显示架构,以便您可以将其与 FILES()
表函数生成的推断架构进行比较
DESCRIBE user_behavior_declared;
+--------------+----------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+----------------+------+-------+---------+-------+
| UserID | int | NO | true | NULL | |
| ItemID | int | NO | false | NULL | |
| CategoryID | int | NO | false | NULL | |
| BehaviorType | varchar(65533) | NO | false | NULL | |
| Timestamp | varbinary | NO | false | NULL | |
+--------------+----------------+------+-------+---------+-------+
5 rows in set (0.00 sec)
将您刚刚创建的架构与之前使用 FILES()
表函数推断的架构进行比较。 查看
- 数据类型
- 可为空
- 键字段
为了更好地控制目标表的架构并获得更好的查询性能,我们建议您在生产环境中手动指定表架构。
创建表后,您可以使用 INSERT INTO SELECT FROM FILES() 加载它
INSERT INTO user_behavior_declared
SELECT * FROM FILES
(
"path" = "gs://starrocks-samples/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"gcp.gcs.service_account_email" = "sampledatareader@xxxxx-xxxxxx-000000.iam.gserviceaccount.com",
"gcp.gcs.service_account_private_key_id" = "baaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gcp.gcs.service_account_private_key" = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----"
);
注意
将上述命令中的凭据替换为您自己的凭据。可以使用任何有效的服务帐户电子邮件、密钥和密钥,因为任何 GCP 经过身份验证的用户都可以读取该对象。
加载完成后,您可以查询表以验证数据是否已加载到其中。 例子
SELECT * from user_behavior_declared LIMIT 3;
系统返回类似于以下的查询结果,表明数据已成功加载
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 142 | 2869980 | 2939262 | pv | 2017-11-25 03:43:22 |
| 142 | 2522236 | 1669167 | pv | 2017-11-25 15:14:12 |
| 142 | 3031639 | 3607361 | pv | 2017-11-25 15:19:25 |
+--------+---------+------------+--------------+---------------------+
检查加载进度
您可以从 StarRocks Information Schema 中的 loads
视图查询 INSERT 作业的进度。 从 v3.1 开始支持此功能。 例子
SELECT * FROM information_schema.loads ORDER BY JOB_ID DESC;
有关 loads
视图中提供的字段的信息,请参阅 loads
。
如果您提交了多个加载作业,您可以按与作业关联的 LABEL
进行过滤。 例子
SELECT * FROM information_schema.loads WHERE LABEL = 'insert_f3fc2298-a553-11ee-92f4-00163e0842bd' \G
*************************** 1. row ***************************
JOB_ID: 10193
LABEL: insert_f3fc2298-a553-11ee-92f4-00163e0842bd
DATABASE_NAME: mydatabase
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: INSERT
PRIORITY: NORMAL
SCAN_ROWS: 10000000
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 10000000
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):300; max_filter_ratio:0.0
CREATE_TIME: 2023-12-28 15:37:38
ETL_START_TIME: 2023-12-28 15:37:38
ETL_FINISH_TIME: 2023-12-28 15:37:38
LOAD_START_TIME: 2023-12-28 15:37:38
LOAD_FINISH_TIME: 2023-12-28 15:39:35
JOB_DETAILS: {"All backends":{"f3fc2298-a553-11ee-92f4-00163e0842bd":[10120]},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":581730322,"InternalTableLoadRows":10000000,"ScanBytes":581574034,"ScanRows":10000000,"TaskNumber":1,"Unfinished backends":{"f3fc2298-a553-11ee-92f4-00163e0842bd":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL
注意
INSERT 是一个同步命令。 如果 INSERT 作业仍在运行,您需要打开另一个会话来检查其执行状态。
使用 Broker Load
异步 Broker Load 进程处理与 GCS 的连接、提取数据并将数据存储在 StarRocks 中。
此方法支持以下文件格式
- Parquet
- ORC
- CSV
- JSON(从 v3.2.3 开始支持)
Broker Load 的优点
- Broker Load 在后台运行,客户端无需保持连接即可继续作业。
- 对于长时间运行的作业,首选 Broker Load,默认超时时间为 4 小时。
- 除了 Parquet 和 ORC 文件格式外,Broker Load 还支持 CSV 文件格式和 JSON 文件格式(JSON 文件格式从 v3.2.3 开始支持)。
数据流
- 用户创建一个加载作业。
- 前端 (FE) 创建一个查询计划,并将该计划分发到后端节点 (BE) 或计算节点 (CN)。
- BE 或 CN 从源中提取数据并将数据加载到 StarRocks 中。
典型示例
创建一个表,启动一个从 GCS 提取示例数据集 gs://starrocks-samples/user_behavior_ten_million_rows.parquet
的加载过程,并验证数据加载的进度和成功情况。
创建一个数据库和一个表
创建一个数据库并切换到它
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
手动创建一个表(我们建议该表具有与您要从 GCS 加载的 Parquet 文件相同的模式)
CREATE TABLE user_behavior
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp varbinary
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);
启动 Broker Load
运行以下命令以启动 Broker Load 作业,该作业将数据从示例数据集 gs://starrocks-samples/user_behavior_ten_million_rows.parquet
加载到 user_behavior
表
LOAD LABEL user_behavior
(
DATA INFILE("gs://starrocks-samples/user_behavior_ten_million_rows.parquet")
INTO TABLE user_behavior
FORMAT AS "parquet"
)
WITH BROKER
(
"gcp.gcs.service_account_email" = "sampledatareader@xxxxx-xxxxxx-000000.iam.gserviceaccount.com",
"gcp.gcs.service_account_private_key_id" = "baaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"gcp.gcs.service_account_private_key" = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----"
)
PROPERTIES
(
"timeout" = "72000"
);
注意
将上述命令中的凭据替换为您自己的凭据。可以使用任何有效的服务帐户电子邮件、密钥和密钥,因为任何 GCP 经过身份验证的用户都可以读取该对象。
此作业有四个主要部分
LABEL
:一个字符串,用于查询加载作业的状态。LOAD
声明:源 URI、源数据格式和目标表名。BROKER
:源的连接详细信息。PROPERTIES
:超时值和任何其他要应用于加载作业的属性。
有关详细语法和参数说明,请参阅 BROKER LOAD。
检查加载进度
您可以从 StarRocks Information Schema 中的 loads
视图查询 INSERT 作业的进度。此功能从 v3.1 开始支持。
SELECT * FROM information_schema.loads;
有关 loads
视图中提供的字段的信息,请参阅 loads
。
如果您提交了多个加载作业,您可以按与作业关联的 LABEL
进行过滤。 例子
SELECT * FROM information_schema.loads WHERE LABEL = 'user_behavior';
在下面的输出中,user_behavior
加载作业有两个条目
- 第一个记录显示状态为
CANCELLED
。滚动到ERROR_MSG
,您可以看到该作业由于listPath failed
而失败。 - 第二个记录显示状态为
FINISHED
,这意味着该作业已成功。
JOB_ID|LABEL |DATABASE_NAME|STATE |PROGRESS |TYPE |PRIORITY|SCAN_ROWS|FILTERED_ROWS|UNSELECTED_ROWS|SINK_ROWS|ETL_INFO|TASK_INFO |CREATE_TIME |ETL_START_TIME |ETL_FINISH_TIME |LOAD_START_TIME |LOAD_FINISH_TIME |JOB_DETAILS |ERROR_MSG |TRACKING_URL|TRACKING_SQL|REJECTED_RECORD_PATH|
------+-------------------------------------------+-------------+---------+-------------------+------+--------+---------+-------------+---------------+---------+--------+----------------------------------------------------+-------------------+-------------------+-------------------+-------------------+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------+------------+------------+--------------------+
10121|user_behavior |mydatabase |CANCELLED|ETL:N/A; LOAD:N/A |BROKER|NORMAL | 0| 0| 0| 0| |resource:N/A; timeout(s):72000; max_filter_ratio:0.0|2023-08-10 14:59:30| | | |2023-08-10 14:59:34|{"All backends":{},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":0,"InternalTableLoadRows":0,"ScanBytes":0,"ScanRows":0,"TaskNumber":0,"Unfinished backends":{}} |type:ETL_RUN_FAIL; msg:listPath failed| | | |
10106|user_behavior |mydatabase |FINISHED |ETL:100%; LOAD:100%|BROKER|NORMAL | 86953525| 0| 0| 86953525| |resource:N/A; timeout(s):72000; max_filter_ratio:0.0|2023-08-10 14:50:15|2023-08-10 14:50:19|2023-08-10 14:50:19|2023-08-10 14:50:19|2023-08-10 14:55:10|{"All backends":{"a5fe5e1d-d7d0-4826-ba99-c7348f9a5f2f":[10004]},"FileNumber":1,"FileSize":1225637388,"InternalTableLoadBytes":2710603082,"InternalTableLoadRows":86953525,"ScanBytes":1225637388,"ScanRows":86953525,"TaskNumber":1,"Unfinished backends":{"a5| | | | |
在确认加载作业已完成后,您可以检查目标表的子集,以查看数据是否已成功加载。 例子
SELECT * from user_behavior LIMIT 3;
系统返回类似于以下的查询结果,表明数据已成功加载
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 142 | 2869980 | 2939262 | pv | 2017-11-25 03:43:22 |
| 142 | 2522236 | 1669167 | pv | 2017-11-25 15:14:12 |
| 142 | 3031639 | 3607361 | pv | 2017-11-25 15:19:25 |
+--------+---------+------------+--------------+---------------------+