跳到主要内容
版本: 最新版本-3.5

Colocate Join

对于 Shuffle Join 和 Broadcast Join,如果满足 Join 条件,则两个 Join 表的数据行将合并到单个节点以完成 Join。这两种 Join 方法都无法避免节点之间的数据网络传输造成的延迟或开销。

核心思想是保持同一 Colocation Group 中表的 Bucketing Key、副本数量和副本放置位置一致。如果 Join 列是 Bucketing Key,则计算节点只需要进行本地 Join,而无需从其他节点获取数据。Colocate Join 支持等值 Join。

本文档介绍了 Colocate Join 的原理、实现、用法和注意事项。

术语

  • Colocation Group (CG):一个 CG 将包含一个或多个表。CG 中的表具有相同的分桶和副本放置方式,并使用 Colocation Group Schema 进行描述。
  • Colocation Group Schema (CGS):CGS 包含 CG 的 Bucketing Key、Bucket 数量和副本数量。

原理

Colocate Join 的目的是形成一个 CG,其中包含一组具有相同 CGS 的表,并确保这些表的相应 Bucket 副本将落在同一组 BE 节点上。当 CG 中的表在 Bucket 列上执行 Join 操作时,可以直接 Join 本地数据,从而节省了在节点之间传输数据的时间。

Bucket Seq 通过 hash(key) mod buckets 获得。假设一个表有 8 个 Bucket,则有 [0, 1, 2, 3, 4, 5, 6, 7] 8 个 Bucket,每个 Bucket 有一个或多个子表,子表的数量取决于分区的数量。如果是多分区的表,将有多个 Tablet。

为了具有相同的数据分布,同一 CG 中的表必须遵守以下规定。

  1. 同一 CG 中的表必须具有相同的 Bucketing Key(类型、数量、顺序)和相同数量的 Bucket,以便可以逐个分发和控制多个表的数据片。Bucketing Key 是在表创建语句 DISTRIBUTED BY HASH(col1, col2, ...) 中指定的列。Bucketing Key 决定了哪些列的数据将被哈希到不同的 Bucket Seq 中。同一 CG 中表的 Bucketing Key 名称可以不同。创建语句中的 Bucketing 列可以不同,但 DISTRIBUTED BY HASH(col1, col2, ...) 中对应数据类型的顺序应完全相同。
  2. 同一 CG 中的表必须具有相同数量的分区副本。否则,可能会发生 Tablet 副本在同一 BE 的分区中没有相应副本的情况。
  3. 同一 CG 中的表可以具有不同数量的分区和不同的分区键。

创建表时,CG 由表 PROPERTIES 中的属性 "colocate_with" = "group_name" 指定。如果 CG 不存在,则表示该表是 CG 的第一个表,称为 Parent Table。Parent Table 的数据分布(拆分 Bucket Key 的类型、数量和顺序、副本数量和拆分 Bucket 数量)决定了 CGS。如果 CG 存在,则检查表的数据分布是否与 CGS 一致。

同一 CG 中表的副本放置满足以下条件

  1. 所有表的 Bucket Seq 与 BE 节点之间的映射与 Parent Table 的映射相同。
  2. Parent Table 中所有分区的 Bucket Seq 与 BE 节点之间的映射与第一个分区的映射相同。
  3. Parent Table 的第一个分区的 Bucket Seq 与 BE 节点之间的映射使用本机 Round Robin 算法确定。

一致的数据分布和映射保证了 Bucketing Key 所取的相同值的Data rows落在同一 BE 上。因此,当使用 Bucketing Key 连接列时,只需要本地 Join。

用法

创建表

创建表时,您可以在 PROPERTIES 中指定属性 "colocate_with" = "group_name",以指示该表是 Colocate Join 表并且属于指定的 Colocation Group。

注意

从 2.5.4 版本开始,可以在来自不同数据库的表上执行 Colocate Join。您只需要在创建表时指定相同的 colocate_with 属性即可。

例如

CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
"colocate_with" = "group1"
);

如果指定的 Group 不存在,StarRocks 会自动创建一个仅包含当前表的 Group。如果 Group 存在,StarRocks 会检查当前表是否满足 Colocation Group Schema。如果是,则创建该表并将其添加到 Group。同时,该表会根据现有 Group 的数据分布规则创建一个分区和一个 Tablet。

Colocation Group 属于数据库。Colocation Group 的名称在数据库中是唯一的。在内部存储中,Colocation Group 的全名是 dbId_groupName,但您只感知到 groupName

注意

如果您指定相同的 Colocation Group 来关联来自不同数据库的表以进行 Colocate Join,则 Colocation Group 存在于每个数据库中。您可以运行 show proc "/colocation_group" 来检查不同数据库中的 Colocation Group。

删除

完整删除是从回收站中删除。通常,在使用 DROP TABLE 命令删除表后,默认情况下它会在回收站中保留一天,然后才会被删除。当 Group 中的最后一个表被完全删除时,Group 也会自动删除。

查看 Group 信息

以下命令允许您查看集群中已存在的 Group 信息。

SHOW PROC '/colocation_group';

+-------------+--------------+--------------+------------+----------------+----------+----------+
| GroupId | GroupName | TableIds | BucketsNum | ReplicationNum | DistCols | IsStable |
+-------------+--------------+--------------+------------+----------------+----------+----------+
| 10005.10008 | 10005_group1 | 10007, 10040 | 10 | 3 | int(11) | true |
+-------------+--------------+--------------+------------+----------------+----------+----------+
  • GroupId:Group 在集群范围内的唯一标识符,前半部分是 db id,后半部分是 Group id。
  • GroupName:Group 的全名。
  • TabletIds:Group 中表的 id 列表。
  • BucketsNum:Bucket 的数量。
  • ReplicationNum:副本的数量。
  • DistCols:Distribution 列,即 Bucketing 列类型。
  • IsStable:Group 是否稳定(有关稳定性的定义,请参见 Colocation 副本平衡和修复部分)。

您可以使用以下命令进一步查看 Group 的数据分布。

SHOW PROC '/colocation_group/10005.10008';

+-------------+---------------------+
| BucketIndex | BackendIds |
+-------------+---------------------+
| 0 | 10004, 10002, 10001 |
| 1 | 10003, 10002, 10004 |
| 2 | 10002, 10004, 10001 |
| 3 | 10003, 10002, 10004 |
| 4 | 10002, 10004, 10003 |
| 5 | 10003, 10002, 10001 |
| 6 | 10003, 10004, 10001 |
| 7 | 10003, 10004, 10002 |
+-------------+---------------------+
  • BucketIndex:Bucket 序列的下标。
  • BackendIds:Bucket 数据片所在的 BE 节点的 id。

注意:以上命令需要 NODE 权限或 cluster_admin 角色。普通用户无法访问它。

修改表 Group 属性

您可以修改表的 Colocation Group 属性。例如

ALTER TABLE tbl SET ("colocate_with" = "group2");

如果该表之前未分配给 Group,则该命令将检查 Schema 并将该表添加到 Group(如果 Group 不存在,则会先创建该 Group)。如果该表之前已分配给另一个 Group,则该命令会将该表从原始 Group 中删除并将其添加到新 Group(如果 Group 不存在,则会先创建该 Group)。

您还可以使用以下命令删除表的 Colocation 属性。

ALTER TABLE tbl SET ("colocate_with" = "");

使用 ADD PARTITION 添加分区或将副本数量修改为具有 Colocation 属性的表时,StarRocks 会检查该操作是否违反 Colocation Group Schema,如果违反,则拒绝该操作。

Colocation 副本平衡和修复

Colocation 表的副本分布需要遵循 Group Schema 中指定的分布规则,因此在副本修复和平衡方面与普通分片不同。

Group 本身具有一个 stable 属性。当 stabletrue 时,表示 Group 中没有对表切片进行更改,并且 Colocation 功能正常工作。当 stablefalse 时,表示当前 Group 中的某些表切片正在修复或迁移,并且受影响表的 Colocate Join 将降级为普通 Join。

副本修复

副本只能存储在指定的 BE 节点上。StarRocks 将查找负载最少的 BE 来替换不可用的 BE(例如,down、decommission)。替换后,将修复旧 BE 上的所有 Bucketing 数据切片。在迁移过程中,Group 将被标记为Unstable

副本平衡

StarRocks 尝试将 Colocation 表切片均匀地分布在所有 BE 节点上。普通表的平衡是在副本级别进行的,也就是说,每个副本单独查找负载较低的 BE 节点。Colocation 表的平衡是在 Bucket 级别进行的,也就是说,一个 Bucket 中的所有副本一起迁移。我们使用一种简单的平衡算法,将 BucketsSequnce 均匀地分布在所有 BE 节点上,而不考虑副本的实际大小,而只考虑副本的数量。确切的算法可以在 ColocateTableBalancer.java 中的代码注释中找到。

注意 1:当前的 Colocation 副本平衡和修复算法可能不适用于具有异构部署的 StarRocks 集群。所谓的异构部署是指 BE 节点的磁盘容量、磁盘数量和磁盘类型(SSD 和 HDD)不一致。在异构部署的情况下,可能会发生小容量 BE 节点存储的副本数量与大容量 BE 节点相同的情况。

注意 2:当 Group 处于 Unstable 状态时,其表的 Join 将降级为普通 Join,这可能会显着降低集群的查询性能。如果您不希望系统自动平衡,请将 FE 配置 disable_colocate_balance 设置为禁用自动平衡,并在适当的时候再次启用它。(有关详细信息,请参见高级操作(#Advanced Operations)部分)

查询

Colocation 表的查询方式与普通表相同。如果 Colocation 表所在的 Group 处于 Unstable 状态,它将自动降级为普通 Join,如以下示例所示。

表 1

CREATE TABLE `tbl1` (
`k1` date NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
PARTITION BY RANGE(`k1`)
(
PARTITION p1 VALUES LESS THAN ('2019-05-31'),
PARTITION p2 VALUES LESS THAN ('2019-06-30')
)
DISTRIBUTED BY HASH(`k2`) BUCKETS 6
PROPERTIES (
"colocate_with" = "group1"
);
INSERT INTO tbl1
VALUES
("2015-09-12",1000,1),
("2015-09-13",2000,2);

表 2

CREATE TABLE `tbl2` (
`k1` datetime NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` double SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`) BUCKETS 6
PROPERTIES (
"colocate_with" = "group1"
);
INSERT INTO tbl2
VALUES
("2015-09-12 00:00:00",3000,3),
("2015-09-12 00:00:00",4000,4);

查看查询计划

EXPLAIN SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
+-------------------------------------------------------------------------+
| Explain String |
+-------------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:1: k1 | 2: k2 | 3: v1 | 4: k1 | 5: k2 | 6: v1 |
| PARTITION: UNPARTITIONED |
| |
| RESULT SINK |
| |
| 3:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 03 |
| UNPARTITIONED |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN (COLOCATE) |
| | colocate: true |
| | equal join conjunct: 5: k2 = 2: k2 |
| | |
| |----1:OlapScanNode |
| | TABLE: tbl1 |
| | PREAGGREGATION: OFF. Reason: Has can not pre-aggregation Join |
| | partitions=1/2 |
| | rollup: tbl1 |
| | tabletRatio=6/6 |
| | tabletList=15344,15346,15348,15350,15352,15354 |
| | cardinality=1 |
| | avgRowSize=3.0 |
| | |
| 0:OlapScanNode |
| TABLE: tbl2 |
| PREAGGREGATION: OFF. Reason: None aggregate function |
| partitions=1/1 |
| rollup: tbl2 |
| tabletRatio=6/6 |
| tabletList=15373,15375,15377,15379,15381,15383 |
| cardinality=1 |
| avgRowSize=3.0 |
+-------------------------------------------------------------------------+
40 rows in set (0.03 sec)

如果 Colocate Join 生效,则 Hash Join 节点将显示 colocate: true

如果不生效,则查询计划如下

+----------------------------------------------------+
| Explain String |
+----------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`tbl1`.`k1` | |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN (BROADCAST) |
| | hash predicates: |
| | colocate: false, reason: group is not stable |
| | `tbl1`.`k2` = `tbl2`.`k2` |
| | tuple ids: 0 1 |
| | |
| |----3:EXCHANGE |
| | tuple ids: 1 |
| | |
| 0:OlapScanNode |
| TABLE: tbl1 |
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
| partitions=0/2 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 0 |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 03 |
| UNPARTITIONED |
| |
| 1:OlapScanNode |
| TABLE: tbl2 |
| PREAGGREGATION: OFF. Reason: null |
| partitions=0/1 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 1 |
+----------------------------------------------------+

HASH JOIN 节点将显示相应的原因:colocate: false, reason: group is not stable。同时会生成一个 EXCHANGE 节点。

高级操作

FE 配置项

  • disable_colocate_relocate

是否禁用 StarRocks 的自动 Colocation 副本修复。默认值为 false,表示已启用。此参数仅影响 Colocation 表的副本修复,而不影响普通表的副本修复。

  • disable_colocate_balance

是否禁用 StarRocks 的自动 Colocation 副本平衡。默认值为 false,表示已启用。此参数仅影响 Colocation 表的副本平衡,而不影响普通表的副本平衡。

  • disable_colocate_join

    您可以通过更改此变量来禁用会话粒度的 Colocate Join。

  • disable_colocate_join

    可以通过更改此变量来禁用 Colocate Join 功能。

HTTP Restful API

StarRocks 提供了几个与 Colocate Join 相关的 HTTP Restful API,用于查看和修改 Colocation Group。

此 API 在 FE 上实现,可以使用 fe_host:fe_http_portdb_admin 以及 user_admin 权限进行访问。

  1. 查看集群的所有 Colocation 信息

    curl --location-trusted -u<username>:<password> 'http://<fe_host>:<fe_http_port>/api/colocate'  
    // Returns the internal Colocation information in Json format.
    {
    "colocate_meta": {
    "groupName2Id": {
    "g1": {
    "dbId": 10005,
    "grpId": 10008
    }
    },
    "group2Tables": {},
    "table2Group": {
    "10007": {
    "dbId": 10005,
    "grpId": 10008
    },
    "10040": {
    "dbId": 10005,
    "grpId": 10008
    }
    },
    "group2Schema": {
    "10005.10008": {
    "groupId": {
    "dbId": 10005,
    "grpId": 10008
    },
    "distributionColTypes": [{
    "type": "INT",
    "len": -1,
    "isAssignedStrLenInColDefinition": false,
    "precision": 0,
    "scale": 0
    }],
    "bucketsNum": 10,
    "replicationNum": 2
    }
    },
    "group2BackendsPerBucketSeq": {
    "10005.10008": [
    [10004, 10002],
    [10003, 10002],
    [10002, 10004],
    [10003, 10002],
    [10002, 10004],
    [10003, 10002],
    [10003, 10004],
    [10003, 10004],
    [10003, 10004],
    [10002, 10004]
    ]
    },
    "unstableGroups": []
    },
    "status": "OK"
    }
  2. 将 Group 标记为 Stable 或 Unstable

    # Mark as Stable
    curl -XPOST --location-trusted -u<username>:<password>'http://<fe_host>:<fe_http_port>/api/colocate/group_stable?db_id=<dbId>&group_id=<grpId>​'
    # Mark as Unstable
    curl -XPOST --location-trusted -u<username>:<password>'http://<fe_host>:<fe_http_port>/api/colocate/group_unstable?db_id=<dbId>&group_id=<grpId>​'

    如果返回的结果是 200,则表示 Group 已成功标记为 Stable 或 Unstable。

  3. 设置 Group 的数据分布

    此接口允许您强制 Group 的编号分布。

    POST /api/colocate/bucketseq?db_id=10005&group_id= 10008

    Body

    [[10004,10002],[10003,10002],[10002,10004],[10003,10002],[10002,10004],[10003,10002],[10003,10004],[10003,10004],[10003,10004],[10002,10004]]

    返回:200

    其中 Body 是表示为嵌套数组的 BucketsSequence 以及 Bucketing 切片所在的 BE 的 id。

    请注意,要使用此命令,您可能需要将 FE 配置 disable_colocate_relocatedisable_colocate_balance 设置为 true,也就是说,禁用系统执行自动 Colocation 副本修复和平衡。否则,它可能会在修改后被系统自动重置。