跳到主要内容
版本: 最新版本-3.5

StarRocks 迁移工具 (SMT)

StarRocks 迁移工具 (SMT) 是 StarRocks 提供的数据迁移工具,用于通过 Flink 将数据从源数据库加载到 StarRocks 中。SMT 主要可以

  • 基于源数据库和目标 StarRocks 集群的信息,生成在 StarRocks 中创建表的语句。
  • 生成可以在 Flink SQL 客户端中执行的 SQL 语句,以提交 Flink 作业来同步数据,从而简化管道中的全量或增量数据同步。目前,SMT 支持以下源数据库
源数据库生成在 StarRocks 中创建表的语句全量数据同步增量数据同步
MySQL支持支持支持
PostgreSQL支持支持支持
Oracle支持支持支持
Hive支持支持不支持
ClickHouse支持支持不支持
SQL Server支持支持支持
TiDB支持支持支持

下载链接:https://cdn-thirdparty.starrocks.com/smt.tar.gz?r=2

SMT 使用步骤

通常涉及的步骤如下

  1. 配置 conf/config_prod.conf 文件。

  2. 执行 starrocks-migration-tool

  3. 执行后,SQL 脚本默认在 result 目录中生成。

    然后,您可以使用 result 目录中的 SQL 脚本进行元数据或数据同步。

SMT 配置

  • [db]:连接数据源的信息。配置与 type 参数中指定的数据库类型对应的数据源连接信息。

  • [other]:其他配置。建议在 be_num 参数中指定 BE 节点的实际数量。

  • flink.starrocks.sink.*:flink-connector-starrocks 的配置。有关详细配置和说明,请参见配置说明

  • [table-rule.1]:用于匹配数据源中表的规则。CREATE TABLE 语句基于规则中配置的正则表达式生成,以匹配数据源中数据库和表的名称。可以配置多个规则,每个规则生成一个对应的结果文件,例如

    • [table-rule.1] -> result/starrocks-create.1.sql
    • [table-rule.2] -> result/starrocks-create.2.sql

    每个规则都需要包含数据库、表和 flink-connector-starrocks 的配置。

    [table-rule.1]
    # pattern to match databases for setting properties
    database = ^ database1.*$
    # pattern to match tables for setting properties
    table = ^.*$
    schema = ^.*$
    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true

    [table-rule.2]
    # pattern to match databases for setting properties
    database = ^database2.*$
    # pattern to match tables for setting properties
    table = ^.*$
    schema = ^.*$
    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  • 可以为拆分到数据库中的分片大表配置单独的规则。例如,假设两个数据库 edu_db_1edu_db_2 分别包含表 course_1course_2,并且这两个表具有相同的结构。您可以使用以下规则将这两个表中的数据加载到一个 StarRocks 表中进行分析。

    [table-rule.3]
    # pattern to match databases for setting properties
    database = ^edu_db_[0-9]*$
    # pattern to match tables for setting properties
    table = ^course_[0-9]*$
    schema = ^.*$

    此规则将自动形成多对一的加载关系。将在 StarRocks 中生成的表的默认名称是 course__auto_shard,您也可以在相关的 SOL 脚本中修改表名,例如 result/starrocks-create.3.sql

将 MySQL 同步到 StarRocks

简介

Flink CDC connector 和 SMT 可以在亚秒级内同步来自 MySQL 的数据。

img

如图所示,SMT 可以基于 MySQL 和 StarRocks 的集群信息和表结构,自动生成 Flink 源表和 Sink 表的 CREATE TABLE 语句。Flink CDC connector 读取 MySQL Binlog,Flink-connector-starrocks 将数据写入 StarRocks。

步骤

  1. 下载Flink。支持的 Flink 版本为 1.11 或更高版本。

  2. 下载Flink CDC connector。确保下载与 Flink 版本对应的 flink-sql-connector-mysql-cdc-xxx.jar

  3. 下载Flink-connector-starrocks

  4. flink-sql-connector-mysql-cdc-xxx.jarflink-connector-starrocks-xxx.jar 复制到 flink-xxx/lib/

  5. 下载smt.tar.gz

  6. 解压并修改 SMT 的配置文件。

    [db]
    host = 192.168.1.1
    port = 3306
    user = root
    password =
    type = mysql

    [other]
    # number of backends in StarRocks
    be_num = 3
    # `decimal_v3` is supported since StarRocks-1.18.1
    use_decimal_v3 = false
    # directory to save the converted DDL SQL
    output_dir = ./result

    [table-rule.1]
    # pattern to match databases for setting properties
    database = ^db$
    # pattern to match tables for setting properties
    table = ^table$
    schema = ^.*$

    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  7. 执行 starrocks-migrate-tool。所有 SQL 脚本都在 result 目录中生成。

    $./starrocks-migrate-tool
    $ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql starrocks-external-create.all.sql
  8. 使用前缀为 starrocks-create 的 SQL 脚本在 StarRocks 中生成表。

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  9. 使用前缀为 flink-create 的 SQL 脚本生成 Flink 源表和 Sink 表,并启动 Flink 作业以同步数据。

    bin/sql-client.sh embedded < flink-create.all.sql

    上述命令成功执行后,用于同步数据的 Flink 作业将保持运行。

  10. 观察 Flink 作业的状态。

    bin/flink list 

    如果作业执行遇到错误,您可以在 Flink 日志中查看详细的错误信息。此外,您可以在文件 conf/flink-conf.yaml 中修改 Flink 配置,例如内存和 slot。

注意事项

  • 如何启用 MySQL binlog?

    1. 修改 /etc/my.cnf

      # Enable binlog
      log-bin=/var/lib/mysql/mysql-bin

      #log_bin=ON
      ## Base name of binlog files
      #log_bin_basename=/var/lib/mysql/mysql-bin
      ## Index file for binlog files, managing all binlog files
      #log_bin_index=/var/lib/mysql/mysql-bin.index
      # Configure server id
      server-id=1
      binlog_format = row
    2. 重启 mysqld。您可以通过执行 SHOW VARIABLES LIKE 'log_bin'; 来检查是否已启用 MySQL binlog。

将 PostgreSQL 同步到 StarRocks

简介

Flink CDC connector 和 SMT 可以在亚秒级内同步来自 PostgreSQL 的数据。

SMT 可以基于 PostgreSQL 和 StarRocks 的集群信息和表结构,自动生成 Flink 源表和 Sink 表的 CREATE TABLE 语句。

Flink CDC connector 读取 PostgreSQL 的 WAL,Flink-connector-starrocks 将数据写入 StarRocks。

步骤

  1. 下载Flink。支持的 Flink 版本为 1.11 或更高版本。

  2. 下载Flink CDC connector。确保下载与 Flink 版本对应的 flink-sql-connector-postgres-cdc-xxx.jar。

  3. 下载Flink StarRocks connector

  4. flink-sql-connector-postgres-cdc-xxx.jarflink-connector-starrocks-xxx.jar 复制到 flink-xxx/lib/

  5. 下载smt.tar.gz

  6. 解压并修改 SMT 的配置文件。

    [db]
    host = 192.168.1.1
    port = 5432
    user = xxx
    password = xxx
    type = pgsql

    [other]
    # number of backends in StarRocks
    be_num = 3
    # `decimal_v3` is supported since StarRocks-1.18.1
    use_decimal_v3 = false
    # directory to save the converted DDL SQL
    output_dir = ./result

    [table-rule.1]
    # pattern to match databases for setting properties
    database = ^db$
    # pattern to match tables for setting properties
    table = ^table$
    # pattern to match schemas for setting properties
    schema = ^.*$

    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  7. 执行 starrocks-migrate-tool。所有 SQL 脚本都在 result 目录中生成。

    $./starrocks-migrate-tool
    $ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql
  8. 使用前缀为 starrocks-create 的 SQL 脚本在 StarRocks 中生成表。

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  9. 使用前缀为 flink-create 的 SQL 脚本生成 Flink 源表和 Sink 表,并启动 Flink 作业以同步数据。

    bin/sql-client.sh embedded < flink-create.all.sql

    上述命令成功执行后,用于同步数据的 Flink 作业将保持运行。

  10. 观察 Flink 作业的状态。

    bin/flink list 

    如果作业执行遇到错误,您可以在 Flink 日志中查看详细的错误信息。此外,您可以在文件 conf/flink-conf.yaml 中修改 Flink 配置,例如内存和 slot。

注意事项

  • 对于 PostgreSQL v9.*,需要特殊的 flink-cdc 配置,如下所示(建议使用 PostgreSQL v10.* 或更高版本。否则,您需要安装 WAL 解码插件)

    ############################################
    ############################################
    ### flink-cdc plugin configuration for `postgresql`
    ############################################
    ### for `9.*` decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming
    ### refer to https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html
    ### and https://debezium.io/documentation/reference/postgres-plugins.html
    ### flink.cdc.decoding.plugin.name = decoderbufs
  • 如何启用 PostgreSQL WAL?

    # Open connection permissions
    echo "host all all 0.0.0.0/32 trust" >> pg_hba.conf
    echo "host replication all 0.0.0.0/32 trust" >> pg_hba.conf
    # Enable wal logical replication
    echo "wal_level = logical" >> postgresql.conf
    echo "max_wal_senders = 2" >> postgresql.conf
    echo "max_replication_slots = 8" >> postgresql.conf

    为需要同步的表指定 replica identity FULL。

    ALTER TABLE schema_name.table_name REPLICA IDENTITY FULL

    进行这些更改后,重启 PostgreSQL。

将 Oracle 同步到 StarRocks

简介

Flink CDC connector 和 SMT 可以在亚秒级内同步来自 Oracle 的数据。

SMT 可以基于 Oracle 和 StarRocks 的集群信息和表结构,自动生成 Flink 源表和 Sink 表的 CREATE TABLE 语句。

Flink CDC connector 读取 Oracle 的 logminer,Flink-connector-starrocks 将数据写入 StarRocks。

步骤

  1. 下载Flink。支持的 Flink 版本为 1.11 或更高版本。

  2. 下载Flink CDC connector。确保下载与 Flink 版本对应的 flink-sql-connector-oracle-cdc-xxx.jar。

  3. 下载Flink StarRocks connector

  4. flink-sql-connector-oracle-cdc-xxx.jarflink-connector-starrocks-xxx.jar 复制到 flink-xxx/lib/

  5. 下载smt.tar.gz

  6. 解压并修改 SMT 的配置文件。

    [db]
    host = 192.168.1.1
    port = 1521
    user = xxx
    password = xxx
    type = oracle

    [other]
    # number of backends in StarRocks
    be_num = 3
    # `decimal_v3` is supported since StarRocks-1.18.1
    use_decimal_v3 = false
    # directory to save the converted DDL SQL
    output_dir = ./result

    [table-rule.1]
    # pattern to match databases for setting properties
    database = ^db$
    # pattern to match tables for setting properties
    table = ^table$
    # pattern to match schemas for setting properties
    schema = ^.*$

    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  7. 执行 starrocks-migrate-tool。所有 SQL 脚本都在 result 目录中生成。

    $./starrocks-migrate-tool
    $ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql
  8. 使用前缀为 starrocks-create 的 SQL 脚本在 StarRocks 中生成表。

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  9. 使用前缀为 flink-create 的 SQL 脚本生成 Flink 源表和 Sink 表,并启动 Flink 作业以同步数据。

    bin/sql-client.sh embedded < flink-create.all.sql

    上述命令成功执行后,用于同步数据的 Flink 作业将保持运行。

  10. 观察 Flink 作业的状态。

    bin/flink list 

    如果作业执行遇到错误,您可以在 Flink 日志中查看详细的错误信息。此外,您可以在文件 conf/flink-conf.yaml 中修改 Flink 配置,例如内存和 slot。

注意事项

  • 使用 logminer 同步 Oracle

    # Enable logging
    alter system set db_recovery_file_dest = '/home/oracle/data' scope=spfile;
    alter system set db_recovery_file_dest_size = 10G;
    shutdown immediate;
    startup mount;
    alter database archivelog;
    alter database open;

    ALTER TABLE schema_name.table_name ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

    # Authorize user creation and grant permissions
    GRANT CREATE SESSION TO flinkuser;
    GRANT SET CONTAINER TO flinkuser;
    GRANT SELECT ON V_$DATABASE TO flinkuser;
    GRANT FLASHBACK ANY TABLE TO flinkuser;
    GRANT SELECT ANY TABLE TO flinkuser;
    GRANT SELECT_CATALOG_ROLE TO flinkuser;
    GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
    GRANT SELECT ANY TRANSACTION TO flinkuser;
    GRANT LOGMINING TO flinkuser;
    GRANT CREATE TABLE TO flinkuser;
    GRANT LOCK ANY TABLE TO flinkuser;
    GRANT ALTER ANY TABLE TO flinkuser;
    GRANT CREATE SEQUENCE TO flinkuser;
    GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
    GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
    GRANT SELECT ON V_$LOG TO flinkuser;
    GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
    GRANT SELECT ON V_$LOGFILE TO flinkuser;
    GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
    GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
  • [table-rule.1] 中的数据库配置不支持正则表达式,因此需要指定完整的数据库名称。

  • 由于 Oracle12c 支持 CDB 模式,SMT 在内部自动确定是否启用了 CDB,并相应地修改 flink-cdc 配置。但是,用户需要注意是否需要在 [db].user 的配置中添加 c## 前缀,以避免权限不足的问题。

将 Hive 同步到 StarRocks

简介

本指南介绍了如何使用 SMT 将 Hive 数据同步到 StarRocks。在同步期间,会在 StarRocks 中创建一个 Duplicate 表,并且 Flink 作业会持续运行以同步数据。

步骤

准备工作

[db]
# hiveserver2 service ip
host = 127.0.0.1
# hiveserver2 service port
port = 10000
user = hive/emr-header-1.cluster-49148
password =
type = hive
# only takes effect with `type = hive`.
# Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
authentication = kerberos

支持的身份验证方法如下

  • nosasl, zk:不需要指定 userpassword
  • none, none_http, ldap:指定 userpassword
  • kerberos, kerberos_http:执行以下步骤
    • 在 Hive 集群上执行 kadmin.local 并检查 list_principals 以找到对应的 principal 名称。例如,当 principal 名称为 hive/emr-header-1.cluster-49148@EMR.49148.COM 时,需要将 user 设置为 hive/emr-header-1.cluster-49148,密码留空。
    • 在执行 SMT 的机器上执行 kinit -kt /path/to/keytab principal 并执行 klist 以查看是否生成了正确的令牌。

数据同步

  1. 执行 starrocks-migrate-tool

  2. 使用前缀为 starrocks-create 的 SQL 脚本在 StarRocks 中生成表。

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  3. flink/conf/ 中,创建并编辑文件 sql-client-defaults.yaml

    execution:
    planner: blink
    type: batch
    current-catalog: hive-starrocks
    catalogs:
    - name: hive-starrocks
    type: hive
    hive-conf-dir: /path/to/apache-hive-xxxx-bin/conf
  4. 从对应 Flink 版本的 Hive 页面下载依赖包 (flink-sql-connector-hive-xxxx) 并将其放置在 flink/lib 目录中。

  5. 启动 Flink 集群并执行 flink/bin/sql-client.sh embedded < result/flink-create.all.sql 以启动数据同步。

将 SQL Server 同步到 StarRocks

简介

Flink CDC connector 和 SMT 可以在亚秒级内同步来自 SQL Server 的数据。

SMT 可以基于 SQL Server 和 StarRocks 的集群信息和表结构,自动生成 Flink 源表和 Sink 表的 CREATE TABLE 语句。

Flink CDC connector 捕获并记录 SQL Server 数据库服务器中发生的行级更改。原理是使用 SQL Server 本身提供的 CDC 功能。SQL Server 本身的 CDC 功能可以将数据库中的指定更改归档到指定的更改表中。SQL Server CDC connector 首先使用 JDBC 从表中读取历史数据,然后从更改表中获取增量更改,从而实现全量增量同步。然后,Flink-connector-starrocks 将数据写入 StarRocks。

步骤

  1. 下载Flink。支持的 Flink 版本为 1.11 或更高版本。

  2. 下载Flink CDC connector。确保下载与 Flink 版本对应的 flink-sql-connector-sqlserver-cdc-xxx.jar

  3. 下载Flink StarRocks connector

  4. flink-sql-connector-sqlserver-cdc-xxx.jarflink-connector-starrocks-xxx.jar 复制到 flink-xxx/lib/

  5. 下载smt.tar.gz

  6. 解压并修改 SMT 的配置文件。

    [db]
    host = 127.0.0.1
    port = 1433
    user = xxx
    password = xxx

    # currently available types: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse`
    type = sqlserver

    [other]
    # number of backends in StarRocks
    be_num = 3
    # `decimal_v3` is supported since StarRocks-1.18.1
    use_decimal_v3 = false
    # directory to save the converted DDL SQL
    output_dir = ./result

    [table-rule.1]
    # pattern to match databases for setting properties
    database = ^db$
    # pattern to match tables for setting properties
    table = ^table$
    schema = ^.*$

    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  7. 执行 starrocks-migrate-tool*。所有 SQL 脚本都在 result 目录中生成。

​ ```Bash $./starrocks-migrate-tool $ls result flink-create.1.sql smt.tar.gz starrocks-create.all.sql flink-create.all.sql starrocks-create.1.sql starrocks-external-create.all.sql


8. Use a SQL script whose prefix is `starrocks-create` to generate the table in StarRocks.

```Bash
mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  1. 使用前缀为 flink-create 的 SQL 脚本生成 Flink 源表和 Sink 表,并启动 Flink 作业以同步数据。

    bin/sql-client.sh embedded < flink-create.all.sql     

    上述命令成功执行后,用于同步数据的 Flink 作业将保持运行。

  2. 观察 Flink 作业的状态。

    bin/flink list 

    如果作业执行遇到错误,您可以在 Flink 日志中查看详细的错误信息。此外,您可以在文件 conf/flink-conf.yaml 中修改 Flink 配置,例如内存和 slot。

注意事项

  1. 确保已启用 Server Agent Service。

    检查 Server Agent Service 是否正常运行。

    EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'
    GO

    启用 Server Agent Service。

    /opt/mssql/bin/mssql-conf set sqlagent.enabled true
  2. 确保已启用相应数据库的 CDC。

    ​ 检查是否已启用相应数据库的 CDC。

    select is_cdc_enabled, name from sys.databases where name = 'XXX_databases'
    GO

    ​ 启用 CDC。

    ​ :::note

    ​ 执行此命令时,请确保用户 serverRolesysadmin

    ​ ::

    USE XXX_databases
    GO
    EXEC sys.sp_cdc_enable_db
    GO
  3. 确保已启用相应表的 CDC。

    EXEC sys.sp_cdc_enable_table 
    @source_schema = 'XXX_schema',
    @source_name = 'XXX_table',
    @role_name = NULL,
    @supports_net_changes = 0;
    GO

将 TiDB 同步到 StarRocks

简介

Flink CDC connector 和 SMT 可以在亚秒级内同步来自 TiDB 的数据。

SMT 可以基于 TiDB 和 StarRocks 的集群信息和表结构,自动生成 Flink 源表和 Sink 表的 DDL 语句。

Flink CDC connector 通过直接从底层 TiKV 存储读取全量和增量数据来捕获数据。全量数据是从基于键分区的 ranges 中获取的,增量数据是通过使用 TiDB 提供的 CDC Client 获取的。随后,数据通过 Flink-connector-starrocks 写入 StarRocks。

步骤

  1. 下载Flink。支持的 Flink 版本为 1.11 或更高版本。

  2. 下载Flink CDC connector。确保下载与 Flink 版本对应的 flink-sql-connector-tidb-cdc-xxx.jar

  3. 下载Flink StarRocks connector

  4. flink-sql-connector-tidb-cdc-xxx.jarflink-connector-starrocks-xxx.jar 复制到 flink-xxx/lib/

  5. 下载smt.tar.gz

  6. 解压并修改 SMT 的配置文件。

    [db]
    host = 127.0.0.1
    port = 4000
    user = root
    password =
    # currently available types: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse`, `sqlserver`, `tidb`
    type = tidb
    # # only takes effect on `type == hive`.
    # # Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
    # authentication = kerberos

    [other]
    # number of backends in StarRocks
    be_num = 3
    # `decimal_v3` is supported since StarRocks-1.18.1
    use_decimal_v3 = false
    # directory to save the converted DDL SQL
    output_dir = ./result

    [table-rule.1]
    # pattern to match databases for setting properties
    database = ^db$
    # pattern to match tables for setting properties
    table = ^table$
    schema = ^.*$

    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true

    ############################################
    ### flink-cdc configuration for `tidb`
    ############################################
    # # Only takes effect on TiDB before v4.0.0.
    # # TiKV cluster's PD address.
    # flink.cdc.pd-addresses = 127.0.0.1:2379
  7. 执行 starrocks-migrate-tool*。所有 SQL 脚本都在 result 目录中生成。

    $./starrocks-migrate-tool
    $ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql starrocks-external-create.all.sql
  8. 使用前缀为 starrocks-create 的 SQL 脚本在 StarRocks 中生成表。

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  9. 使用前缀为 flink-create 的 SQL 脚本生成 Flink 源表和 Sink 表,并启动 Flink 作业以同步数据。

    bin/sql-client.sh embedded < flink-create.all.sql     

​ 上述命令成功执行后,用于同步数据的 Flink 作业将保持运行。

  1. 观察 Flink 作业的状态。

    bin/flink list 

    如果作业执行遇到错误,您可以在 Flink 日志中查看详细的错误信息。此外,您可以在文件 conf/flink-conf.yaml 中修改 Flink 配置,例如内存和 slot。

注意事项

对于版本低于 v4.0.0 的 TiDB,需要额外配置 flink.cdc.pd-addresses

```Bash
############################################
### flink-cdc configuration for `tidb`
############################################
# # Only takes effect on TiDB before v4.0.0.
# # TiKV cluster's PD address.
# flink.cdc.pd-addresses = 127.0.0.1:2379
```