跳到主要内容
版本: 最新版本-3.5

实时同步 MySQL 数据

Flink 作业报错 Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing.

可能的原因是 SMT 配置文件 config_prod.conf 中的多个规则集(例如 [table-rule.1][table-rule.2])中缺少必需的配置信息。

您可以检查每个规则集(例如 [table-rule.1][table-rule.2])是否配置了必需的数据库、表和 Flink 连接器信息。

Flink 通过 检查点机制重启策略 自动重启失败的任务。

例如,如果您需要启用检查点机制并使用默认的重启策略(即固定延迟重启策略),您可以在配置文件 flink-conf.yaml 中配置以下信息

execution.checkpointing.interval: 300000
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory

参数说明

注意

有关 Flink 文档中更详细的参数说明,请参阅 检查点

  • execution.checkpointing.interval:检查点的基本时间间隔。单位:毫秒。要启用检查点机制,您需要将此参数设置为大于 0 的值。
  • state.backend:指定状态后端,以确定状态在内部的表示方式,以及在检查点时如何以及在哪里持久化。常见值为 filesystemrocksdb。启用检查点机制后,状态会在检查点时持久化,以防止数据丢失并确保恢复后的数据一致性。有关状态的更多信息,请参阅 状态后端
  • state.checkpoints.dir:将检查点写入的目录。

您可以在停止 Flink 作业时手动触发一个 保存点 (savepoint),(保存点是流式 Flink 作业的执行状态的一致镜像,基于检查点机制创建)。 稍后,您可以从指定的保存点恢复 Flink 作业。

  1. 使用保存点停止 Flink 作业。以下命令会自动为 Flink 作业 jobId 触发保存点并停止 Flink 作业。此外,您可以指定一个目标文件系统目录来存储保存点。

    bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId

    参数说明

    • jobId:您可以从 Flink WebUI 或通过在命令行上运行 flink list -running 来查看 Flink 作业 ID。
    • targetDirectory:您可以将 state.savepoints.dir 指定为 Flink 配置文件 flink-conf.yml 中存储保存点的默认目录。触发保存点时,保存点将存储在此默认目录中,您无需指定目录。
    state.savepoints.dir: [file:// or hdfs://]/home/user/savepoints_dir
  2. 重新提交 Flink 作业,并指定前面的保存点。

    ./flink run -c com.starrocks.connector.flink.tools.ExecuteSQL -s savepoints_dir/savepoints-xxxxxxxx flink-connector-starrocks-xxxx.jar -f flink-create.all.sql