From d9b8254d3a1654ce2329cf5addecf8ec5597c6af Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Tue, 31 Mar 2026 16:25:25 -0700 Subject: [PATCH] Add complex map tests --- .../lambda/durable/MapIntegrationTest.java | 359 ++++++++++++++++++ 1 file changed, 359 insertions(+) diff --git a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java index 7788cc4a1..9b9fffff7 100644 --- a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java +++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java @@ -11,9 +11,12 @@ import org.junit.jupiter.api.Test; import software.amazon.lambda.durable.config.CompletionConfig; import software.amazon.lambda.durable.config.MapConfig; +import software.amazon.lambda.durable.config.WaitForConditionConfig; import software.amazon.lambda.durable.model.ConcurrencyCompletionStatus; import software.amazon.lambda.durable.model.ExecutionStatus; import software.amazon.lambda.durable.model.MapResult; +import software.amazon.lambda.durable.model.WaitForConditionResult; +import software.amazon.lambda.durable.retry.WaitStrategies; import software.amazon.lambda.durable.testing.LocalDurableTestRunner; class MapIntegrationTest { @@ -890,6 +893,362 @@ void testMapWithNullResults() { assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); } + // ---- 50-item map tests with waitForCallback ---- + + @Test + void testMap50ItemsWithWaitForCallback() { + var itemCount = 50; + var items = new ArrayList(); + for (int i = 0; i < itemCount; i++) { + items.add(i); + } + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var result = context.map("50-callbacks", items, String.class, (item, index, ctx) -> { + return ctx.waitForCallback("approval-" + index, String.class, (callbackId, stepCtx) -> {}); + }); + + assertTrue(result.allSucceeded()); + assertEquals(itemCount, result.size()); + return String.valueOf(result.succeeded().size()); + }); + + // First run — all items create callbacks and suspend + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Complete all 50 callbacks + for (int i = 0; i < itemCount; i++) { + var callbackId = runner.getCallbackId("approval-" + i + "-callback"); + assertNotNull(callbackId, "Callback ID should exist for approval-" + i); + runner.completeCallback(callbackId, "\"result-" + i + "\""); + } + + // Re-run — all callbacks resolved, execution completes + result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("50", result.getResult(String.class)); + } + + @Test + void testMap50ItemsWithWaitForCallback_maxConcurrency5() { + var itemCount = 50; + var items = new ArrayList(); + for (int i = 0; i < itemCount; i++) { + items.add(i); + } + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var config = MapConfig.builder().maxConcurrency(5).build(); + var result = context.map( + "50-callbacks-limited", + items, + String.class, + (item, index, ctx) -> { + return ctx.waitForCallback("cb-" + index, String.class, (callbackId, stepCtx) -> {}); + }, + config); + + assertTrue(result.allSucceeded()); + return String.valueOf(result.succeeded().size()); + }); + + // First run — suspends on callbacks + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Complete callbacks in batches, re-running between batches to let concurrency-limited items start + for (int batch = 0; batch < 10; batch++) { + var completed = false; + for (int i = batch * 5; i < (batch + 1) * 5; i++) { + var callbackId = runner.getCallbackId("cb-" + i + "-callback"); + if (callbackId != null) { + runner.completeCallback(callbackId, "\"ok-" + i + "\""); + completed = true; + } + } + if (completed) { + result = runner.run("test"); + if (result.getStatus() == ExecutionStatus.SUCCEEDED) break; + } + } + + result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("50", result.getResult(String.class)); + } + + @Test + void testMap50ItemsWithWaitForCallback_partialFailure() { + var itemCount = 50; + var items = new ArrayList(); + for (int i = 0; i < itemCount; i++) { + items.add(i); + } + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var result = context.map("50-callbacks-partial-fail", items, String.class, (item, index, ctx) -> { + return ctx.waitForCallback("approval-" + index, String.class, (callbackId, stepCtx) -> {}); + }); + + assertEquals(itemCount, result.size()); + assertEquals(25, result.succeeded().size()); + assertEquals(25, result.failed().size()); + assertEquals(ConcurrencyCompletionStatus.ALL_COMPLETED, result.completionReason()); + + return result.succeeded().size() + "/" + result.failed().size(); + }); + + // First run — all items create callbacks and suspend + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Complete even-indexed callbacks, fail odd-indexed ones + for (int i = 0; i < itemCount; i++) { + var callbackId = runner.getCallbackId("approval-" + i + "-callback"); + assertNotNull(callbackId, "Callback ID should exist for approval-" + i); + if (i % 2 == 0) { + runner.completeCallback(callbackId, "\"ok-" + i + "\""); + } else { + runner.failCallback( + callbackId, + software.amazon.awssdk.services.lambda.model.ErrorObject.builder() + .errorType("Rejected") + .errorMessage("Item " + i + " rejected") + .build()); + } + } + + result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("25/25", result.getResult(String.class)); + } + + @Test + void testMap50ItemsWithWaitForCallback_stepsBeforeAndAfterCallback() { + var itemCount = 50; + var items = new ArrayList(); + for (int i = 0; i < itemCount; i++) { + items.add(i); + } + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var result = context.map("50-callbacks-with-steps", items, String.class, (item, index, ctx) -> { + var before = ctx.step("prepare-" + index, String.class, stepCtx -> "prepared-" + index); + var approval = ctx.waitForCallback("approval-" + index, String.class, (callbackId, stepCtx) -> {}); + return ctx.step("finalize-" + index, String.class, stepCtx -> before + ":" + approval + ":done"); + }); + + assertTrue(result.allSucceeded()); + return String.valueOf(result.succeeded().size()); + }); + + // First run — items execute prepare step, create callbacks, suspend + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Complete all callbacks + for (int i = 0; i < itemCount; i++) { + var callbackId = runner.getCallbackId("approval-" + i + "-callback"); + assertNotNull(callbackId, "Callback ID should exist for approval-" + i); + runner.completeCallback(callbackId, "\"approved-" + i + "\""); + } + + // Re-run — callbacks resolved, finalize steps execute + result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("50", result.getResult(String.class)); + } + + // ---- 50-item map tests with waitForCondition ---- + + @Test + void testMap50ItemsWithWaitForCondition() { + var itemCount = 50; + var items = new ArrayList(); + for (int i = 0; i < itemCount; i++) { + items.add(i); + } + var checkCounts = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var result = context.map("50-conditions", items, Integer.class, (item, index, ctx) -> { + var targetChecks = (index % 3) + 1; // 1, 2, or 3 checks to complete + var strategy = WaitStrategies.fixedDelay(10, Duration.ofSeconds(1)); + var wfcConfig = WaitForConditionConfig.builder() + .waitStrategy(strategy) + .build(); + + return ctx.waitForCondition( + "poll-" + index, + Integer.class, + (state, stepCtx) -> { + checkCounts.incrementAndGet(); + var next = (state == null ? 0 : state) + 1; + return next >= targetChecks + ? WaitForConditionResult.stopPolling(next) + : WaitForConditionResult.continuePolling(next); + }, + wfcConfig); + }); + + assertTrue(result.allSucceeded()); + assertEquals(itemCount, result.size()); + + var sum = 0; + for (int i = 0; i < result.size(); i++) { + sum += result.getResult(i); + } + return String.valueOf(sum); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + // Each item completes after (index%3)+1 checks: 17 items need 1, 17 need 2, 16 need 3 + // Sum of results: 17*1 + 17*2 + 16*3 = 17 + 34 + 48 = 99 + assertEquals("99", result.getResult(String.class)); + assertTrue(checkCounts.get() >= itemCount, "Should have at least " + itemCount + " checks"); + } + + @Test + void testMap50ItemsWithWaitForCondition_someExceedMaxAttempts() { + var itemCount = 50; + var items = new ArrayList(); + for (int i = 0; i < itemCount; i++) { + items.add(i); + } + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var result = context.map("50-conditions-some-fail", items, Integer.class, (item, index, ctx) -> { + // Odd items: maxAttempts=1 but need 2 checks → will fail + // Even items: maxAttempts=5, need 2 checks → will succeed + var maxAttempts = (index % 2 == 0) ? 5 : 1; + var strategy = WaitStrategies.fixedDelay(maxAttempts, Duration.ofSeconds(1)); + var wfcConfig = WaitForConditionConfig.builder() + .waitStrategy(strategy) + .build(); + + return ctx.waitForCondition( + "poll-" + index, + Integer.class, + (state, stepCtx) -> { + var next = (state == null ? 0 : state) + 1; + return next >= 2 + ? WaitForConditionResult.stopPolling(next) + : WaitForConditionResult.continuePolling(next); + }, + wfcConfig); + }); + + assertEquals(itemCount, result.size()); + assertEquals(25, result.succeeded().size()); + assertEquals(25, result.failed().size()); + assertEquals(ConcurrencyCompletionStatus.ALL_COMPLETED, result.completionReason()); + + return result.succeeded().size() + "/" + result.failed().size(); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("25/25", result.getResult(String.class)); + } + + @Test + void testMap50ItemsWithWaitForCondition_replay() { + var itemCount = 50; + var items = new ArrayList(); + for (int i = 0; i < itemCount; i++) { + items.add(i); + } + var checkCounts = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var result = context.map("50-conditions-replay", items, String.class, (item, index, ctx) -> { + var strategy = WaitStrategies.fixedDelay(5, Duration.ofSeconds(1)); + var wfcConfig = WaitForConditionConfig.builder() + .waitStrategy(strategy) + .build(); + + var polled = ctx.waitForCondition( + "poll-" + index, + Integer.class, + (state, stepCtx) -> { + checkCounts.incrementAndGet(); + return WaitForConditionResult.stopPolling(1); + }, + wfcConfig); + + return String.valueOf(polled); + }); + + assertTrue(result.allSucceeded()); + return "done"; + }); + + var result1 = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + var firstRunChecks = checkCounts.get(); + assertEquals(itemCount, firstRunChecks); + + // Replay — check functions should not re-execute + var result2 = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals(firstRunChecks, checkCounts.get(), "Check functions should not re-execute on replay"); + } + + // ---- 50-item map tests mixing waitForCallback and waitForCondition ---- + + @Test + void testMap50ItemsMixed_callbackAndCondition() { + var itemCount = 50; + var items = new ArrayList(); + for (int i = 0; i < itemCount; i++) { + items.add(i); + } + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var result = context.map("50-mixed", items, String.class, (item, index, ctx) -> { + if (index % 2 == 0) { + // Even items: waitForCallback + return ctx.waitForCallback("cb-" + index, String.class, (callbackId, stepCtx) -> {}); + } else { + // Odd items: waitForCondition + var strategy = WaitStrategies.fixedDelay(5, Duration.ofSeconds(1)); + var wfcConfig = WaitForConditionConfig.builder() + .waitStrategy(strategy) + .build(); + + var polled = ctx.waitForCondition( + "poll-" + index, + Integer.class, + (state, stepCtx) -> WaitForConditionResult.stopPolling(index), + wfcConfig); + + return "polled-" + polled; + } + }); + + assertEquals(itemCount, result.size()); + return String.valueOf(result.succeeded().size()); + }); + + // First run — callback items suspend, condition items may complete + var result = runner.run("test"); + + // Complete all callback items (even-indexed) + for (int i = 0; i < itemCount; i += 2) { + var callbackId = runner.getCallbackId("cb-" + i + "-callback"); + if (callbackId != null) { + runner.completeCallback(callbackId, "\"callback-" + i + "\""); + } + } + + result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("50", result.getResult(String.class)); + } + @Test void testMultipleMapAsyncInParallel() { var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {