Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.config.CompletionConfig;
import software.amazon.lambda.durable.config.MapConfig;
import software.amazon.lambda.durable.config.WaitForConditionConfig;
import software.amazon.lambda.durable.model.ConcurrencyCompletionStatus;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.model.MapResult;
import software.amazon.lambda.durable.model.WaitForConditionResult;
import software.amazon.lambda.durable.retry.WaitStrategies;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

class MapIntegrationTest {
Expand Down Expand Up @@ -890,6 +893,362 @@ void testMapWithNullResults() {
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
}

// ---- 50-item map tests with waitForCallback ----

@Test
void testMap50ItemsWithWaitForCallback() {
var itemCount = 50;
var items = new ArrayList<Integer>();
for (int i = 0; i < itemCount; i++) {
items.add(i);
}

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var result = context.map("50-callbacks", items, String.class, (item, index, ctx) -> {
return ctx.waitForCallback("approval-" + index, String.class, (callbackId, stepCtx) -> {});
});

assertTrue(result.allSucceeded());
assertEquals(itemCount, result.size());
return String.valueOf(result.succeeded().size());
});

// First run — all items create callbacks and suspend
var result = runner.run("test");
assertEquals(ExecutionStatus.PENDING, result.getStatus());

// Complete all 50 callbacks
for (int i = 0; i < itemCount; i++) {
var callbackId = runner.getCallbackId("approval-" + i + "-callback");
assertNotNull(callbackId, "Callback ID should exist for approval-" + i);
runner.completeCallback(callbackId, "\"result-" + i + "\"");
}

// Re-run — all callbacks resolved, execution completes
result = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals("50", result.getResult(String.class));
}

@Test
void testMap50ItemsWithWaitForCallback_maxConcurrency5() {
var itemCount = 50;
var items = new ArrayList<Integer>();
for (int i = 0; i < itemCount; i++) {
items.add(i);
}

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var config = MapConfig.builder().maxConcurrency(5).build();
var result = context.map(
"50-callbacks-limited",
items,
String.class,
(item, index, ctx) -> {
return ctx.waitForCallback("cb-" + index, String.class, (callbackId, stepCtx) -> {});
},
config);

assertTrue(result.allSucceeded());
return String.valueOf(result.succeeded().size());
});

// First run — suspends on callbacks
var result = runner.run("test");
assertEquals(ExecutionStatus.PENDING, result.getStatus());

// Complete callbacks in batches, re-running between batches to let concurrency-limited items start
for (int batch = 0; batch < 10; batch++) {
var completed = false;
for (int i = batch * 5; i < (batch + 1) * 5; i++) {
var callbackId = runner.getCallbackId("cb-" + i + "-callback");
if (callbackId != null) {
runner.completeCallback(callbackId, "\"ok-" + i + "\"");
completed = true;
}
}
if (completed) {
result = runner.run("test");
if (result.getStatus() == ExecutionStatus.SUCCEEDED) break;
}
}

result = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals("50", result.getResult(String.class));
}

@Test
void testMap50ItemsWithWaitForCallback_partialFailure() {
var itemCount = 50;
var items = new ArrayList<Integer>();
for (int i = 0; i < itemCount; i++) {
items.add(i);
}

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var result = context.map("50-callbacks-partial-fail", items, String.class, (item, index, ctx) -> {
return ctx.waitForCallback("approval-" + index, String.class, (callbackId, stepCtx) -> {});
});

assertEquals(itemCount, result.size());
assertEquals(25, result.succeeded().size());
assertEquals(25, result.failed().size());
assertEquals(ConcurrencyCompletionStatus.ALL_COMPLETED, result.completionReason());

return result.succeeded().size() + "/" + result.failed().size();
});

// First run — all items create callbacks and suspend
var result = runner.run("test");
assertEquals(ExecutionStatus.PENDING, result.getStatus());

// Complete even-indexed callbacks, fail odd-indexed ones
for (int i = 0; i < itemCount; i++) {
var callbackId = runner.getCallbackId("approval-" + i + "-callback");
assertNotNull(callbackId, "Callback ID should exist for approval-" + i);
if (i % 2 == 0) {
runner.completeCallback(callbackId, "\"ok-" + i + "\"");
} else {
runner.failCallback(
callbackId,
software.amazon.awssdk.services.lambda.model.ErrorObject.builder()
.errorType("Rejected")
.errorMessage("Item " + i + " rejected")
.build());
}
}

result = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals("25/25", result.getResult(String.class));
}

@Test
void testMap50ItemsWithWaitForCallback_stepsBeforeAndAfterCallback() {
var itemCount = 50;
var items = new ArrayList<Integer>();
for (int i = 0; i < itemCount; i++) {
items.add(i);
}

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var result = context.map("50-callbacks-with-steps", items, String.class, (item, index, ctx) -> {
var before = ctx.step("prepare-" + index, String.class, stepCtx -> "prepared-" + index);
var approval = ctx.waitForCallback("approval-" + index, String.class, (callbackId, stepCtx) -> {});
return ctx.step("finalize-" + index, String.class, stepCtx -> before + ":" + approval + ":done");
});

assertTrue(result.allSucceeded());
return String.valueOf(result.succeeded().size());
});

// First run — items execute prepare step, create callbacks, suspend
var result = runner.run("test");
assertEquals(ExecutionStatus.PENDING, result.getStatus());

// Complete all callbacks
for (int i = 0; i < itemCount; i++) {
var callbackId = runner.getCallbackId("approval-" + i + "-callback");
assertNotNull(callbackId, "Callback ID should exist for approval-" + i);
runner.completeCallback(callbackId, "\"approved-" + i + "\"");
}

// Re-run — callbacks resolved, finalize steps execute
result = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals("50", result.getResult(String.class));
}

// ---- 50-item map tests with waitForCondition ----

@Test
void testMap50ItemsWithWaitForCondition() {
var itemCount = 50;
var items = new ArrayList<Integer>();
for (int i = 0; i < itemCount; i++) {
items.add(i);
}
var checkCounts = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var result = context.map("50-conditions", items, Integer.class, (item, index, ctx) -> {
var targetChecks = (index % 3) + 1; // 1, 2, or 3 checks to complete
var strategy = WaitStrategies.<Integer>fixedDelay(10, Duration.ofSeconds(1));
var wfcConfig = WaitForConditionConfig.<Integer>builder()
.waitStrategy(strategy)
.build();

return ctx.waitForCondition(
"poll-" + index,
Integer.class,
(state, stepCtx) -> {
checkCounts.incrementAndGet();
var next = (state == null ? 0 : state) + 1;
return next >= targetChecks
? WaitForConditionResult.stopPolling(next)
: WaitForConditionResult.continuePolling(next);
},
wfcConfig);
});

assertTrue(result.allSucceeded());
assertEquals(itemCount, result.size());

var sum = 0;
for (int i = 0; i < result.size(); i++) {
sum += result.getResult(i);
}
return String.valueOf(sum);
});

var result = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

// Each item completes after (index%3)+1 checks: 17 items need 1, 17 need 2, 16 need 3
// Sum of results: 17*1 + 17*2 + 16*3 = 17 + 34 + 48 = 99
assertEquals("99", result.getResult(String.class));
assertTrue(checkCounts.get() >= itemCount, "Should have at least " + itemCount + " checks");
}

@Test
void testMap50ItemsWithWaitForCondition_someExceedMaxAttempts() {
var itemCount = 50;
var items = new ArrayList<Integer>();
for (int i = 0; i < itemCount; i++) {
items.add(i);
}

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var result = context.map("50-conditions-some-fail", items, Integer.class, (item, index, ctx) -> {
// Odd items: maxAttempts=1 but need 2 checks → will fail
// Even items: maxAttempts=5, need 2 checks → will succeed
var maxAttempts = (index % 2 == 0) ? 5 : 1;
var strategy = WaitStrategies.<Integer>fixedDelay(maxAttempts, Duration.ofSeconds(1));
var wfcConfig = WaitForConditionConfig.<Integer>builder()
.waitStrategy(strategy)
.build();

return ctx.waitForCondition(
"poll-" + index,
Integer.class,
(state, stepCtx) -> {
var next = (state == null ? 0 : state) + 1;
return next >= 2
? WaitForConditionResult.stopPolling(next)
: WaitForConditionResult.continuePolling(next);
},
wfcConfig);
});

assertEquals(itemCount, result.size());
assertEquals(25, result.succeeded().size());
assertEquals(25, result.failed().size());
assertEquals(ConcurrencyCompletionStatus.ALL_COMPLETED, result.completionReason());

return result.succeeded().size() + "/" + result.failed().size();
});

var result = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals("25/25", result.getResult(String.class));
}

@Test
void testMap50ItemsWithWaitForCondition_replay() {
var itemCount = 50;
var items = new ArrayList<Integer>();
for (int i = 0; i < itemCount; i++) {
items.add(i);
}
var checkCounts = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var result = context.map("50-conditions-replay", items, String.class, (item, index, ctx) -> {
var strategy = WaitStrategies.<Integer>fixedDelay(5, Duration.ofSeconds(1));
var wfcConfig = WaitForConditionConfig.<Integer>builder()
.waitStrategy(strategy)
.build();

var polled = ctx.waitForCondition(
"poll-" + index,
Integer.class,
(state, stepCtx) -> {
checkCounts.incrementAndGet();
return WaitForConditionResult.stopPolling(1);
},
wfcConfig);

return String.valueOf(polled);
});

assertTrue(result.allSucceeded());
return "done";
});

var result1 = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus());
var firstRunChecks = checkCounts.get();
assertEquals(itemCount, firstRunChecks);

// Replay — check functions should not re-execute
var result2 = runner.run("test");
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
assertEquals(firstRunChecks, checkCounts.get(), "Check functions should not re-execute on replay");
}

// ---- 50-item map tests mixing waitForCallback and waitForCondition ----

@Test
void testMap50ItemsMixed_callbackAndCondition() {
var itemCount = 50;
var items = new ArrayList<Integer>();
for (int i = 0; i < itemCount; i++) {
items.add(i);
}

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var result = context.map("50-mixed", items, String.class, (item, index, ctx) -> {
if (index % 2 == 0) {
// Even items: waitForCallback
return ctx.waitForCallback("cb-" + index, String.class, (callbackId, stepCtx) -> {});
} else {
// Odd items: waitForCondition
var strategy = WaitStrategies.<Integer>fixedDelay(5, Duration.ofSeconds(1));
var wfcConfig = WaitForConditionConfig.<Integer>builder()
.waitStrategy(strategy)
.build();

var polled = ctx.waitForCondition(
"poll-" + index,
Integer.class,
(state, stepCtx) -> WaitForConditionResult.stopPolling(index),
wfcConfig);

return "polled-" + polled;
}
});

assertEquals(itemCount, result.size());
return String.valueOf(result.succeeded().size());
});

// First run — callback items suspend, condition items may complete
var result = runner.run("test");

// Complete all callback items (even-indexed)
for (int i = 0; i < itemCount; i += 2) {
var callbackId = runner.getCallbackId("cb-" + i + "-callback");
if (callbackId != null) {
runner.completeCallback(callbackId, "\"callback-" + i + "\"");
}
}

result = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals("50", result.getResult(String.class));
}

@Test
void testMultipleMapAsyncInParallel() {
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
Expand Down
Loading