Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.client.deployment.executors.LocalExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Expand Down Expand Up @@ -83,6 +85,23 @@ void configurationIsForwarded() throws Exception {
assertThat(getStdoutString()).contains("Watermark interval is 42");
}

@Test
void configurationRestoreMode() throws Exception {
Configuration config = new Configuration();
CustomCommandLine commandLine = new DefaultCLI();

config.set(StateRecoveryOptions.RESTORE_MODE, RecoveryClaimMode.CLAIM);

CliFrontend cliFrontend = new CliFrontend(config, Collections.singletonList(commandLine));

cliFrontend.parseAndRun(
new String[] {
"run", "-c", TestingJob.class.getName(), CliFrontendTestUtils.getTestJarPath()
});

assertThat(getStdoutString()).contains("Restore mode is CLAIM");
}

@Test
void commandlineOverridesConfiguration() throws Exception {
Configuration config = new Configuration();
Expand Down Expand Up @@ -138,6 +157,9 @@ public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
System.out.println(
"Watermark interval is " + env.getConfig().getAutoWatermarkInterval());
System.out.println(
"Restore mode is "
+ env.getConfiguration().get(StateRecoveryOptions.RESTORE_MODE));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,17 @@ public static SavepointRestoreSettings forPath(
public static void toConfiguration(
final SavepointRestoreSettings savepointRestoreSettings,
final Configuration configuration) {
configuration.set(
StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
savepointRestoreSettings.allowNonRestoredState());
configuration.set(
StateRecoveryOptions.RESTORE_MODE, savepointRestoreSettings.getRecoveryClaimMode());
final String savepointPath = savepointRestoreSettings.getRestorePath();
if (savepointPath != null) {
configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath);
if (!savepointRestoreSettings.equals(SavepointRestoreSettings.none())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious how this fix works. I see that the only case the fix will effect is the none case , how does the unit test relate to the none case?

configuration.set(
StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
savepointRestoreSettings.allowNonRestoredState());
configuration.set(
StateRecoveryOptions.RESTORE_MODE,
savepointRestoreSettings.getRecoveryClaimMode());
final String savepointPath = savepointRestoreSettings.getRestorePath();
if (savepointPath != null) {
configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath);
}
}
}

Expand Down