实时同步 MySQL 数据
如果 Flink 作业报错,我该怎么办?
Flink 作业报错 Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing.
可能的原因是 SMT 配置文件 config_prod.conf 中的多个规则集(例如 [table-rule.1]
和 [table-rule.2]
)中缺少必需的配置信息。
您可以检查每个规则集(例如 [table-rule.1]
和 [table-rule.2]
)是否配置了必需的数据库、表和 Flink 连接器信息。
如何让 Flink 自动重启失败的任务?
Flink 通过 检查点机制 和 重启策略 自动重启失败的任务。
例如,如果您需要启用检查点机制并使用默认的重启策略(即固定延迟重启策略),您可以在配置文件 flink-conf.yaml 中配置以下信息
execution.checkpointing.interval: 300000
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
参数说明
注意
有关 Flink 文档中更详细的参数说明,请参阅 检查点。
execution.checkpointing.interval
:检查点的基本时间间隔。单位:毫秒。要启用检查点机制,您需要将此参数设置为大于0
的值。state.backend
:指定状态后端,以确定状态在内部的表示方式,以及在检查点时如何以及在哪里持久化。常见值为filesystem
或rocksdb
。启用检查点机制后,状态会在检查点时持久化,以防止数据丢失并确保恢复后的数据一致性。有关状态的更多信息,请参阅 状态后端。state.checkpoints.dir
:将检查点写入的目录。
如何手动停止 Flink 作业,并在以后将其恢复到停止之前的状态?
您可以在停止 Flink 作业时手动触发一个 保存点 (savepoint),(保存点是流式 Flink 作业的执行状态的一致镜像,基于检查点机制创建)。 稍后,您可以从指定的保存点恢复 Flink 作业。
-
使用保存点停止 Flink 作业。以下命令会自动为 Flink 作业
jobId
触发保存点并停止 Flink 作业。此外,您可以指定一个目标文件系统目录来存储保存点。bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId
参数说明
jobId
:您可以从 Flink WebUI 或通过在命令行上运行flink list -running
来查看 Flink 作业 ID。targetDirectory
:您可以将state.savepoints.dir
指定为 Flink 配置文件 flink-conf.yml 中存储保存点的默认目录。触发保存点时,保存点将存储在此默认目录中,您无需指定目录。
state.savepoints.dir: [file:// or hdfs://]/home/user/savepoints_dir
-
重新提交 Flink 作业,并指定前面的保存点。
./flink run -c com.starrocks.connector.flink.tools.ExecuteSQL -s savepoints_dir/savepoints-xxxxxxxx flink-connector-starrocks-xxxx.jar -f flink-create.all.sql