Skip to content

Commit 7a5e795

Browse files
author
Alex Wang
committed
improvement: Improve polling mechanism
1 parent d9b8cef commit 7a5e795

File tree

7 files changed

+590
-16
lines changed

7 files changed

+590
-16
lines changed

sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import software.amazon.lambda.durable.client.DurableExecutionClient;
2222
import software.amazon.lambda.durable.client.LambdaDurableFunctionsClient;
2323
import software.amazon.lambda.durable.logging.LoggerConfig;
24+
import software.amazon.lambda.durable.retry.JitterStrategy;
2425
import software.amazon.lambda.durable.serde.JacksonSerDes;
2526
import software.amazon.lambda.durable.serde.SerDes;
2627

@@ -78,6 +79,8 @@ public final class DurableConfig {
7879
private final ExecutorService executorService;
7980
private final LoggerConfig loggerConfig;
8081
private final Duration pollingInterval;
82+
private final JitterStrategy pollingJitter;
83+
private final double pollingBackoffRate;
8184
private final Duration checkpointDelay;
8285

8386
private DurableConfig(Builder builder) {
@@ -88,6 +91,8 @@ private DurableConfig(Builder builder) {
8891
this.executorService = builder.executorService != null ? builder.executorService : createDefaultExecutor();
8992
this.loggerConfig = builder.loggerConfig != null ? builder.loggerConfig : LoggerConfig.defaults();
9093
this.pollingInterval = builder.pollingInterval != null ? builder.pollingInterval : Duration.ofMillis(1000);
94+
this.pollingJitter = builder.pollingJitter != null ? builder.pollingJitter : JitterStrategy.FULL;
95+
this.pollingBackoffRate = builder.pollingBackoffRate != 0.0 ? builder.pollingBackoffRate : 2.0;
9196
this.checkpointDelay = builder.checkpointDelay != null ? builder.checkpointDelay : Duration.ofSeconds(0);
9297
}
9398

@@ -154,6 +159,24 @@ public Duration getPollingInterval() {
154159
return pollingInterval;
155160
}
156161

162+
/**
163+
* Gets the polling backoff rate.
164+
*
165+
* @return polling backoff rate.
166+
*/
167+
public double getPollingBackoffRate() {
168+
return pollingBackoffRate;
169+
}
170+
171+
/**
172+
* Gets the polling jitter.
173+
*
174+
* @return polling jitter.
175+
*/
176+
public JitterStrategy getPollingJitter() {
177+
return pollingJitter;
178+
}
179+
157180
/**
158181
* Gets the configured checkpoint delay.
159182
*
@@ -249,6 +272,8 @@ public static final class Builder {
249272
private ExecutorService executorService;
250273
private LoggerConfig loggerConfig;
251274
private Duration pollingInterval;
275+
private JitterStrategy pollingJitter;
276+
private double pollingBackoffRate;
252277
private Duration checkpointDelay;
253278

254279
private Builder() {}
@@ -347,6 +372,31 @@ public Builder withPollingInterval(Duration duration) {
347372
return this;
348373
}
349374

375+
/**
376+
* Sets the polling backoff rate.
377+
*
378+
* @param backoffRate the backoff rate (must be positive)
379+
* @return This builder
380+
*/
381+
public Builder withPollingBackoffRate(double backoffRate) {
382+
if (backoffRate <= 0) {
383+
throw new IllegalArgumentException("backoffRate must be positive");
384+
}
385+
this.pollingBackoffRate = backoffRate;
386+
return this;
387+
}
388+
389+
/**
390+
* Sets the polling jitter strategy.
391+
*
392+
* @param jitter the jitter strategy to use
393+
* @return This builder
394+
*/
395+
public Builder withPollingJitter(JitterStrategy jitter) {
396+
this.pollingJitter = jitter;
397+
return this;
398+
}
399+
350400
/**
351401
* Sets how often the SDK checkpoints updates to backend. If not set, defaults to 0, which disables checkpoint
352402
* batching.

sdk/src/main/java/software/amazon/lambda/durable/execution/CheckpointBatcher.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,16 @@ CompletableFuture<Void> checkpoint(OperationUpdate update) {
5757

5858
/** Polls for updates of the specified operation with preconfigured intervals */
5959
CompletableFuture<Operation> pollForUpdate(String operationId) {
60-
return pollForUpdate(operationId, config.getPollingInterval());
60+
return pollForUpdate(operationId, config.getPollingInterval(), false);
6161
}
6262

6363
/** Polls for updates of the specified operation with specified delay */
6464
CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay) {
65+
return pollForUpdate(operationId, delay, true);
66+
}
67+
68+
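/**
 * Polls for updates of the specified operation, starting at {@code delay}. When {@code isFixedInterval} is
 * {@code true} every poll uses {@code delay} unchanged; otherwise each poll's delay is scaled by the configured
 * backoff rate and jitter.
 */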
69+
CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay, boolean isFixedInterval) {
6570
logger.debug("Polling request received: operation id {}", operationId);
6671
var future = new CompletableFuture<Operation>();
6772
synchronized (pollingFutures) {
@@ -70,16 +75,24 @@ CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay) {
7075
.computeIfAbsent(operationId, k -> Collections.synchronizedList(new ArrayList<>()))
7176
.add(future);
7277
}
73-
pollForUpdateInternal(future, delay);
78+
pollForUpdateInternal(future, delay, isFixedInterval, 0);
7479
return future;
7580
}
7681

77-
private CompletableFuture<Void> pollForUpdateInternal(CompletableFuture<Operation> future, Duration delay) {
78-
return checkpointApiRequestBatcher.submit(null, delay).thenCompose(v -> {
82+
private CompletableFuture<Void> pollForUpdateInternal(
83+
CompletableFuture<Operation> future, Duration initialDelay, boolean isFixedInterval, int attempt) {
84+
Duration actualDelay = initialDelay;
85+
if (!isFixedInterval) {
86+
double delayMilli = initialDelay.toMillis();
87+
delayMilli = delayMilli * Math.pow(config.getPollingBackoffRate(), attempt);
88+
delayMilli = config.getPollingJitter().apply(delayMilli);
89+
actualDelay = Duration.ofMillis(Math.round(delayMilli));
90+
}
91+
return checkpointApiRequestBatcher.submit(null, actualDelay).thenCompose(v -> {
7992
if (future.isDone()) {
8093
return CompletableFuture.completedFuture(null);
8194
}
82-
return pollForUpdateInternal(future, delay);
95+
return pollForUpdateInternal(future, initialDelay, isFixedInterval, attempt + 1);
8396
});
8497
}
8598

sdk/src/main/java/software/amazon/lambda/durable/retry/JitterStrategy.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,32 @@ public enum JitterStrategy {
1313
/**
 * No jitter: the calculated delay is used exactly as given. Timing is predictable, but many callers backing off
 * in lockstep may cause thundering herd issues.
 */
NONE {
    @Override
    public double apply(double baseDelay) {
        // Identity: the delay passes through untouched.
        return baseDelay;
    }
},
1822
/**
 * Full jitter - random delay between 0 (inclusive) and the calculated delay (exclusive). This provides maximum
 * spread but may result in very short delays.
 */
FULL {
    @Override
    public double apply(double baseDelay) {
        // ThreadLocalRandom yields the same [0, 1) range as Math.random() without sharing one
        // synchronized generator between all threads.
        return java.util.concurrent.ThreadLocalRandom.current().nextDouble() * baseDelay;
    }
},
2432
/**
 * Half jitter - random delay between 50% (inclusive) and 100% (exclusive) of the calculated delay. This provides
 * good spread while maintaining reasonable minimum delays.
 */
HALF {
    @Override
    public double apply(double baseDelay) {
        // Fixed lower half plus a random share of the upper half. ThreadLocalRandom yields the same
        // [0, 1) range as Math.random() without sharing one synchronized generator between threads.
        double half = baseDelay / 2;
        return half + java.util.concurrent.ThreadLocalRandom.current().nextDouble() * half;
    }
};
42+
43+
/**
 * Applies this jitter strategy to a calculated delay.
 *
 * @param baseDelay the calculated delay before jitter; the unit is whatever the caller uses and is preserved
 * @return the delay after jitter, in the same unit as {@code baseDelay}
 */
public abstract double apply(double baseDelay);
2944
}

sdk/src/main/java/software/amazon/lambda/durable/retry/RetryStrategies.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,7 @@ public static RetryStrategy exponentialBackoff(
6969
double baseDelay = Math.min(initialDelaySeconds * Math.pow(backoffRate, attemptNumber), maxDelaySeconds);
7070

7171
// Apply jitter
72-
double delayWithJitter =
73-
switch (jitter) {
74-
case NONE -> baseDelay;
75-
case FULL -> Math.random() * baseDelay;
76-
case HALF -> baseDelay / 2 + Math.random() * (baseDelay / 2);
77-
};
72+
double delayWithJitter = jitter.apply(baseDelay);
7873

7974
// Round to nearest second, minimum 1
8075
// Same rounding logic as TS SDK: https://tinyurl.com/4ntxsefu

sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
import static org.junit.jupiter.api.Assertions.assertTrue;
1313
import static org.mockito.Mockito.mock;
1414

15+
import java.time.Duration;
1516
import java.util.concurrent.ExecutorService;
1617
import org.junit.jupiter.api.BeforeEach;
1718
import org.junit.jupiter.api.Test;
1819
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
1920
import software.amazon.awssdk.services.lambda.LambdaClient;
2021
import software.amazon.lambda.durable.client.DurableExecutionClient;
2122
import software.amazon.lambda.durable.client.LambdaDurableFunctionsClient;
23+
import software.amazon.lambda.durable.retry.JitterStrategy;
2224
import software.amazon.lambda.durable.serde.JacksonSerDes;
2325
import software.amazon.lambda.durable.serde.SerDes;
2426

@@ -294,4 +296,148 @@ void testAddUserAgentSuffix_ReturnsSameBuilderInstance() {
294296

295297
assertSame(lambdaClientBuilder, result);
296298
}
299+
300+
// --- Polling (interval, backoff rate, jitter) and checkpoint delay configuration tests ---
301+
302+
@Test
303+
void testDefaultConfig_PollingIntervalDefaults() {
304+
var config = DurableConfig.defaultConfig();
305+
306+
assertEquals(Duration.ofMillis(1000), config.getPollingInterval());
307+
assertEquals(JitterStrategy.FULL, config.getPollingJitter());
308+
assertEquals(2.0, config.getPollingBackoffRate());
309+
assertEquals(Duration.ofSeconds(0), config.getCheckpointDelay());
310+
}
311+
312+
@Test
313+
void testBuilder_WithCustomPollingInterval() {
314+
var config = DurableConfig.builder()
315+
.withDurableExecutionClient(mockClient)
316+
.withPollingInterval(Duration.ofMillis(500))
317+
.build();
318+
319+
assertEquals(Duration.ofMillis(500), config.getPollingInterval());
320+
}
321+
322+
@Test
323+
void testBuilder_WithCustomPollingBackoffRate() {
324+
var config = DurableConfig.builder()
325+
.withDurableExecutionClient(mockClient)
326+
.withPollingBackoffRate(3.0)
327+
.build();
328+
329+
assertEquals(3.0, config.getPollingBackoffRate());
330+
}
331+
332+
@Test
333+
void testBuilder_WithPollingBackoffRateNotSet_UsesDefault() {
334+
var config =
335+
DurableConfig.builder().withDurableExecutionClient(mockClient).build();
336+
337+
assertEquals(2.0, config.getPollingBackoffRate());
338+
}
339+
340+
@Test
341+
void testBuilder_WithZeroPollingBackoffRate_ThrowsException() {
342+
var builder = DurableConfig.builder();
343+
344+
var exception = assertThrows(IllegalArgumentException.class, () -> builder.withPollingBackoffRate(0.0));
345+
346+
assertEquals("backoffRate must be positive", exception.getMessage());
347+
}
348+
349+
@Test
350+
void testBuilder_WithNegativePollingBackoffRate_ThrowsException() {
351+
var builder = DurableConfig.builder();
352+
353+
var exception = assertThrows(IllegalArgumentException.class, () -> builder.withPollingBackoffRate(-1.0));
354+
355+
assertEquals("backoffRate must be positive", exception.getMessage());
356+
}
357+
358+
@Test
359+
void testBuilder_WithCustomPollingJitter() {
360+
var config = DurableConfig.builder()
361+
.withDurableExecutionClient(mockClient)
362+
.withPollingJitter(JitterStrategy.NONE)
363+
.build();
364+
365+
assertEquals(JitterStrategy.NONE, config.getPollingJitter());
366+
}
367+
368+
@Test
369+
void testBuilder_WithPollingJitterHalf() {
370+
var config = DurableConfig.builder()
371+
.withDurableExecutionClient(mockClient)
372+
.withPollingJitter(JitterStrategy.HALF)
373+
.build();
374+
375+
assertEquals(JitterStrategy.HALF, config.getPollingJitter());
376+
}
377+
378+
@Test
379+
void testBuilder_WithPollingJitterNull_UsesDefault() {
380+
var config = DurableConfig.builder()
381+
.withDurableExecutionClient(mockClient)
382+
.withPollingJitter(null)
383+
.build();
384+
385+
assertEquals(JitterStrategy.FULL, config.getPollingJitter());
386+
}
387+
388+
@Test
389+
void testBuilder_WithCustomCheckpointDelay() {
390+
var config = DurableConfig.builder()
391+
.withDurableExecutionClient(mockClient)
392+
.withCheckpointDelay(Duration.ofSeconds(5))
393+
.build();
394+
395+
assertEquals(Duration.ofSeconds(5), config.getCheckpointDelay());
396+
}
397+
398+
@Test
399+
void testBuilder_WithAllPollingSettings() {
400+
var config = DurableConfig.builder()
401+
.withDurableExecutionClient(mockClient)
402+
.withPollingInterval(Duration.ofMillis(200))
403+
.withPollingBackoffRate(1.5)
404+
.withPollingJitter(JitterStrategy.HALF)
405+
.withCheckpointDelay(Duration.ofSeconds(2))
406+
.build();
407+
408+
assertEquals(Duration.ofMillis(200), config.getPollingInterval());
409+
assertEquals(1.5, config.getPollingBackoffRate());
410+
assertEquals(JitterStrategy.HALF, config.getPollingJitter());
411+
assertEquals(Duration.ofSeconds(2), config.getCheckpointDelay());
412+
}
413+
414+
@Test
415+
void testBuilder_FluentAPI_PollingMethods() {
416+
var builder = DurableConfig.builder();
417+
418+
assertSame(builder, builder.withPollingInterval(Duration.ofMillis(500)));
419+
assertSame(builder, builder.withPollingBackoffRate(2.0));
420+
assertSame(builder, builder.withPollingJitter(JitterStrategy.FULL));
421+
assertSame(builder, builder.withCheckpointDelay(Duration.ofSeconds(1)));
422+
}
423+
424+
@Test
425+
void testBuilder_PollingIntervalNull_UsesDefault() {
426+
var config = DurableConfig.builder()
427+
.withDurableExecutionClient(mockClient)
428+
.withPollingInterval(null)
429+
.build();
430+
431+
assertEquals(Duration.ofMillis(1000), config.getPollingInterval());
432+
}
433+
434+
@Test
435+
void testBuilder_CheckpointDelayNull_UsesDefault() {
436+
var config = DurableConfig.builder()
437+
.withDurableExecutionClient(mockClient)
438+
.withCheckpointDelay(null)
439+
.build();
440+
441+
assertEquals(Duration.ofSeconds(0), config.getCheckpointDelay());
442+
}
297443
}

0 commit comments

Comments
 (0)