跳到主要内容
版本: 最新版本-3.5

使用 Stream Load 事务接口加载数据

从 v2.4 版本开始,StarRocks 提供 Stream Load 事务接口来实现两阶段提交 (2PC),用于运行从外部系统(如 Apache Flink® 和 Apache Kafka®)加载数据的事务。Stream Load 事务接口有助于提高高并发流式加载的性能。

本主题介绍 Stream Load 事务接口以及如何使用此接口将数据加载到 StarRocks 中。

描述

Stream Load 事务接口支持使用 HTTP 协议兼容的工具或语言来调用 API 操作。本主题使用 curl 作为示例来解释如何使用此接口。此接口提供各种功能,如事务管理、数据写入、事务预提交、事务去重和事务超时管理。

注意

Stream Load 支持 CSV 和 JSON 文件格式。如果要从少量单个大小不超过 10 GB 的文件加载数据,建议使用此方法。Stream Load 不支持 Parquet 文件格式。如果需要从 Parquet 文件加载数据,请使用INSERT+files()

事务管理

Stream Load 事务接口提供以下 API 操作,用于管理事务

  • /api/transaction/begin:启动新事务。

  • /api/transaction/commit:提交当前事务以使数据更改持久化。

  • /api/transaction/rollback:回滚当前事务以中止数据更改。

事务预提交

Stream Load 事务接口提供 /api/transaction/prepare 操作,用于预提交当前事务并使数据更改临时持久化。预提交事务后,您可以继续提交或回滚事务。如果在预提交事务后您的 StarRocks 集群崩溃,您仍然可以在 StarRocks 集群恢复正常后继续提交事务。

注意

预提交事务后,请勿继续使用该事务写入数据。如果继续使用该事务写入数据,您的写入请求将返回错误。

数据写入

Stream Load 事务接口提供 /api/transaction/load 操作,用于写入数据。您可以在一个事务中多次调用此操作。

事务去重

Stream Load 事务接口沿用了 StarRocks 的标签机制。您可以将唯一标签绑定到每个事务,以实现事务的至多一次保证。

事务超时管理

您可以使用每个 FE 的配置文件中的 stream_load_default_timeout_second 参数来指定该 FE 的默认事务超时时间。

创建事务时,可以使用 HTTP 请求头中的 timeout 字段来指定事务的超时时间。

创建事务时,您还可以使用 HTTP 请求头中的 idle_transaction_timeout 字段来指定事务可以保持空闲的超时时间。如果在超时时间内没有写入数据,事务将自动回滚。

优势

Stream Load 事务接口具有以下优势

  • 精确一次语义

    事务分为两个阶段,预提交和提交,这使得跨系统加载数据变得容易。例如,此接口可以保证从 Flink 加载数据的精确一次语义。

  • 提高加载性能

    如果使用程序运行加载作业,Stream Load 事务接口允许您按需合并多个小批量数据,然后通过调用 /api/transaction/commit 操作在一个事务中一次性发送它们。因此,需要加载的数据版本更少,加载性能得到提高。

限制

Stream Load 事务接口具有以下限制

  • 仅支持单数据库单表事务。对多数据库多表事务的支持正在开发中。

  • 仅支持来自一个客户端的并发数据写入。对来自多个客户端的并发数据写入的支持正在开发中。

  • /api/transaction/load 操作可以在一个事务中多次调用。在这种情况下,所有调用的 /api/transaction/load 操作指定的参数设置必须相同。

  • 使用 Stream Load 事务接口加载 CSV 格式的数据时,请确保数据文件中的每条数据记录都以行分隔符结尾。

注意事项

  • 如果您调用的 /api/transaction/begin/api/transaction/load/api/transaction/prepare 操作返回错误,则事务失败并自动回滚。
  • 调用 /api/transaction/begin 操作启动新事务时,必须指定标签。请注意,后续的 /api/transaction/load/api/transaction/prepare/api/transaction/commit 操作必须使用与 /api/transaction/begin 操作相同的标签。
  • 如果使用先前事务的标签调用 /api/transaction/begin 操作启动新事务,则先前事务将失败并回滚。
  • StarRocks 支持的 CSV 格式数据的默认列分隔符和行分隔符分别是 \t\n。如果您的数据文件未使用默认的列分隔符或行分隔符,则在调用 /api/transaction/load 操作时,必须使用 "column_separator: <column_separator>""row_delimiter: <row_delimiter>" 来指定您的数据文件中实际使用的列分隔符或行分隔符。

准备工作

检查权限

您只能作为对这些 StarRocks 表具有 INSERT 权限的用户将数据加载到 StarRocks 表中。如果您没有 INSERT 权限,请按照 GRANT 中提供的说明将 INSERT 权限授予用于连接到 StarRocks 集群的用户。语法是 GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}

检查网络配置

确保您要加载的数据所在的机器可以通过 http_port(默认:8030)和 be_http_port(默认:8040)分别访问 StarRocks 集群的 FE 和 BE 节点。

基本操作

准备示例数据

本主题以 CSV 格式的数据为例。

  1. 在本地文件系统的 /home/disk1/ 路径下,创建一个名为 example1.csv 的 CSV 文件。该文件由三列组成,依次表示用户 ID、用户名和用户分数。

    1,Lily,23
    2,Rose,23
    3,Alice,24
    4,Julia,25
  2. 在您的 StarRocks 数据库 test_db 中,创建一个名为 table1 的主键表。该表由三列组成:idnamescore,其中 id 是主键。

    CREATE TABLE `table1`
    (
    `id` int(11) NOT NULL COMMENT "user ID",
    `name` varchar(65533) NULL COMMENT "user name",
    `score` int(11) NOT NULL COMMENT "user score"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`) BUCKETS 10;

启动事务

语法

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" -H "table:<table_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin

示例

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" -H "table:table1" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin

注意

在此示例中,streamload_txn_example1_table1 被指定为事务的标签。

返回结果

  • 如果事务成功启动,将返回以下结果

    {
    "Status": "OK",
    "Message": "",
    "Label": "streamload_txn_example1_table1",
    "TxnId": 9032,
    "BeginTxnTimeMs": 0
    }
  • 如果事务绑定到重复的标签,将返回以下结果

    {
    "Status": "LABEL_ALREADY_EXISTS",
    "ExistingJobStatus": "RUNNING",
    "Message": "Label [streamload_txn_example1_table1] has already been used."
    }
  • 如果发生重复标签以外的错误,将返回以下结果

    {
    "Status": "FAILED",
    "Message": ""
    }

写入数据

语法

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" -H "table:<table_name>" \
-T <file_path> \
-XPUT http://<fe_host>:<fe_http_port>/api/transaction/load

注意

调用 /api/transaction/load 操作时,必须使用 <file_path> 指定要加载的数据文件的保存路径。

示例

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" -H "table:table1" \
-T /home/disk1/example1.csv \
-H "column_separator: ," \
-XPUT http://<fe_host>:<fe_http_port>/api/transaction/load

注意

在此示例中,数据文件 example1.csv 中使用的列分隔符是逗号 (,),而不是 StarRocks 的默认列分隔符 (\t)。因此,在调用 /api/transaction/load 操作时,必须使用 "column_separator: <column_separator>" 指定逗号 (,) 作为列分隔符。

返回结果

  • 如果数据写入成功,将返回以下结果

    {
    "TxnId": 1,
    "Seq": 0,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "",
    "NumberTotalRows": 5265644,
    "NumberLoadedRows": 5265644,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 10737418067,
    "LoadTimeMs": 418778,
    "StreamLoadPutTimeMs": 68,
    "ReceivedDataTimeMs": 38964,
    }
  • 如果事务被认为是未知的,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "TXN_NOT_EXISTS"
    }
  • 如果事务被认为处于无效状态,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation State Invalid"
    }
  • 如果发生未知事务和无效状态以外的错误,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": ""
    }

预提交事务

语法

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare

示例

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare

返回结果

  • 如果预提交成功,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "",
    "NumberTotalRows": 5265644,
    "NumberLoadedRows": 5265644,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 10737418067,
    "LoadTimeMs": 418778,
    "StreamLoadPutTimeMs": 68,
    "ReceivedDataTimeMs": 38964,
    "WriteDataTimeMs": 417851
    "CommitAndPublishTimeMs": 1393
    }
  • 如果事务被认为不存在,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation Not Exist"
    }
  • 如果预提交超时,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "commit timeout",
    }
  • 如果发生非存在事务和预提交超时以外的错误,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "publish timeout"
    }

提交事务

语法

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit

示例

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit

返回结果

  • 如果提交成功,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "",
    "NumberTotalRows": 5265644,
    "NumberLoadedRows": 5265644,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 10737418067,
    "LoadTimeMs": 418778,
    "StreamLoadPutTimeMs": 68,
    "ReceivedDataTimeMs": 38964,
    "WriteDataTimeMs": 417851
    "CommitAndPublishTimeMs": 1393
    }
  • 如果事务已提交,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "Transaction already commited",
    }
  • 如果事务被认为不存在,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation Not Exist"
    }
  • 如果提交超时,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "commit timeout",
    }
  • 如果数据发布超时,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "publish timeout",
    "CommitAndPublishTimeMs": 1393
    }
  • 如果发生非存在事务和超时以外的错误,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": ""
    }

回滚事务

语法

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback

示例

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback

返回结果

  • 如果回滚成功,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": ""
    }
  • 如果事务被认为不存在,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation Not Exist"
    }
  • 如果发生非存在事务以外的错误,将返回以下结果

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": ""
    }

参考

有关 Stream Load 的适用应用场景和支持的数据文件格式以及 Stream Load 的工作方式的信息,请参见通过 Stream Load 从本地文件系统加载

有关创建 Stream Load 作业的语法和参数的信息,请参见STREAM LOAD