Skip to content

Commit ecc92ed

Browse files
author
Alex Wang
committed
improvement: Improve polling mechanism
- Refactor jitter - Add polling strategies class - Tests
1 parent d9b8cef commit ecc92ed

File tree

12 files changed

+889
-34
lines changed

12 files changed

+889
-34
lines changed

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/CustomConfigIntegrationTest.java

Lines changed: 97 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,16 @@
66
import static org.junit.jupiter.api.Assertions.assertNotNull;
77
import static org.junit.jupiter.api.Assertions.assertTrue;
88

9+
import java.time.Duration;
910
import java.util.concurrent.atomic.AtomicInteger;
1011
import org.junit.jupiter.api.Test;
1112
import software.amazon.awssdk.regions.Region;
1213
import software.amazon.awssdk.services.lambda.LambdaClient;
1314
import software.amazon.lambda.durable.client.LambdaDurableFunctionsClient;
15+
import software.amazon.lambda.durable.model.ExecutionStatus;
16+
import software.amazon.lambda.durable.retry.JitterStrategy;
17+
import software.amazon.lambda.durable.retry.PollingStrategies;
18+
import software.amazon.lambda.durable.retry.RetryStrategies;
1419
import software.amazon.lambda.durable.serde.JacksonSerDes;
1520
import software.amazon.lambda.durable.serde.SerDes;
1621
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
@@ -147,14 +152,14 @@ void testCustomConfig_WithRetry() {
147152
throw new RuntimeException("Simulated failure attempt " + currentAttempt);
148153
},
149154
StepConfig.builder()
150-
.retryStrategy(software.amazon.lambda.durable.retry.RetryStrategies.Presets.DEFAULT)
155+
.retryStrategy(RetryStrategies.Presets.DEFAULT)
151156
.build());
152157
},
153158
customConfig);
154159

155160
// First run should return PENDING (retry scheduled) - matching existing RetryIntegrationTest pattern
156161
var result = runner.run("test");
157-
assertEquals(software.amazon.lambda.durable.model.ExecutionStatus.PENDING, result.getStatus());
162+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
158163
assertEquals(1, attemptCount.get());
159164

160165
// Verify custom SerDes was used during retry operations
@@ -284,4 +289,94 @@ void testCustomDurableExecutionClient_Configuration() {
284289
lambdaClient.close();
285290
}
286291
}
292+
293+
// --- Polling strategy integration tests ---
294+
295+
@Test
296+
void testCustomPollingStrategy_ExponentialBackoff_WithSteps() {
297+
var customConfig = DurableConfig.builder()
298+
.withPollingStrategy(
299+
PollingStrategies.exponentialBackoff(Duration.ofMillis(100), 2.0, JitterStrategy.FULL))
300+
.build();
301+
302+
var runner = LocalDurableTestRunner.create(
303+
String.class,
304+
(input, context) -> {
305+
var step1 = context.step("step1", String.class, () -> "first");
306+
var step2 = context.step("step2", String.class, () -> "second");
307+
return step1 + "," + step2;
308+
},
309+
customConfig);
310+
311+
var result = runner.run("test");
312+
313+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
314+
assertEquals("first,second", result.getResult(String.class));
315+
assertNotNull(result.getOperation("step1"));
316+
assertNotNull(result.getOperation("step2"));
317+
}
318+
319+
@Test
320+
void testCustomPollingStrategy_FixedDelay_WithWait() {
321+
var customConfig = DurableConfig.builder()
322+
.withPollingStrategy(PollingStrategies.fixedDelay(Duration.ofMillis(200)))
323+
.build();
324+
325+
var runner = LocalDurableTestRunner.create(
326+
String.class,
327+
(input, context) -> {
328+
var step1 = context.step("before-wait", String.class, () -> "before");
329+
context.wait("my-wait", Duration.ofSeconds(5));
330+
var step2 = context.step("after-wait", String.class, () -> "after");
331+
return step1 + "," + step2;
332+
},
333+
customConfig);
334+
335+
// First run suspends at wait
336+
var result = runner.run("test");
337+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
338+
339+
// Advance time and complete
340+
var finalResult = runner.runUntilComplete("test");
341+
assertEquals(ExecutionStatus.SUCCEEDED, finalResult.getStatus());
342+
assertEquals("before,after", finalResult.getResult(String.class));
343+
}
344+
345+
@Test
346+
void testCustomPollingStrategy_WithRetry() {
347+
var attemptCount = new AtomicInteger(0);
348+
349+
var customConfig = DurableConfig.builder()
350+
.withPollingStrategy(
351+
PollingStrategies.exponentialBackoff(Duration.ofMillis(50), 1.5, JitterStrategy.HALF))
352+
.build();
353+
354+
var runner = LocalDurableTestRunner.create(
355+
String.class,
356+
(input, context) -> {
357+
return context.step(
358+
"retry-step",
359+
String.class,
360+
() -> {
361+
int attempt = attemptCount.incrementAndGet();
362+
if (attempt < 3) {
363+
throw new RuntimeException("Fail attempt " + attempt);
364+
}
365+
return "success on attempt " + attempt;
366+
},
367+
StepConfig.builder()
368+
.retryStrategy(RetryStrategies.Presets.DEFAULT)
369+
.build());
370+
},
371+
customConfig);
372+
373+
// First run fails, schedules retry → PENDING
374+
var result = runner.run("test");
375+
assertEquals(ExecutionStatus.PENDING, result.getStatus());
376+
377+
// Run until complete — retries until attempt 3 succeeds
378+
var finalResult = runner.runUntilComplete("test");
379+
assertEquals(ExecutionStatus.SUCCEEDED, finalResult.getStatus());
380+
assertEquals("success on attempt 3", finalResult.getResult(String.class));
381+
}
287382
}

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalDurableTestRunner.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ public TestResult<O> run(I input) {
155155
.withDurableExecutionClient(storage)
156156
.withSerDes(customerConfig.getSerDes())
157157
.withExecutorService(customerConfig.getExecutorService())
158+
.withPollingStrategy(customerConfig.getPollingStrategy())
159+
.withCheckpointDelay(customerConfig.getCheckpointDelay())
160+
.withLoggerConfig(customerConfig.getLoggerConfig())
158161
.build();
159162
} else {
160163
// Fallback to default config with in-memory client

sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import software.amazon.lambda.durable.client.DurableExecutionClient;
2222
import software.amazon.lambda.durable.client.LambdaDurableFunctionsClient;
2323
import software.amazon.lambda.durable.logging.LoggerConfig;
24+
import software.amazon.lambda.durable.retry.PollingStrategies;
25+
import software.amazon.lambda.durable.retry.PollingStrategy;
2426
import software.amazon.lambda.durable.serde.JacksonSerDes;
2527
import software.amazon.lambda.durable.serde.SerDes;
2628

@@ -77,7 +79,7 @@ public final class DurableConfig {
7779
private final SerDes serDes;
7880
private final ExecutorService executorService;
7981
private final LoggerConfig loggerConfig;
80-
private final Duration pollingInterval;
82+
private final PollingStrategy pollingStrategy;
8183
private final Duration checkpointDelay;
8284

8385
private DurableConfig(Builder builder) {
@@ -87,7 +89,8 @@ private DurableConfig(Builder builder) {
8789
this.serDes = builder.serDes != null ? builder.serDes : new JacksonSerDes();
8890
this.executorService = builder.executorService != null ? builder.executorService : createDefaultExecutor();
8991
this.loggerConfig = builder.loggerConfig != null ? builder.loggerConfig : LoggerConfig.defaults();
90-
this.pollingInterval = builder.pollingInterval != null ? builder.pollingInterval : Duration.ofMillis(1000);
92+
this.pollingStrategy =
93+
builder.pollingStrategy != null ? builder.pollingStrategy : PollingStrategies.Presets.DEFAULT;
9194
this.checkpointDelay = builder.checkpointDelay != null ? builder.checkpointDelay : Duration.ofSeconds(0);
9295
}
9396

@@ -146,12 +149,12 @@ public LoggerConfig getLoggerConfig() {
146149
}
147150

148151
/**
149-
* Gets the configured polling interval.
152+
* Gets the polling strategy.
150153
*
151-
* @return polling interval in Duration.
154+
* @return PollingStrategy instance (never null)
152155
*/
153-
public Duration getPollingInterval() {
154-
return pollingInterval;
156+
public PollingStrategy getPollingStrategy() {
157+
return pollingStrategy;
155158
}
156159

157160
/**
@@ -248,7 +251,7 @@ public static final class Builder {
248251
private SerDes serDes;
249252
private ExecutorService executorService;
250253
private LoggerConfig loggerConfig;
251-
private Duration pollingInterval;
254+
private PollingStrategy pollingStrategy;
252255
private Duration checkpointDelay;
253256

254257
private Builder() {}
@@ -336,14 +339,14 @@ public Builder withLoggerConfig(LoggerConfig loggerConfig) {
336339
}
337340

338341
/**
339-
* Sets how often the SDK polls updates from backend.
342+
* Sets the polling strategy. If not set, defaults to 1 second with full jitter and 2x backoff.
340343
*
341-
* @param duration the polling interval in Duration
344+
* @param pollingStrategy Custom PollingStrategy instance
342345
* @return This builder
343346
*/
344-
public Builder withPollingInterval(Duration duration) {
347+
public Builder withPollingStrategy(PollingStrategy pollingStrategy) {
345348
// No validation - polling intervals can be less than 1 second (e.g., 200ms with backoff)
346-
this.pollingInterval = duration;
349+
this.pollingStrategy = pollingStrategy;
347350
return this;
348351
}
349352

sdk/src/main/java/software/amazon/lambda/durable/execution/CheckpointBatcher.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import software.amazon.awssdk.services.lambda.model.Operation;
1818
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
1919
import software.amazon.lambda.durable.DurableConfig;
20+
import software.amazon.lambda.durable.retry.PollingStrategies;
21+
import software.amazon.lambda.durable.retry.PollingStrategy;
2022

2123
/**
2224
* Package-private checkpoint manager for batching and queueing checkpoint API calls.
@@ -57,11 +59,16 @@ CompletableFuture<Void> checkpoint(OperationUpdate update) {
5759

5860
/** Polls for updates of the specified operation with preconfigured intervals */
5961
CompletableFuture<Operation> pollForUpdate(String operationId) {
60-
return pollForUpdate(operationId, config.getPollingInterval());
62+
return pollForUpdate(operationId, config.getPollingStrategy());
6163
}
6264

6365
/** Polls for updates of the specified operation with specified delay */
6466
CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay) {
67+
return pollForUpdate(operationId, PollingStrategies.fixedDelay(delay));
68+
}
69+
70+
/** Polls for updates of the specified operation with specified polling strategy */
71+
CompletableFuture<Operation> pollForUpdate(String operationId, PollingStrategy pollingStrategy) {
6572
logger.debug("Polling request received: operation id {}", operationId);
6673
var future = new CompletableFuture<Operation>();
6774
synchronized (pollingFutures) {
@@ -70,17 +77,20 @@ CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay) {
7077
.computeIfAbsent(operationId, k -> Collections.synchronizedList(new ArrayList<>()))
7178
.add(future);
7279
}
73-
pollForUpdateInternal(future, delay);
80+
pollForUpdateInternal(future, 0, pollingStrategy);
7481
return future;
7582
}
7683

77-
private CompletableFuture<Void> pollForUpdateInternal(CompletableFuture<Operation> future, Duration delay) {
78-
return checkpointApiRequestBatcher.submit(null, delay).thenCompose(v -> {
79-
if (future.isDone()) {
80-
return CompletableFuture.completedFuture(null);
81-
}
82-
return pollForUpdateInternal(future, delay);
83-
});
84+
private CompletableFuture<Void> pollForUpdateInternal(
85+
CompletableFuture<Operation> future, int attempt, PollingStrategy pollingStrategy) {
86+
return checkpointApiRequestBatcher
87+
.submit(null, pollingStrategy.computeDelay(attempt))
88+
.thenCompose(v -> {
89+
if (future.isDone()) {
90+
return CompletableFuture.completedFuture(null);
91+
}
92+
return pollForUpdateInternal(future, attempt + 1, pollingStrategy);
93+
});
8494
}
8595

8696
/** Cancels all polling futures and waits for all pending checkpoint requests to complete */

sdk/src/main/java/software/amazon/lambda/durable/retry/JitterStrategy.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,32 @@ public enum JitterStrategy {
1313
/**
1414
* No jitter - use exact calculated delay. This provides predictable timing but may cause thundering herd issues.
1515
*/
16-
NONE,
17-
16+
NONE {
17+
@Override
18+
public double apply(double baseDelay) {
19+
return baseDelay;
20+
}
21+
},
1822
/**
1923
* Full jitter - random delay between 0 and calculated delay. This provides maximum spread but may result in very
2024
* short delays.
2125
*/
22-
FULL,
23-
26+
FULL {
27+
@Override
28+
public double apply(double baseDelay) {
29+
return Math.random() * baseDelay;
30+
}
31+
},
2432
/**
2533
* Half jitter - random delay between 50% and 100% of calculated delay. This provides good spread while maintaining
2634
* reasonable minimum delays.
2735
*/
28-
HALF
36+
HALF {
37+
@Override
38+
public double apply(double baseDelay) {
39+
return baseDelay / 2 + Math.random() * (baseDelay / 2);
40+
}
41+
};
42+
43+
public abstract double apply(double baseDelay);
2944
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.retry;
4+
5+
import java.time.Duration;
6+
import java.util.Objects;
7+
8+
/** Factory class for creating common polling strategies. */
9+
public class PollingStrategies {
10+
11+
/** Preset polling strategies for common use cases. */
12+
public static class Presets {
13+
14+
/** Default polling strategy: - Base interval: 1 second - Backoff rate: 2x - Jitter: FULL */
15+
public static final PollingStrategy DEFAULT =
16+
exponentialBackoff(Duration.ofMillis(1000), 2.0, JitterStrategy.FULL);
17+
}
18+
19+
/**
20+
* Creates an exponential backoff polling strategy.
21+
*
22+
* <p>The delay calculation follows the formula: delay = jitter(baseInterval × backoffRate^attempt)
23+
*
24+
* @param baseInterval Base delay before first poll
25+
* @param backoffRate Multiplier for exponential backoff (must be positive)
26+
* @param jitter Jitter strategy to apply to delays
27+
* @return PollingStrategy implementing exponential backoff with jitter
28+
*/
29+
public static PollingStrategy exponentialBackoff(Duration baseInterval, double backoffRate, JitterStrategy jitter) {
30+
Objects.requireNonNull(jitter, "jitter must not be null");
31+
Objects.requireNonNull(baseInterval, "base interval must not be null");
32+
if (backoffRate <= 0) {
33+
throw new IllegalArgumentException("backoffRate must be positive");
34+
}
35+
36+
if (baseInterval.isNegative() || baseInterval.isZero()) {
37+
throw new IllegalArgumentException("baseInterval must be positive");
38+
}
39+
40+
return (attempt) -> {
41+
double delayMs = baseInterval.toMillis() * Math.pow(backoffRate, attempt);
42+
delayMs = jitter.apply(delayMs);
43+
return Duration.ofMillis(Math.round(delayMs));
44+
};
45+
}
46+
47+
/**
48+
* Creates a fixed-delay polling strategy that uses the same interval for every attempt.
49+
*
50+
* @param interval Fixed delay between polls
51+
* @return PollingStrategy with fixed delay
52+
*/
53+
public static PollingStrategy fixedDelay(Duration interval) {
54+
Objects.requireNonNull(interval, "interval must not be null");
55+
if (interval.isNegative() || interval.isZero()) {
56+
throw new IllegalArgumentException("interval must be positive");
57+
}
58+
return (attempt) -> interval;
59+
}
60+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.retry;
4+
5+
import java.time.Duration;
6+
7+
/**
 * Strategy for deciding how long to wait before each polling attempt.
 *
 * <p>Declares a single abstract method, so it can be supplied as a lambda or method reference.
 */
@FunctionalInterface
public interface PollingStrategy {

    /**
     * Returns the wait time that should precede the next poll.
     *
     * @param attempt zero-based number of the current polling attempt
     * @return the duration to wait before polling again
     */
    Duration computeDelay(int attempt);
}

sdk/src/main/java/software/amazon/lambda/durable/retry/RetryStrategies.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,7 @@ public static RetryStrategy exponentialBackoff(
6969
double baseDelay = Math.min(initialDelaySeconds * Math.pow(backoffRate, attemptNumber), maxDelaySeconds);
7070

7171
// Apply jitter
72-
double delayWithJitter =
73-
switch (jitter) {
74-
case NONE -> baseDelay;
75-
case FULL -> Math.random() * baseDelay;
76-
case HALF -> baseDelay / 2 + Math.random() * (baseDelay / 2);
77-
};
72+
double delayWithJitter = jitter.apply(baseDelay);
7873

7974
// Round to nearest second, minimum 1
8075
// Same rounding logic as TS SDK: https://tinyurl.com/4ntxsefu

0 commit comments

Comments
 (0)