跳到主要内容
版本: 最新版本-3.5

使用 Kafka Connector 加载数据

StarRocks 提供了一个名为 Apache Kafka® 连接器 (StarRocks Connector for Apache Kafka®,简称 Kafka 连接器) 的自研连接器,作为一个 Sink 连接器,它可以持续从 Kafka 消费消息并将它们加载到 StarRocks 中。Kafka 连接器保证至少一次 (at-least-once) 语义。

Kafka 连接器可以与 Kafka Connect 无缝集成,这使得 StarRocks 更好地与 Kafka 生态系统集成。如果您想将实时数据加载到 StarRocks 中,这是一个明智的选择。与 Routine Load 相比,建议在以下场景中使用 Kafka 连接器

  • Routine Load 仅支持加载 CSV、JSON 和 Avro 格式的数据,而 Kafka 连接器可以加载更多格式的数据,例如 Protobuf。只要数据可以使用 Kafka Connect 的转换器转换为 JSON 和 CSV 格式,就可以通过 Kafka 连接器将数据加载到 StarRocks 中。
  • 自定义数据转换,例如 Debezium 格式的 CDC 数据。
  • 从多个 Kafka 主题加载数据。
  • 从 Confluent Cloud 加载数据。
  • 需要更精细地控制加载批量大小、并行度和其他参数,以实现加载速度和资源利用率之间的平衡。

准备工作

版本要求

连接器KafkaStarRocksJava
1.0.43.42.5 及更高版本8
1.0.33.42.5 及更高版本8

设置 Kafka 环境

支持自管理的 Apache Kafka 集群和 Confluent Cloud。

  • 对于自管理的 Apache Kafka 集群,您可以参考 Apache Kafka 快速入门 快速部署 Kafka 集群。 Kafka Connect 已经集成到 Kafka 中。
  • 对于 Confluent Cloud,请确保您拥有 Confluent 帐户并已创建集群。

下载 Kafka 连接器

将 Kafka 连接器提交到 Kafka Connect

网络配置

确保 Kafka 所在的机器可以通过 http_port (默认: 8030) 和 query_port (默认: 9030) 访问 StarRocks 集群的 FE 节点,以及通过 be_http_port (默认: 8040) 访问 BE 节点。

用法

本节以自管理 Kafka 集群为例,说明如何配置 Kafka 连接器和 Kafka Connect,然后运行 Kafka Connect 将数据加载到 StarRocks 中。

准备数据集

假设 Kafka 集群中主题 test 中存在 JSON 格式的数据。

{"id":1,"city":"New York"}
{"id":2,"city":"Los Angeles"}
{"id":3,"city":"Chicago"}

创建表

根据 JSON 格式数据的键,在 StarRocks 集群的数据库 example_db 中创建表 test_tbl

CREATE DATABASE example_db;
USE example_db;
CREATE TABLE test_tbl (id INT, city STRING);

配置 Kafka 连接器和 Kafka Connect,然后运行 Kafka Connect 加载数据

以独立模式运行 Kafka Connect

  1. 配置 Kafka 连接器。在 Kafka 安装目录下的 config 目录中,为 Kafka 连接器创建配置文件 connect-StarRocks-sink.properties,并配置以下参数。有关更多参数和描述,请参见 参数

    信息
    • 在此示例中,StarRocks 提供的 Kafka 连接器是一个 Sink 连接器,可以持续从 Kafka 消费数据并将数据加载到 StarRocks 中。
    • 如果源数据是 CDC 数据,例如 Debezium 格式的数据,并且 StarRocks 表是 Primary Key 表,您还需要在 StarRocks 提供的 Kafka 连接器的配置文件 connect-StarRocks-sink.properties配置 transform,以将源数据的更改同步到 Primary Key 表。
    name=starrocks-kafka-connector
    connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
    topics=test
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=false
    # The HTTP URL of the FE in your StarRocks cluster. The default port is 8030.
    starrocks.http.url=192.168.xxx.xxx:8030
    # If the Kafka topic name is different from the StarRocks table name, you need to configure the mapping relationship between them.
    starrocks.topic2table.map=test:test_tbl
    # Enter the StarRocks username.
    starrocks.username=user1
    # Enter the StarRocks password.
    starrocks.password=123456
    starrocks.database.name=example_db
    sink.properties.strip_outer_array=true
  2. 配置并运行 Kafka Connect。

    1. 配置 Kafka Connect。在 config 目录中的配置文件 config/connect-standalone.properties 中,配置以下参数。有关更多参数和描述,请参见 运行 Kafka Connect

      # The addresses of Kafka brokers. Multiple addresses of Kafka brokers need to be separated by commas (,).
      # Note that this example uses PLAINTEXT as the security protocol to access the Kafka cluster. If you are using other security protocol to access the Kafka cluster, you need to configure the relevant information in this file.
      bootstrap.servers=<kafka_broker_ip>:9092
      offset.storage.file.filename=/tmp/connect.offsets
      offset.flush.interval.ms=10000
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=true
      value.converter.schemas.enable=false
      # The absolute path of the Kafka connector after extraction. For example:
      plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3
    2. 运行 Kafka Connect。

      CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-standalone.sh config/connect-standalone.properties config/connect-starrocks-sink.properties

以分布式模式运行 Kafka Connect

  1. 配置并运行 Kafka Connect。

    1. 配置 Kafka Connect。在 config 目录中的配置文件 config/connect-distributed.properties 中,配置以下参数。有关更多参数和描述,请参考 运行 Kafka Connect

      # The addresses of Kafka brokers. Multiple addresses of Kafka brokers need to be separated by commas (,).
      # Note that this example uses PLAINTEXT as the security protocol to access the Kafka cluster. If you are using other security protocol to access the Kafka cluster, you need to configure the relevant information in this file.
      bootstrap.servers=<kafka_broker_ip>:9092
      offset.storage.file.filename=/tmp/connect.offsets
      offset.flush.interval.ms=10000
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=true
      value.converter.schemas.enable=false
      # The absolute path of the Kafka connector after extraction. For example:
      plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3
    2. 运行 Kafka Connect。

      CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-distributed.sh config/connect-distributed.properties
  2. 配置并创建 Kafka 连接器。请注意,在分布式模式下,您需要通过 REST API 配置和创建 Kafka 连接器。有关参数和描述,请参见 参数

    信息
    • 在此示例中,StarRocks 提供的 Kafka 连接器是一个 Sink 连接器,可以持续从 Kafka 消费数据并将数据加载到 StarRocks 中。
    • 如果源数据是 CDC 数据,例如 Debezium 格式的数据,并且 StarRocks 表是 Primary Key 表,您还需要在 StarRocks 提供的 Kafka 连接器的配置文件 connect-StarRocks-sink.properties配置 transform,以将源数据的更改同步到 Primary Key 表。
    curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
    "name":"starrocks-kafka-connector",
    "config":{
    "connector.class":"com.starrocks.connector.kafka.StarRocksSinkConnector",
    "topics":"test",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"true",
    "value.converter.schemas.enable":"false",
    "starrocks.http.url":"192.168.xxx.xxx:8030",
    "starrocks.topic2table.map":"test:test_tbl",
    "starrocks.username":"user1",
    "starrocks.password":"123456",
    "starrocks.database.name":"example_db",
    "sink.properties.strip_outer_array":"true"
    }
    }'

查询 StarRocks 表

查询目标 StarRocks 表 test_tbl

MySQL [example_db]> select * from test_tbl;

+------+-------------+
| id | city |
+------+-------------+
| 1 | New York |
| 2 | Los Angeles |
| 3 | Chicago |
+------+-------------+
3 rows in set (0.01 sec)

当返回上述结果时,表示数据已成功加载。

参数

name

必需:是
默认值:
描述: 此 Kafka 连接器的名称。在 Kafka Connect 集群中的所有 Kafka 连接器中,它必须是全局唯一的。例如,starrocks-kafka-connector。

connector.class

必需:是
默认值:
描述: 此 Kafka 连接器的 Sink 使用的类。将值设置为 com.starrocks.connector.kafka.StarRocksSinkConnector

topics

必需:
默认值:
描述: 要订阅的一个或多个主题,其中每个主题对应于一个 StarRocks 表。默认情况下,StarRocks 假定主题名称与 StarRocks 表的名称匹配。因此,StarRocks 通过主题名称确定目标 StarRocks 表。请选择填写 topicstopics.regex (如下),但不能同时填写两者。但是,如果 StarRocks 表名与主题名称不同,请使用可选的 starrocks.topic2table.map 参数 (如下) 来指定从主题名称到表名称的映射。

topics.regex

必需:
默认值: 描述: 用于匹配要订阅的一个或多个主题的正则表达式。有关更多描述,请参见 topics。请选择填写 topics.regextopics (如上),但不能同时填写两者。

starrocks.topic2table.map

必需:否
默认值:
描述: 当主题名称与 StarRocks 表名不同时,StarRocks 表名和主题名称的映射。格式为 <topic-1>:<table-1>,<topic-2>:<table-2>,...

starrocks.http.url

必需:是
默认值:
描述: StarRocks 集群中 FE 的 HTTP URL。格式为 <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>,...。多个地址用逗号 (,) 分隔。例如,192.168.xxx.xxx:8030,192.168.xxx.xxx:8030

starrocks.database.name

必需:是
默认值:
描述: StarRocks 数据库的名称。

starrocks.username

必需:是
默认值:
描述: StarRocks 集群帐户的用户名。该用户需要 StarRocks 表的 INSERT 权限。

starrocks.password

必需:是
默认值:
描述:您的 StarRocks 集群帐户的密码。

key.converter

必需:否
默认值: Kafka Connect 集群使用的 Key Converter
描述: 此参数指定 Sink 连接器 (Kafka-connector-starrocks) 的 Key Converter,用于反序列化 Kafka 数据的 Key。默认的 Key Converter 是 Kafka Connect 集群使用的 Key Converter。

value.converter

必需:否
默认值: Kafka Connect 集群使用的 Value Converter
描述: 此参数指定 Sink 连接器 (Kafka-connector-starrocks) 的 Value Converter,用于反序列化 Kafka 数据的 Value。默认的 Value Converter 是 Kafka Connect 集群使用的 Value Converter。

key.converter.schema.registry.url

必需:否
默认值:
描述: Key Converter 的 Schema Registry URL。

value.converter.schema.registry.url

必需:否
默认值:
描述: Value Converter 的 Schema Registry URL。

tasks.max

必需:否
默认值: 1
描述: Kafka 连接器可以创建的任务线程数的上限,通常与 Kafka Connect 集群中 Worker 节点上的 CPU 核心数相同。您可以调整此参数来控制加载性能。

bufferflush.maxbytes

必需:否
默认值:94371840(90M)
描述: 在一次发送到 StarRocks 之前可以在内存中累积的最大数据量。最大值范围为 64 MB 到 10 GB。请记住,Stream Load SDK 缓冲区可能会创建多个 Stream Load 作业来缓冲数据。因此,此处提到的阈值是指总数据大小。

bufferflush.intervalms

必需:否
默认值: 1000
描述: 发送一批数据的间隔,它控制加载延迟。范围: [1000, 3600000]。

connect.timeoutms

必需:否
默认值: 1000
描述: 连接到 HTTP URL 的超时时间。范围: [100, 60000]。

sink.properties.*

必需:
默认值:
描述: 用于控制加载行为的 Stream Load 参数。例如,参数 sink.properties.format 指定 Stream Load 使用的格式,例如 CSV 或 JSON。有关支持的参数及其描述的列表,请参见 STREAM LOAD

sink.properties.format

必需:否
默认值: json
描述: Stream Load 使用的格式。Kafka 连接器会在将每批数据发送到 StarRocks 之前将其转换为格式。有效值: csvjson。有关更多信息,请参见 CSV 参数JSON 参数

sink.properties.partial_update

必需:否
默认值FALSE
描述:是否使用部分更新。有效值:TRUEFALSE。默认值:FALSE,表示禁用此功能。

sink.properties.partial_update_mode

必需:否
默认值row
描述:指定部分更新的模式。有效值:rowcolumn

  • row(默认)表示行模式下的部分更新,更适合具有许多列和小批量的实时更新。
  • column 表示列模式下的部分更新,更适合于具有少量列和许多行的批量更新。在这种情况下,启用列模式可以提供更快的更新速度。例如,在一个具有 100 列的表中,如果仅更新所有行的 10 列 (占总数的 10%),则列模式的更新速度快 10 倍。

使用说明

刷新策略

Kafka 连接器会将数据缓存在内存中,并通过 Stream Load 将它们批量刷新到 StarRocks。满足以下任何条件时,将触发刷新

  • 缓冲行的字节数达到限制 bufferflush.maxbytes
  • 自上次刷新以来经过的时间达到限制 bufferflush.intervalms
  • 连接器尝试提交任务偏移量的间隔已达到。该间隔由 Kafka Connect 配置 offset.flush.interval.ms 控制,默认值为 60000

为了降低数据延迟,请调整 Kafka 连接器设置中的这些配置。但是,更频繁的刷新会增加 CPU 和 I/O 使用率。

限制

  • 不支持将来自 Kafka 主题的单个消息展平为多个数据行并加载到 StarRocks 中。
  • StarRocks 提供的 Kafka 连接器的 Sink 保证至少一次 (at-least-once) 语义。

最佳实践

加载 Debezium 格式的 CDC 数据

Debezium 是一种流行的变更数据捕获 (CDC) 工具,它支持监视各种数据库系统中的数据更改,并将这些更改流式传输到 Kafka。以下示例演示如何配置和使用 Kafka 连接器将 PostgreSQL 更改写入 StarRocks 中的 Primary Key 表

步骤 1: 安装并启动 Kafka

注意

如果您有自己的 Kafka 环境,则可以跳过此步骤。

  1. 下载 来自官方网站的最新 Kafka 版本并解压该软件包。

    tar -xzf kafka_2.13-3.7.0.tgz
    cd kafka_2.13-3.7.0
  2. 启动 Kafka 环境。

    生成 Kafka 集群 UUID。

    KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

    格式化日志目录。

    bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

    启动 Kafka 服务器。

    bin/kafka-server-start.sh config/kraft/server.properties

步骤 2: 配置 PostgreSQL

  1. 确保 PostgreSQL 用户被授予 REPLICATION 权限。

  2. 调整 PostgreSQL 配置。

    postgresql.conf 中将 wal_level 设置为 logical

    wal_level = logical

    重新启动 PostgreSQL 服务器以应用更改。

    pg_ctl restart
  3. 准备数据集。

    创建一个表并插入测试数据。

    CREATE TABLE customers (
    id int primary key ,
    first_name varchar(65533) NULL,
    last_name varchar(65533) NULL ,
    email varchar(65533) NULL
    );

    INSERT INTO customers VALUES (1,'a','a','a@a.com');
  4. 验证 Kafka 中的 CDC 日志消息。

    {
    "schema": {
    "type": "struct",
    "fields": [
    {
    "type": "struct",
    "fields": [
    {
    "type": "int32",
    "optional": false,
    "field": "id"
    },
    {
    "type": "string",
    "optional": true,
    "field": "first_name"
    },
    {
    "type": "string",
    "optional": true,
    "field": "last_name"
    },
    {
    "type": "string",
    "optional": true,
    "field": "email"
    }
    ],
    "optional": true,
    "name": "test.public.customers.Value",
    "field": "before"
    },
    {
    "type": "struct",
    "fields": [
    {
    "type": "int32",
    "optional": false,
    "field": "id"
    },
    {
    "type": "string",
    "optional": true,
    "field": "first_name"
    },
    {
    "type": "string",
    "optional": true,
    "field": "last_name"
    },
    {
    "type": "string",
    "optional": true,
    "field": "email"
    }
    ],
    "optional": true,
    "name": "test.public.customers.Value",
    "field": "after"
    },
    {
    "type": "struct",
    "fields": [
    {
    "type": "string",
    "optional": false,
    "field": "version"
    },
    {
    "type": "string",
    "optional": false,
    "field": "connector"
    },
    {
    "type": "string",
    "optional": false,
    "field": "name"
    },
    {
    "type": "int64",
    "optional": false,
    "field": "ts_ms"
    },
    {
    "type": "string",
    "optional": true,
    "name": "io.debezium.data.Enum",
    "version": 1,
    "parameters": {
    "allowed": "true,last,false,incremental"
    },
    "default": "false",
    "field": "snapshot"
    },
    {
    "type": "string",
    "optional": false,
    "field": "db"
    },
    {
    "type": "string",
    "optional": true,
    "field": "sequence"
    },
    {
    "type": "string",
    "optional": false,
    "field": "schema"
    },
    {
    "type": "string",
    "optional": false,
    "field": "table"
    },
    {
    "type": "int64",
    "optional": true,
    "field": "txId"
    },
    {
    "type": "int64",
    "optional": true,
    "field": "lsn"
    },
    {
    "type": "int64",
    "optional": true,
    "field": "xmin"
    }
    ],
    "optional": false,
    "name": "io.debezium.connector.postgresql.Source",
    "field": "source"
    },
    {
    "type": "string",
    "optional": false,
    "field": "op"
    },
    {
    "type": "int64",
    "optional": true,
    "field": "ts_ms"
    },
    {
    "type": "struct",
    "fields": [
    {
    "type": "string",
    "optional": false,
    "field": "id"
    },
    {
    "type": "int64",
    "optional": false,
    "field": "total_order"
    },
    {
    "type": "int64",
    "optional": false,
    "field": "data_collection_order"
    }
    ],
    "optional": true,
    "name": "event.block",
    "version": 1,
    "field": "transaction"
    }
    ],
    "optional": false,
    "name": "test.public.customers.Envelope",
    "version": 1
    },
    "payload": {
    "before": null,
    "after": {
    "id": 1,
    "first_name": "a",
    "last_name": "a",
    "email": "a@a.com"
    },
    "source": {
    "version": "2.5.3.Final",
    "connector": "postgresql",
    "name": "test",
    "ts_ms": 1714283798721,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22910216\",\"22910504\"]",
    "schema": "public",
    "table": "customers",
    "txId": 756,
    "lsn": 22910504,
    "xmin": null
    },
    "op": "c",
    "ts_ms": 1714283798790,
    "transaction": null
    }
    }

步骤 3: 配置 StarRocks

在 StarRocks 中创建一个 Primary Key 表,其架构与 PostgreSQL 中的源表相同。

CREATE TABLE `customers` (
`id` int(11) COMMENT "",
`first_name` varchar(65533) NULL COMMENT "",
`last_name` varchar(65533) NULL COMMENT "",
`email` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY hash(id) buckets 1
PROPERTIES (
"bucket_size" = "4294967296",
"in_memory" = "false",
"enable_persistent_index" = "true",
"replicated_storage" = "true",
"fast_schema_evolution" = "true"
);

步骤 4: 安装连接器

  1. 下载连接器并在 plugins 目录中解压这些软件包。

    mkdir plugins
    tar -zxvf debezium-debezium-connector-postgresql-2.5.3.zip -C plugins
    tar -zxvf starrocks-kafka-connector-1.0.3.tar.gz -C plugins

    此目录是 config/connect-standalone.properties 中配置项 plugin.path 的值。

    plugin.path=/path/to/kafka_2.13-3.7.0/plugins
  2. pg-source.properties 中配置 PostgreSQL 源连接器。

    {
    "name": "inventory-connector",
    "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "",
    "database.dbname" : "postgres",
    "topic.prefix": "test"
    }
    }
  3. sr-sink.properties 中配置 StarRocks Sink 连接器。

    {
    "name": "starrocks-kafka-connector",
    "config": {
    "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
    "tasks.max": "1",
    "topics": "test.public.customers",
    "starrocks.http.url": "172.26.195.69:28030",
    "starrocks.database.name": "test",
    "starrocks.username": "root",
    "starrocks.password": "StarRocks@123",
    "sink.properties.strip_outer_array": "true",
    "connect.timeoutms": "3000",
    "starrocks.topic2table.map": "test.public.customers:customers",
    "transforms": "addfield,unwrap",
    "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite"
    }
    }

    注意

    • 如果 StarRocks 表不是 Primary Key 表,则无需指定 addfield 转换。
    • unwrap 转换由 Debezium 提供,用于基于操作类型解包 Debezium 的复杂数据结构。有关更多信息,请参见 New Record State Extraction
  4. 配置 Kafka Connect。

    在 Kafka Connect 配置文件 config/connect-standalone.properties 中配置以下配置项。

    # The addresses of Kafka brokers. Multiple addresses of Kafka brokers need to be separated by commas (,).
    # Note that this example uses PLAINTEXT as the security protocol to access the Kafka cluster.
    # If you use other security protocol to access the Kafka cluster, configure the relevant information in this part.

    bootstrap.servers=<kafka_broker_ip>:9092
    offset.storage.file.filename=/tmp/connect.offsets
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=false

    # The absolute path of the starrocks-kafka-connector after extraction. For example:
    plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3

    # Parameters that control the flush policy. For more information, see the Usage Note section.
    offset.flush.interval.ms=10000
    bufferflush.maxbytes = xxx
    bufferflush.intervalms = xxx

    有关更多参数的描述,请参见 运行 Kafka Connect

步骤 5: 以独立模式启动 Kafka Connect

以独立模式运行 Kafka Connect 以启动连接器。

bin/connect-standalone.sh config/connect-standalone.properties config/pg-source.properties config/sr-sink.properties 

步骤 6: 验证数据摄取

测试以下操作,并确保数据已正确摄取到 StarRocks 中。

INSERT
  • 在 PostgreSQL 中
postgres=# insert into customers values (2,'b','b','b@b.com');
INSERT 0 1
postgres=# select * from customers;
id | first_name | last_name | email
----+------------+-----------+---------
1 | a | a | a@a.com
2 | b | b | b@b.com
(2 rows)
  • 在 StarRocks 中
MySQL [test]> select * from customers;
+------+------------+-----------+---------+
| id | first_name | last_name | email |
+------+------------+-----------+---------+
| 1 | a | a | a@a.com |
| 2 | b | b | b@b.com |
+------+------------+-----------+---------+
2 rows in set (0.01 sec)
UPDATE
  • 在 PostgreSQL 中
postgres=# update customers set email='c@c.com';
UPDATE 2
postgres=# select * from customers;
id | first_name | last_name | email
----+------------+-----------+---------
1 | a | a | c@c.com
2 | b | b | c@c.com
(2 rows)
  • 在 StarRocks 中
MySQL [test]> select * from customers;
+------+------------+-----------+---------+
| id | first_name | last_name | email |
+------+------------+-----------+---------+
| 1 | a | a | c@c.com |
| 2 | b | b | c@c.com |
+------+------------+-----------+---------+
2 rows in set (0.00 sec)
DELETE
  • 在 PostgreSQL 中
postgres=# delete from customers where id=1;
DELETE 1
postgres=# select * from customers;
id | first_name | last_name | email
----+------------+-----------+---------
2 | b | b | c@c.com
(1 row)
  • 在 StarRocks 中
MySQL [test]> select * from customers;
+------+------------+-----------+---------+
| id | first_name | last_name | email |
+------+------------+-----------+---------+
| 2 | b | b | c@c.com |
+------+------------+-----------+---------+
1 row in set (0.00 sec)