diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java index bd2e29f026d..3bd6c151015 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.core.starter.flink; +import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils; + import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.constants.EngineType; import org.apache.seatunnel.core.starter.Starter; @@ -56,6 +58,11 @@ public List buildCommands() { command.add("${FLINK_HOME}/bin/flink"); // set deploy mode, run or run-application command.add(flinkCommandArgs.getDeployMode().getDeployMode()); + // set restore checkpoint + if (StringUtils.isNoneBlank(flinkCommandArgs.getFromSavepoint())) { + command.add("-s"); + command.add(flinkCommandArgs.getFromSavepoint()); + } // set submitted target master if (flinkCommandArgs.getMasterType() != null) { command.add("--target"); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java index ff098b9df97..ff177fe88da 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java @@ -53,6 +53,13 @@ public class FlinkCommandArgs extends AbstractCommandArgs { + "kubernetes-session, yarn-application, kubernetes-application]") private MasterType masterType; + /** restore checkpoint path */ + @Parameter( + names = {"-s", "--fromSavepoint"}, + description = + "Path to a savepoint to restore the job from (for example, flink run -s hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902).") + protected String fromSavepoint; + @Override public Command buildCommand() { Common.setDeployMode(getDeployMode()); @@ -75,6 +82,8 @@ public String toString() { + deployMode + ", masterType=" + masterType + + ", fromSavepoint=" + + fromSavepoint + ", configFile='" + configFile + '\''