StarRocks 迁移工具 (SMT)
StarRocks 迁移工具 (SMT) 是 StarRocks 提供的数据迁移工具,用于通过 Flink 将数据从源数据库加载到 StarRocks 中。SMT 主要可以
- 基于源数据库和目标 StarRocks 集群的信息,生成在 StarRocks 中创建表的语句。
- 生成可以在 Flink SQL 客户端中执行的 SQL 语句,以提交 Flink 作业来同步数据,从而简化管道中的全量或增量数据同步。目前,SMT 支持以下源数据库
源数据库 | 生成在 StarRocks 中创建表的语句 | 全量数据同步 | 增量数据同步 |
---|---|---|---|
MySQL | 支持 | 支持 | 支持 |
PostgreSQL | 支持 | 支持 | 支持 |
Oracle | 支持 | 支持 | 支持 |
Hive | 支持 | 支持 | 不支持 |
ClickHouse | 支持 | 支持 | 不支持 |
SQL Server | 支持 | 支持 | 支持 |
TiDB | 支持 | 支持 | 支持 |
下载链接:https://cdn-thirdparty.starrocks.com/smt.tar.gz?r=2
SMT 使用步骤
通常涉及的步骤如下
-
配置 conf/config_prod.conf 文件。
-
执行 starrocks-migration-tool。
-
执行后,SQL 脚本默认在 result 目录中生成。
然后,您可以使用 result 目录中的 SQL 脚本进行元数据或数据同步。
SMT 配置
-
[db]
:连接数据源的信息。配置与type
参数中指定的数据库类型对应的数据源连接信息。 -
[other]
:其他配置。建议在be_num
参数中指定 BE 节点的实际数量。 -
flink.starrocks.sink.*
:flink-connector-starrocks 的配置。有关详细配置和说明,请参见配置说明。 -
[table-rule.1]
:用于匹配数据源中表的规则。CREATE TABLE 语句基于规则中配置的正则表达式生成,以匹配数据源中数据库和表的名称。可以配置多个规则,每个规则生成一个对应的结果文件,例如[table-rule.1]
->result/starrocks-create.1.sql
[table-rule.2]
->result/starrocks-create.2.sql
每个规则都需要包含数据库、表和 flink-connector-starrocks 的配置。
[table-rule.1]
# pattern to match databases for setting properties
database = ^ database1.*$
# pattern to match tables for setting properties
table = ^.*$
schema = ^.*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url= 192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=15000
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true
[table-rule.2]
# pattern to match databases for setting properties
database = ^database2.*$
# pattern to match tables for setting properties
table = ^.*$
schema = ^.*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url= 192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=15000
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true -
可以为拆分到数据库中的分片大表配置单独的规则。例如,假设两个数据库
edu_db_1
和edu_db_2
分别包含表course_1
和course_2
,并且这两个表具有相同的结构。您可以使用以下规则将这两个表中的数据加载到一个 StarRocks 表中进行分析。[table-rule.3]
# pattern to match databases for setting properties
database = ^edu_db_[0-9]*$
# pattern to match tables for setting properties
table = ^course_[0-9]*$
schema = ^.*$此规则将自动形成多对一的加载关系。将在 StarRocks 中生成的表的默认名称是
course__auto_shard
,您也可以在相关的 SOL 脚本中修改表名,例如result/starrocks-create.3.sql
。
将 MySQL 同步到 StarRocks
简介
Flink CDC connector 和 SMT 可以在亚秒级内同步来自 MySQL 的数据。
如图所示,SMT 可以基于 MySQL 和 StarRocks 的集群信息和表结构,自动生成 Flink 源表和 Sink 表的 CREATE TABLE 语句。Flink CDC connector 读取 MySQL Binlog,Flink-connector-starrocks 将数据写入 StarRocks。
步骤
-
下载Flink。支持的 Flink 版本为 1.11 或更高版本。
-
下载Flink CDC connector。确保下载与 Flink 版本对应的
flink-sql-connector-mysql-cdc-xxx.jar
。 -
将 flink-sql-connector-mysql-cdc-xxx.jar 和 flink-connector-starrocks-xxx.jar 复制到 flink-xxx/lib/。
-
下载smt.tar.gz。
-
解压并修改 SMT 的配置文件。
[db]
host = 192.168.1.1
port = 3306
user = root
password =
type = mysql
[other]
# number of backends in StarRocks
be_num = 3
# `decimal_v3` is supported since StarRocks-1.18.1
use_decimal_v3 = false
# directory to save the converted DDL SQL
output_dir = ./result
[table-rule.1]
# pattern to match databases for setting properties
database = ^db$
# pattern to match tables for setting properties
table = ^table$
schema = ^.*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url= 192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=15000
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true -
执行 starrocks-migrate-tool。所有 SQL 脚本都在 result 目录中生成。
$./starrocks-migrate-tool
$ls result
flink-create.1.sql smt.tar.gz starrocks-create.all.sql
flink-create.all.sql starrocks-create.1.sql starrocks-external-create.all.sql -
使用前缀为 starrocks-create 的 SQL 脚本在 StarRocks 中生成表。
mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
-
使用前缀为 flink-create 的 SQL 脚本生成 Flink 源表和 Sink 表,并启动 Flink 作业以同步数据。
bin/sql-client.sh embedded < flink-create.all.sql
上述命令成功执行后,用于同步数据的 Flink 作业将保持运行。
-
观察 Flink 作业的状态。
bin/flink list
如果作业执行遇到错误,您可以在 Flink 日志中查看详细的错误信息。此外,您可以在文件 conf/flink-conf.yaml 中修改 Flink 配置,例如内存和 slot。
注意事项
-
如何启用 MySQL binlog?
-
修改 /etc/my.cnf
# Enable binlog
log-bin=/var/lib/mysql/mysql-bin
#log_bin=ON
## Base name of binlog files
#log_bin_basename=/var/lib/mysql/mysql-bin
## Index file for binlog files, managing all binlog files
#log_bin_index=/var/lib/mysql/mysql-bin.index
# Configure server id
server-id=1
binlog_format = row -
重启 mysqld。您可以通过执行
SHOW VARIABLES LIKE 'log_bin';
来检查是否已启用 MySQL binlog。
-
将 PostgreSQL 同步到 StarRocks
简介
Flink CDC connector 和 SMT 可以在亚秒级内同步来自 PostgreSQL 的数据。
SMT 可以基于 PostgreSQL 和 StarRocks 的集群信息和表结构,自动生成 Flink 源表和 Sink 表的 CREATE TABLE 语句。
Flink CDC connector 读取 PostgreSQL 的 WAL,Flink-connector-starrocks 将数据写入 StarRocks。
步骤
-
下载Flink。支持的 Flink 版本为 1.11 或更高版本。
-
下载Flink CDC connector。确保下载与 Flink 版本对应的 flink-sql-connector-postgres-cdc-xxx.jar。
-
将 flink-sql-connector-postgres-cdc-xxx.jar 和 flink-connector-starrocks-xxx.jar 复制到 flink-xxx/lib/。
-
下载smt.tar.gz。
-
解压并修改 SMT 的配置文件。
[db]
host = 192.168.1.1
port = 5432
user = xxx
password = xxx
type = pgsql
[other]
# number of backends in StarRocks
be_num = 3
# `decimal_v3` is supported since StarRocks-1.18.1
use_decimal_v3 = false
# directory to save the converted DDL SQL
output_dir = ./result
[table-rule.1]
# pattern to match databases for setting properties
database = ^db$
# pattern to match tables for setting properties
table = ^table$
# pattern to match schemas for setting properties
schema = ^.*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url= 192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=15000
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true -
执行 starrocks-migrate-tool。所有 SQL 脚本都在 result 目录中生成。
$./starrocks-migrate-tool
$ls result
flink-create.1.sql smt.tar.gz starrocks-create.all.sql
flink-create.all.sql starrocks-create.1.sql -
使用前缀为 starrocks-create 的 SQL 脚本在 StarRocks 中生成表。
mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
-
使用前缀为 flink-create 的 SQL 脚本生成 Flink 源表和 Sink 表,并启动 Flink 作业以同步数据。
bin/sql-client.sh embedded < flink-create.all.sql
上述命令成功执行后,用于同步数据的 Flink 作业将保持运行。
-
观察 Flink 作业的状态。
bin/flink list
如果作业执行遇到错误,您可以在 Flink 日志中查看详细的错误信息。此外,您可以在文件 conf/flink-conf.yaml 中修改 Flink 配置,例如内存和 slot。
注意事项
-
对于 PostgreSQL
v9.*
,需要特殊的 flink-cdc 配置,如下所示(建议使用 PostgreSQLv10.*
或更高版本。否则,您需要安装 WAL 解码插件)############################################
############################################
### flink-cdc plugin configuration for `postgresql`
############################################
### for `9.*` decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming
### refer to https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html
### and https://debezium.io/documentation/reference/postgres-plugins.html
### flink.cdc.decoding.plugin.name = decoderbufs -
如何启用 PostgreSQL WAL?
# Open connection permissions
echo "host all all 0.0.0.0/32 trust" >> pg_hba.conf
echo "host replication all 0.0.0.0/32 trust" >> pg_hba.conf
# Enable wal logical replication
echo "wal_level = logical" >> postgresql.conf
echo "max_wal_senders = 2" >> postgresql.conf
echo "max_replication_slots = 8" >> postgresql.conf为需要同步的表指定 replica identity FULL。
ALTER TABLE schema_name.table_name REPLICA IDENTITY FULL
进行这些更改后,重启 PostgreSQL。
将 Oracle 同步到 StarRocks
简介
Flink CDC connector 和 SMT 可以在亚秒级内同步来自 Oracle 的数据。
SMT 可以基于 Oracle 和 StarRocks 的集群信息和表结构,自动生成 Flink 源表和 Sink 表的 CREATE TABLE 语句。
Flink CDC connector 读取 Oracle 的 logminer,Flink-connector-starrocks 将数据写入 StarRocks。
步骤
-
下载Flink。支持的 Flink 版本为 1.11 或更高版本。
-
下载Flink CDC connector。确保下载与 Flink 版本对应的 flink-sql-connector-oracle-cdc-xxx.jar。
-
将
flink-sql-connector-oracle-cdc-xxx.jar
和flink-connector-starrocks-xxx.jar
复制到flink-xxx/lib/
。 -
下载smt.tar.gz。
-
解压并修改 SMT 的配置文件。
[db]
host = 192.168.1.1
port = 1521
user = xxx
password = xxx
type = oracle
[other]
# number of backends in StarRocks
be_num = 3
# `decimal_v3` is supported since StarRocks-1.18.1
use_decimal_v3 = false
# directory to save the converted DDL SQL
output_dir = ./result
[table-rule.1]
# pattern to match databases for setting properties
database = ^db$
# pattern to match tables for setting properties
table = ^table$
# pattern to match schemas for setting properties
schema = ^.*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url= 192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=15000
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true -
执行 starrocks-migrate-tool。所有 SQL 脚本都在 result 目录中生成。
$./starrocks-migrate-tool
$ls result
flink-create.1.sql smt.tar.gz starrocks-create.all.sql
flink-create.all.sql starrocks-create.1.sql -
使用前缀为 starrocks-create 的 SQL 脚本在 StarRocks 中生成表。
mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
-
使用前缀为 flink-create 的 SQL 脚本生成 Flink 源表和 Sink 表,并启动 Flink 作业以同步数据。
bin/sql-client.sh embedded < flink-create.all.sql
上述命令成功执行后,用于同步数据的 Flink 作业将保持运行。
-
观察 Flink 作业的状态。
bin/flink list
如果作业执行遇到错误,您可以在 Flink 日志中查看详细的错误信息。此外,您可以在文件 conf/flink-conf.yaml 中修改 Flink 配置,例如内存和 slot。
注意事项
-
使用 logminer 同步 Oracle
# Enable logging
alter system set db_recovery_file_dest = '/home/oracle/data' scope=spfile;
alter system set db_recovery_file_dest_size = 10G;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
ALTER TABLE schema_name.table_name ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
# Authorize user creation and grant permissions
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE TO flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser; -
[table-rule.1] 中的数据库配置不支持正则表达式,因此需要指定完整的数据库名称。
-
由于 Oracle12c 支持 CDB 模式,SMT 在内部自动确定是否启用了 CDB,并相应地修改 flink-cdc 配置。但是,用户需要注意是否需要在
[db].user
的配置中添加 c## 前缀,以避免权限不足的问题。
将 Hive 同步到 StarRocks
简介
本指南介绍了如何使用 SMT 将 Hive 数据同步到 StarRocks。在同步期间,会在 StarRocks 中创建一个 Duplicate 表,并且 Flink 作业会持续运行以同步数据。
步骤
准备工作
[db]
# hiveserver2 service ip
host = 127.0.0.1
# hiveserver2 service port
port = 10000
user = hive/emr-header-1.cluster-49148
password =
type = hive
# only takes effect with `type = hive`.
# Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
authentication = kerberos
支持的身份验证方法如下
- nosasl, zk:不需要指定
user
和password
。 - none, none_http, ldap:指定
user
和password
。 - kerberos, kerberos_http:执行以下步骤
- 在 Hive 集群上执行
kadmin.local
并检查list_principals
以找到对应的 principal 名称。例如,当 principal 名称为hive/emr-header-1.cluster-49148@EMR.49148.COM
时,需要将 user 设置为hive/emr-header-1.cluster-49148
,密码留空。 - 在执行 SMT 的机器上执行
kinit -kt /path/to/keytab principal
并执行klist
以查看是否生成了正确的令牌。
- 在 Hive 集群上执行
数据同步
-
执行 starrocks-migrate-tool。
-
使用前缀为 starrocks-create 的 SQL 脚本在 StarRocks 中生成表。
mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
-
在 flink/conf/ 中,创建并编辑文件 sql-client-defaults.yaml
execution:
planner: blink
type: batch
current-catalog: hive-starrocks
catalogs:
- name: hive-starrocks
type: hive
hive-conf-dir: /path/to/apache-hive-xxxx-bin/conf -
从对应 Flink 版本的 Hive 页面下载依赖包 (flink-sql-connector-hive-xxxx) 并将其放置在
flink/lib
目录中。 -
启动 Flink 集群并执行
flink/bin/sql-client.sh embedded < result/flink-create.all.sql
以启动数据同步。
将 SQL Server 同步到 StarRocks
简介
Flink CDC connector 和 SMT 可以在亚秒级内同步来自 SQL Server 的数据。
SMT 可以基于 SQL Server 和 StarRocks 的集群信息和表结构,自动生成 Flink 源表和 Sink 表的 CREATE TABLE 语句。
Flink CDC connector 捕获并记录 SQL Server 数据库服务器中发生的行级更改。原理是使用 SQL Server 本身提供的 CDC 功能。SQL Server 本身的 CDC 功能可以将数据库中的指定更改归档到指定的更改表中。SQL Server CDC connector 首先使用 JDBC 从表中读取历史数据,然后从更改表中获取增量更改,从而实现全量增量同步。然后,Flink-connector-starrocks 将数据写入 StarRocks。
步骤
-
下载Flink。支持的 Flink 版本为 1.11 或更高版本。
-
下载Flink CDC connector。确保下载与 Flink 版本对应的 flink-sql-connector-sqlserver-cdc-xxx.jar。
-
将 flink-sql-connector-sqlserver-cdc-xxx.jar、flink-connector-starrocks-xxx.jar 复制到 flink-xxx/lib/。
-
下载smt.tar.gz。
-
解压并修改 SMT 的配置文件。
[db]
host = 127.0.0.1
port = 1433
user = xxx
password = xxx
# currently available types: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse`
type = sqlserver
[other]
# number of backends in StarRocks
be_num = 3
# `decimal_v3` is supported since StarRocks-1.18.1
use_decimal_v3 = false
# directory to save the converted DDL SQL
output_dir = ./result
[table-rule.1]
# pattern to match databases for setting properties
database = ^db$
# pattern to match tables for setting properties
table = ^table$
schema = ^.*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url= 192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=15000
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true -
执行 starrocks-migrate-tool*。所有 SQL 脚本都在
result
目录中生成。
```Bash $./starrocks-migrate-tool $ls result flink-create.1.sql smt.tar.gz starrocks-create.all.sql flink-create.all.sql starrocks-create.1.sql starrocks-external-create.all.sql
8. Use a SQL script whose prefix is `starrocks-create` to generate the table in StarRocks.
```Bash
mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
-
使用前缀为
flink-create
的 SQL 脚本生成 Flink 源表和 Sink 表,并启动 Flink 作业以同步数据。bin/sql-client.sh embedded < flink-create.all.sql
上述命令成功执行后,用于同步数据的 Flink 作业将保持运行。
-
观察 Flink 作业的状态。
bin/flink list
如果作业执行遇到错误,您可以在 Flink 日志中查看详细的错误信息。此外,您可以在文件 conf/flink-conf.yaml 中修改 Flink 配置,例如内存和 slot。
注意事项
-
确保已启用 Server Agent Service。
检查 Server Agent Service 是否正常运行。
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'
GO启用 Server Agent Service。
/opt/mssql/bin/mssql-conf set sqlagent.enabled true
-
确保已启用相应数据库的 CDC。
检查是否已启用相应数据库的 CDC。
select is_cdc_enabled, name from sys.databases where name = 'XXX_databases'
GO 启用 CDC。
:::note
执行此命令时,请确保用户
serverRole
为sysadmin
。 ::
USE XXX_databases
GO
EXEC sys.sp_cdc_enable_db
GO -
确保已启用相应表的 CDC。
EXEC sys.sp_cdc_enable_table
@source_schema = 'XXX_schema',
@source_name = 'XXX_table',
@role_name = NULL,
@supports_net_changes = 0;
GO
将 TiDB 同步到 StarRocks
简介
Flink CDC connector 和 SMT 可以在亚秒级内同步来自 TiDB 的数据。
SMT 可以基于 TiDB 和 StarRocks 的集群信息和表结构,自动生成 Flink 源表和 Sink 表的 DDL 语句。
Flink CDC connector 通过直接从底层 TiKV 存储读取全量和增量数据来捕获数据。全量数据是从基于键分区的 ranges 中获取的,增量数据是通过使用 TiDB 提供的 CDC Client 获取的。随后,数据通过 Flink-connector-starrocks 写入 StarRocks。
步骤
-
下载Flink。支持的 Flink 版本为 1.11 或更高版本。
-
下载Flink CDC connector。确保下载与 Flink 版本对应的 flink-sql-connector-tidb-cdc-xxx.jar。
-
将 flink-sql-connector-tidb-cdc-xxx.jar、flink-connector-starrocks-xxx.jar 复制到 flink-xxx/lib/。
-
下载smt.tar.gz。
-
解压并修改 SMT 的配置文件。
[db]
host = 127.0.0.1
port = 4000
user = root
password =
# currently available types: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse`, `sqlserver`, `tidb`
type = tidb
# # only takes effect on `type == hive`.
# # Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
# authentication = kerberos
[other]
# number of backends in StarRocks
be_num = 3
# `decimal_v3` is supported since StarRocks-1.18.1
use_decimal_v3 = false
# directory to save the converted DDL SQL
output_dir = ./result
[table-rule.1]
# pattern to match databases for setting properties
database = ^db$
# pattern to match tables for setting properties
table = ^table$
schema = ^.*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url= 192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=15000
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true
############################################
### flink-cdc configuration for `tidb`
############################################
# # Only takes effect on TiDB before v4.0.0.
# # TiKV cluster's PD address.
# flink.cdc.pd-addresses = 127.0.0.1:2379 -
执行 starrocks-migrate-tool*。所有 SQL 脚本都在
result
目录中生成。$./starrocks-migrate-tool
$ls result
flink-create.1.sql smt.tar.gz starrocks-create.all.sql
flink-create.all.sql starrocks-create.1.sql starrocks-external-create.all.sql -
使用前缀为
starrocks-create
的 SQL 脚本在 StarRocks 中生成表。mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
-
使用前缀为
flink-create
的 SQL 脚本生成 Flink 源表和 Sink 表,并启动 Flink 作业以同步数据。bin/sql-client.sh embedded < flink-create.all.sql
上述命令成功执行后,用于同步数据的 Flink 作业将保持运行。
-
观察 Flink 作业的状态。
bin/flink list
如果作业执行遇到错误,您可以在 Flink 日志中查看详细的错误信息。此外,您可以在文件 conf/flink-conf.yaml 中修改 Flink 配置,例如内存和 slot。
注意事项
对于版本低于 v4.0.0 的 TiDB,需要额外配置 flink.cdc.pd-addresses
。
```Bash
############################################
### flink-cdc configuration for `tidb`
############################################
# # Only takes effect on TiDB before v4.0.0.
# # TiKV cluster's PD address.
# flink.cdc.pd-addresses = 127.0.0.1:2379
```