Skip to content

Commit 39728ba

Browse files
committed
Merge remote-tracking branch 'origin/MET-6379-Rethink-processing-engine-exception-handling-and-restart-strategies' into develop
2 parents 7b66ec5 + 8ac4f61 commit 39728ba

File tree

2 files changed

+32
-6
lines changed

2 files changed

+32
-6
lines changed

commons/src/main/java/eu/europeana/processing/MetisJob.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,15 @@
1414
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
1515
import org.apache.flink.util.ParameterTool;
1616
import org.apache.flink.configuration.Configuration;
17+
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
1718
import org.apache.flink.configuration.RestartStrategyOptions;
1819
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
1920
import org.apache.flink.streaming.api.functions.ProcessFunction;
2021

2122
import java.util.Map;
2223
import java.util.Random;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
2326

2427
/**
2528
* <p>Main abstract class used by all the jobs executed by Metis.</p>
@@ -32,9 +35,20 @@
3235
*/
3336
public abstract class MetisJob {
3437

35-
private static final int RESTART_ATTEMPTS = 3;
36-
private static final int RESTART_DELAY_IN_SECONDS = 10;
38+
private static final Logger LOGGER = LoggerFactory.getLogger(MetisJob.class);
3739

40+
////////////////////////Failover strategy configuration/////////////////////////////////////////
41+
//Together, these settings give us over 30 minutes of restart attempts in case of a total infrastructure error. The
42+
// exact duration is not fixed: it depends on jitter and on task start-up time.
43+
private static final Duration INITIAL_RESTART_DELAY_IN_SECONDS = Duration.ofSeconds(10);
44+
private static final Duration MAX_RESTART_DELAY_IN_SECONDS = Duration.ofMinutes(2);
45+
public static final double BACK_OFF_MULTIPLIER = 2.0;
46+
private static final int ATTEMPTS = 20;
47+
public static final double JITTER_FACTOR = 0.1;
48+
//Once the job has been running fine for the configured time, the restart counter is reset.
49+
private static final Duration RESET_BACKOFF_THRESHOLD = Duration.ofMinutes(10);
50+
51+
////////////////////////Checkpointing configuration/////////////////////////////////////////////
3852
private static final long CHECKPOINT_INTERVAL_IN_MILLIS = 2000;
3953
private static final long MIN_PAUSE_BETWEEN_CHECKPOINTS = 1000;
4054

@@ -59,9 +73,13 @@ protected MetisJob(String[] args, String jobName) {
5973
protected StreamExecutionEnvironment prepareEnvironment() {
6074

6175
Configuration config = new Configuration();
62-
config.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
63-
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, RESTART_ATTEMPTS);
64-
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(RESTART_DELAY_IN_SECONDS));
76+
config.set(RestartStrategyOptions.RESTART_STRATEGY, "exponential-delay");
77+
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF, INITIAL_RESTART_DELAY_IN_SECONDS);
78+
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF, MAX_RESTART_DELAY_IN_SECONDS);
79+
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER, BACK_OFF_MULTIPLIER);
80+
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_ATTEMPTS, ATTEMPTS);
81+
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_RESET_BACKOFF_THRESHOLD, RESET_BACKOFF_THRESHOLD);
82+
config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_JITTER_FACTOR, JITTER_FACTOR);
6583

6684
final StreamExecutionEnvironment env =
6785
StreamExecutionEnvironment.getExecutionEnvironment(config);
@@ -71,6 +89,7 @@ protected StreamExecutionEnvironment prepareEnvironment() {
7189
env.getConfig().setGlobalJobParameters(tool);
7290
env.enableCheckpointing(CHECKPOINT_INTERVAL_IN_MILLIS);
7391
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(MIN_PAUSE_BETWEEN_CHECKPOINTS);
92+
env.getCheckpointConfig().setExternalizedCheckpointRetention(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
7493
return env;
7594
}
7695

@@ -113,6 +132,7 @@ protected void prepareJob() {
113132
*/
114133
public void execute() throws Exception {
115134
prepareJob();
135+
LOGGER.info("Execution plan: {}", flinkEnvironment.getExecutionPlan());
116136
flinkEnvironment.execute(enrichedJobName());
117137
}
118138

http/src/main/java/eu/europeana/processing/http/reader/dowloader/HttpSourceFileDownloader.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
import eu.europeana.metis.harvesting.http.HttpHarvester;
66
import eu.europeana.processing.http.reader.exception.HttpSourceException;
77
import java.io.IOException;
8+
import java.net.UnknownHostException;
89
import java.nio.file.Files;
910
import java.nio.file.Path;
11+
import org.apache.flink.runtime.execution.SuppressRestartsException;
1012
import org.slf4j.Logger;
1113
import org.slf4j.LoggerFactory;
1214

@@ -45,7 +47,11 @@ public Path download() {
4547
LOGGER.info("Downloaded file: {} from the url: {}", fileName, archiveUrl);
4648
return fileName;
4749
} catch (IOException | HarvesterException e) {
48-
throw new HttpSourceException("Could not download archive file: " + archiveUrl, e);
50+
HttpSourceException wrappedException = new HttpSourceException("Could not download archive file: " + archiveUrl, e);
51+
if(e.getCause() instanceof UnknownHostException){
52+
throw new SuppressRestartsException(wrappedException);
53+
}
54+
throw wrappedException;
4955
}
5056
}
5157

0 commit comments

Comments
 (0)