Skip to content

Commit bb907d9

Browse files
committed
feat(map): Fix map replay, wait-inside-map, and concurrency race conditions
1 parent fde6a1b commit bb907d9

File tree

9 files changed

+106
-24
lines changed

9 files changed

+106
-24
lines changed

sdk/src/main/java/software/amazon/lambda/durable/CompletionConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,26 @@ public static CompletionConfig firstSuccessful() {
3636

3737
/** Complete when the specified number of items have succeeded. */
3838
public static CompletionConfig minSuccessful(int count) {
39+
if (count < 1) {
40+
throw new IllegalArgumentException("minSuccessful must be at least 1, got: " + count);
41+
}
3942
return new CompletionConfig(count, null, null);
4043
}
4144

4245
/** Complete when more than the specified number of failures have occurred. */
4346
public static CompletionConfig toleratedFailureCount(int count) {
47+
if (count < 0) {
48+
throw new IllegalArgumentException("toleratedFailureCount must be non-negative, got: " + count);
49+
}
4450
return new CompletionConfig(null, count, null);
4551
}
4652

4753
/** Complete when the failure percentage exceeds the specified threshold (0.0 to 1.0). */
4854
public static CompletionConfig toleratedFailurePercentage(double percentage) {
55+
if (percentage < 0.0 || percentage > 1.0) {
56+
throw new IllegalArgumentException(
57+
"toleratedFailurePercentage must be between 0.0 and 1.0, got: " + percentage);
58+
}
4959
return new CompletionConfig(null, null, percentage);
5060
}
5161

sdk/src/main/java/software/amazon/lambda/durable/MapConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ public Builder serDes(SerDes serDes) {
7171
}
7272

7373
public MapConfig build() {
74+
if (maxConcurrency != null && maxConcurrency < 1) {
75+
throw new IllegalArgumentException("maxConcurrency must be at least 1, got: " + maxConcurrency);
76+
}
7477
return new MapConfig(this);
7578
}
7679
}

sdk/src/main/java/software/amazon/lambda/durable/model/MapResult.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ public ErrorObject getError(int index) {
4848
return items.get(index).error();
4949
}
5050

51-
/** Returns true if all items succeeded (no errors). */
51+
/** Returns true if all items succeeded (no failures or not-started items). */
5252
public boolean allSucceeded() {
53-
return items.stream().noneMatch(item -> item.error() != null);
53+
return items.stream().allMatch(item -> item.status() == MapResultItem.Status.SUCCEEDED);
5454
}
5555

5656
/** Returns the number of items in this result. */
@@ -64,13 +64,19 @@ public List<T> results() {
6464
items.stream().map(MapResultItem::result).toList());
6565
}
6666

67-
/** Returns results that succeeded (non-null results). */
67+
/** Returns results from items that succeeded (includes null results from successful items). */
6868
public List<T> succeeded() {
69-
return items.stream().map(MapResultItem::result).filter(r -> r != null).toList();
69+
return items.stream()
70+
.filter(item -> item.status() == MapResultItem.Status.SUCCEEDED)
71+
.map(MapResultItem::result)
72+
.toList();
7073
}
7174

72-
/** Returns errors that occurred (non-null errors). */
75+
/** Returns errors from items that failed. */
7376
public List<ErrorObject> failed() {
74-
return items.stream().map(MapResultItem::error).filter(e -> e != null).toList();
77+
return items.stream()
78+
.filter(item -> item.status() == MapResultItem.Status.FAILED)
79+
.map(MapResultItem::error)
80+
.toList();
7581
}
7682
}

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -370,11 +370,8 @@ public R get() {
370370
}
371371
}
372372

373-
// Race completionFuture against executionExceptionFuture.
374-
// If branches suspend, executionExceptionFuture completes with SuspendExecutionException,
375-
// which propagates up through the handler to DurableExecutor → returns PENDING.
376-
// If branches complete, completionFuture completes and we proceed to read the result.
377-
executionManager.runUntilCompleteOrSuspend(completionFuture).join();
373+
// Block until operation completes. No-op if the future is already completed.
374+
completionFuture.join();
378375

379376
var op = getOperation();
380377

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -209,12 +209,8 @@ protected Operation waitForOperationCompletion() {
209209
}
210210
}
211211

212-
// Block until operation completes or execution suspends.
213-
// Using runUntilCompleteOrSuspend races completionFuture against executionExceptionFuture,
214-
// so when all active threads suspend (e.g., wait inside map branches), the
215-
// SuspendExecutionException propagates and this thread is freed — preventing thread leaks
216-
// on shared executor pools across invocations.
217-
executionManager.runUntilCompleteOrSuspend(completionFuture).join();
212+
// Block until operation completes. No-op if the future is already completed.
213+
completionFuture.join();
218214

219215
// Get result based on status
220216
var op = getOperation();

sdk/src/main/java/software/amazon/lambda/durable/validation/ParameterValidator.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.HashMap;
88
import java.util.HashSet;
99
import java.util.IdentityHashMap;
10+
import java.util.LinkedHashSet;
1011
import java.util.Set;
1112
import java.util.WeakHashMap;
1213
import java.util.concurrent.ConcurrentHashMap;
@@ -118,8 +119,9 @@ public static void validateOperationName(String name, int maxLength) {
118119
/**
119120
* Validates that a collection has deterministic iteration order.
120121
*
121-
* <p>Rejects known unordered collection types: {@link HashSet} (and subclasses), and views returned by
122-
* {@link HashMap}, {@link IdentityHashMap}, {@link WeakHashMap}, and {@link ConcurrentHashMap}.
122+
* <p>Rejects known unordered collection types: {@link HashSet} (but not {@link LinkedHashSet}, which has stable
123+
* insertion-order iteration), and views returned by {@link HashMap}, {@link IdentityHashMap}, {@link WeakHashMap},
124+
* and {@link ConcurrentHashMap}.
123125
*
124126
* @param items the collection to validate
125127
* @throws IllegalArgumentException if items is null or has non-deterministic iteration order
@@ -128,6 +130,10 @@ public static void validateOrderedCollection(Collection<?> items) {
128130
if (items == null) {
129131
throw new IllegalArgumentException("items cannot be null");
130132
}
133+
// LinkedHashSet extends HashSet but has stable insertion-order iteration — allow it
134+
if (items instanceof LinkedHashSet) {
135+
return;
136+
}
131137
if (items instanceof HashSet || isUnorderedMapView(items)) {
132138
throw new IllegalArgumentException("items must have deterministic iteration order");
133139
}

sdk/src/test/java/software/amazon/lambda/durable/CompletionConfigTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,48 @@ void toleratedFailurePercentage_setsPercentage() {
6161
assertNull(config.toleratedFailureCount());
6262
assertEquals(0.25, config.toleratedFailurePercentage());
6363
}
64+
65+
@Test
66+
void minSuccessful_withZero_shouldThrow() {
67+
var exception = assertThrows(IllegalArgumentException.class, () -> CompletionConfig.minSuccessful(0));
68+
assertEquals("minSuccessful must be at least 1, got: 0", exception.getMessage());
69+
}
70+
71+
@Test
72+
void minSuccessful_withNegative_shouldThrow() {
73+
var exception = assertThrows(IllegalArgumentException.class, () -> CompletionConfig.minSuccessful(-1));
74+
assertEquals("minSuccessful must be at least 1, got: -1", exception.getMessage());
75+
}
76+
77+
@Test
78+
void toleratedFailureCount_withNegative_shouldThrow() {
79+
var exception = assertThrows(IllegalArgumentException.class, () -> CompletionConfig.toleratedFailureCount(-1));
80+
assertEquals("toleratedFailureCount must be non-negative, got: -1", exception.getMessage());
81+
}
82+
83+
@Test
84+
void toleratedFailurePercentage_withNegative_shouldThrow() {
85+
var exception =
86+
assertThrows(IllegalArgumentException.class, () -> CompletionConfig.toleratedFailurePercentage(-0.1));
87+
assertEquals("toleratedFailurePercentage must be between 0.0 and 1.0, got: -0.1", exception.getMessage());
88+
}
89+
90+
@Test
91+
void toleratedFailurePercentage_aboveOne_shouldThrow() {
92+
var exception =
93+
assertThrows(IllegalArgumentException.class, () -> CompletionConfig.toleratedFailurePercentage(1.5));
94+
assertEquals("toleratedFailurePercentage must be between 0.0 and 1.0, got: 1.5", exception.getMessage());
95+
}
96+
97+
@Test
98+
void toleratedFailurePercentage_atBoundaries_shouldPass() {
99+
assertDoesNotThrow(() -> CompletionConfig.toleratedFailurePercentage(0.0));
100+
assertDoesNotThrow(() -> CompletionConfig.toleratedFailurePercentage(1.0));
101+
}
102+
103+
@Test
104+
void toleratedFailureCount_withZero_shouldPass() {
105+
var config = CompletionConfig.toleratedFailureCount(0);
106+
assertEquals(0, config.toleratedFailureCount());
107+
}
64108
}

sdk/src/test/java/software/amazon/lambda/durable/MapConfigTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,4 +101,26 @@ void toBuilder_canOverrideValues() {
101101
assertEquals(10, modified.maxConcurrency());
102102
assertEquals(4, original.maxConcurrency());
103103
}
104+
105+
@Test
106+
void builderWithZeroMaxConcurrency_shouldThrow() {
107+
var exception = assertThrows(
108+
IllegalArgumentException.class,
109+
() -> MapConfig.builder().maxConcurrency(0).build());
110+
assertEquals("maxConcurrency must be at least 1, got: 0", exception.getMessage());
111+
}
112+
113+
@Test
114+
void builderWithNegativeMaxConcurrency_shouldThrow() {
115+
var exception = assertThrows(
116+
IllegalArgumentException.class,
117+
() -> MapConfig.builder().maxConcurrency(-1).build());
118+
assertEquals("maxConcurrency must be at least 1, got: -1", exception.getMessage());
119+
}
120+
121+
@Test
122+
void builderWithNullMaxConcurrency_shouldPass() {
123+
var config = MapConfig.builder().maxConcurrency(null).build();
124+
assertNull(config.maxConcurrency());
125+
}
104126
}

sdk/src/test/java/software/amazon/lambda/durable/validation/ParameterValidatorTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -331,11 +331,9 @@ void validateOrderedCollection_withOrderedSet_shouldPass() {
331331
}
332332

333333
@Test
334-
void validateOrderedCollection_withLinkedHashSet_shouldThrow() {
335-
// LinkedHashSet extends HashSet, so it's rejected even though it has deterministic order
336-
assertThrows(
337-
IllegalArgumentException.class,
338-
() -> ParameterValidator.validateOrderedCollection(new LinkedHashSet<>(List.of("a", "b"))));
334+
void validateOrderedCollection_withLinkedHashSet_shouldPass() {
335+
// LinkedHashSet extends HashSet but has stable insertion-order iteration — allowed
336+
assertDoesNotThrow(() -> ParameterValidator.validateOrderedCollection(new LinkedHashSet<>(List.of("a", "b"))));
339337
}
340338

341339
@Test

0 commit comments

Comments
 (0)