diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExample.java new file mode 100644 index 000000000..a25464b48 --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExample.java @@ -0,0 +1,69 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.parallel; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableFuture; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.ParallelConfig; + +/** + * Example demonstrating parallel branches where some branches include wait operations. + * + *

This models a notification fan-out pattern where different channels have different delivery delays: + * + *

+ * + *

All three branches run concurrently. Branches with waits suspend without consuming compute resources and resume + * automatically once the wait elapses. The parallel operation completes once all branches finish. + */ +public class ParallelWithWaitExample + extends DurableHandler { + + public record Input(String userId, String message) {} + + public record Output(List deliveries) {} + + @Override + public Output handleRequest(Input input, DurableContext context) { + var logger = context.getLogger(); + logger.info("Sending notifications to user {}", input.userId()); + + var config = ParallelConfig.builder().build(); + var futures = new ArrayList>(3); + + try (var parallel = context.parallel("notify", config)) { + + // Branch 1: email — no wait, deliver immediately + futures.add(parallel.branch("email", String.class, ctx -> { + ctx.wait("email-rate-limit-delay", Duration.ofSeconds(10)); + return ctx.step("send-email", String.class, stepCtx -> "email:" + input.message()); + })); + + // Branch 2: SMS — wait for rate-limit window, then send + futures.add(parallel.branch("sms", String.class, ctx -> { + ctx.wait("sms-rate-limit-delay", Duration.ofSeconds(10)); + return ctx.step("send-sms", String.class, stepCtx -> "sms:" + input.message()); + })); + + // Branch 3: push notification — wait for quiet-hours window, then send + futures.add(parallel.branch("push", String.class, ctx -> { + ctx.wait("push-quiet-delay", Duration.ofSeconds(10)); + return ctx.step("send-push", String.class, stepCtx -> "push:" + input.message()); + })); + } + + var deliveries = futures.stream().map(DurableFuture::get).toList(); + logger.info("All {} notifications delivered", deliveries.size()); + // Test replay + context.wait("wait for finalization", Duration.ofSeconds(5)); + return new Output(deliveries); + } +} diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExampleTest.java new file mode 100644 index 000000000..4352ed23f --- /dev/null +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExampleTest.java @@ -0,0 +1,33 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.parallel; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.List; +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +class ParallelWithWaitExampleTest { + @Test + void completesAfterManuallyAdvancingWaits() { + var handler = new ParallelWithWaitExample(); + var runner = LocalDurableTestRunner.create(ParallelWithWaitExample.Input.class, handler); + + var input = new ParallelWithWaitExample.Input("user-456", "world"); + + // First run suspends on wait branches + var first = runner.run(input); + assertEquals(ExecutionStatus.PENDING, first.getStatus()); + + // Advance waits and re-run to completion + runner.advanceTime(); + var result = runner.runUntilComplete(input); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + var output = result.getResult(ParallelWithWaitExample.Output.class); + assertEquals(List.of("email:world", "sms:world", "push:world"), output.deliveries()); + } +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java b/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java index 89902f6a4..67fe56834 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/context/BaseContextImpl.java @@ -37,6 +37,28 @@ protected BaseContextImpl( String contextId, String contextName, ThreadType threadType) { + this(executionManager, durableConfig, lambdaContext, contextId, contextName, threadType, true); + } + + /** + * Creates a new BaseContext instance. + * + * @param executionManager the execution manager for thread coordination and state management + * @param durableConfig the durable execution configuration + * @param lambdaContext the AWS Lambda runtime context + * @param contextId the context ID, null for root context, set for child contexts + * @param contextName the human-readable name for this context + * @param threadType the type of thread this context runs on + * @param setCurrentThreadContext whether to call setCurrentThreadContext on the execution manager + */ + protected BaseContextImpl( + ExecutionManager executionManager, + DurableConfig durableConfig, + Context lambdaContext, + String contextId, + String contextName, + ThreadType threadType, + boolean setCurrentThreadContext) { this.executionManager = executionManager; this.durableConfig = durableConfig; this.lambdaContext = lambdaContext; @@ -45,8 +67,10 @@ protected BaseContextImpl( this.isReplaying = executionManager.hasOperationsForContext(contextId); this.threadType = threadType; - // write the thread id and type to thread local - executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType)); + if (setCurrentThreadContext) { + // write the thread id and type to thread local + executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType)); + } } // =============== accessors ================ diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java index ff2bfbb63..8ffb94e84 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java @@ -67,7 +67,24 @@ private DurableContextImpl( Context lambdaContext, String contextId, String contextName) { - super(executionManager, durableConfig, lambdaContext, contextId, contextName, ThreadType.CONTEXT); + this(executionManager, durableConfig, lambdaContext, contextId, contextName, true); + } + + private DurableContextImpl( + ExecutionManager executionManager, + DurableConfig durableConfig, + Context lambdaContext, + String contextId, + String contextName, + boolean setCurrentThreadContext) { + super( + executionManager, + durableConfig, + lambdaContext, + contextId, + contextName, + ThreadType.CONTEXT, + setCurrentThreadContext); operationIdGenerator = new OperationIdGenerator(contextId); } @@ -98,6 +115,22 @@ public DurableContextImpl createChildContext(String childContextId, String child getExecutionManager(), getDurableConfig(), getLambdaContext(), childContextId, childContextName); } + /** + * Creates a child context without setting the current thread context. + * + *

Use this when the child context is being created on a thread that should not have its thread-local context + * overwritten (e.g. when constructing the context ahead of running it on a separate thread). + * + * @param childContextId the child context's ID (the CONTEXT operation's operation ID) + * @param childContextName the name of the child context + * @return a new DurableContext for the child context + */ + public DurableContextImpl createChildContextWithoutSettingThreadContext( + String childContextId, String childContextName) { + return new DurableContextImpl( + getExecutionManager(), getDurableConfig(), getLambdaContext(), childContextId, childContextName, false); + } + /** * Creates a step context for executing step operations. * diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java index 6071f6d14..b01274500 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java @@ -74,7 +74,7 @@ protected ConcurrencyOperation( this.toleratedFailureCount = toleratedFailureCount; this.failureRateThreshold = failureRateThreshold; this.operationIdGenerator = new OperationIdGenerator(getOperationId()); - this.rootContext = durableContext.createChildContext(getOperationId(), getName()); + this.rootContext = durableContext.createChildContextWithoutSettingThreadContext(getOperationId(), getName()); } protected ConcurrencyOperation( diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java index 55d4395e5..439c3183d 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java @@ -3,6 +3,7 @@ package software.amazon.lambda.durable.operation; import java.util.function.Function; +import software.amazon.awssdk.services.lambda.model.ContextOptions; import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationAction; import software.amazon.awssdk.services.lambda.model.OperationType; @@ -41,6 +42,8 @@ */ public class ParallelOperation extends ConcurrencyOperation { + private boolean replaying = false; + public ParallelOperation( OperationIdentifier operationIdentifier, TypeToken resultTypeToken, @@ -78,9 +81,14 @@ protected ChildContextOperation createItem( @Override protected void handleSuccess() { + if (replaying) { + // Do not send checkpoint during replay + return; + } sendOperationUpdate(OperationUpdate.builder() .action(OperationAction.SUCCEED) - .subType(getSubType().getValue())); + .subType(getSubType().getValue()) + .contextOptions(ContextOptions.builder().replayChildren(true).build())); } @Override @@ -97,8 +105,9 @@ protected void start() { @Override protected void replay(Operation existing) { - // Always replay child branches for parallel - start(); + // No-op: child branches handle their own replay via ChildContextOperation.replay(). + // Set replaying=true so handleSuccess() skips re-checkpointing the already-completed parallel context. + replaying = true; } @Override diff --git a/sdk/src/test/java/software/amazon/lambda/durable/context/BaseContextImplTest.java b/sdk/src/test/java/software/amazon/lambda/durable/context/BaseContextImplTest.java new file mode 100644 index 000000000..c0d321ed5 --- /dev/null +++ b/sdk/src/test/java/software/amazon/lambda/durable/context/BaseContextImplTest.java @@ -0,0 +1,123 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.context; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState; +import software.amazon.awssdk.services.lambda.model.Operation; +import software.amazon.awssdk.services.lambda.model.OperationStatus; +import software.amazon.awssdk.services.lambda.model.OperationType; +import software.amazon.lambda.durable.DurableConfig; +import software.amazon.lambda.durable.TestUtils; +import software.amazon.lambda.durable.execution.ExecutionManager; +import software.amazon.lambda.durable.execution.ThreadContext; +import software.amazon.lambda.durable.execution.ThreadType; +import software.amazon.lambda.durable.model.DurableExecutionInput; + +class BaseContextImplTest { + + private static final String INVOCATION_ID = "20dae574-53da-37a1-bfd5-b0e2e6ec715d"; + private static final String EXECUTION_NAME = "349beff4-a89d-4bc8-a56f-af7a8af67a5f"; + private static final Operation EXECUTION_OP = Operation.builder() + .id(INVOCATION_ID) + .type(OperationType.EXECUTION) + .status(OperationStatus.STARTED) + .build(); + + @BeforeEach + void clearThreadContext() { + // currentThreadContext is a static ThreadLocal on ExecutionManager — clear it + // before each test to prevent bleed-through from other tests on the same thread. + createExecutionManager().setCurrentThreadContext(null); + } + + private ExecutionManager createExecutionManager() { + var client = TestUtils.createMockClient(); + var initialState = CheckpointUpdatedExecutionState.builder() + .operations(new ArrayList<>(List.of(EXECUTION_OP))) + .build(); + return new ExecutionManager( + new DurableExecutionInput( + "arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/" + + EXECUTION_NAME + "/" + INVOCATION_ID, + "test-token", + initialState), + DurableConfig.builder().withDurableExecutionClient(client).build()); + } + + @Test + void defaultConstructor_setsCurrentThreadContext() { + var executionManager = createExecutionManager(); + // Precondition: no thread context set yet + assertNull(executionManager.getCurrentThreadContext()); + + // Creating a root context with the default constructor should set the thread context + DurableContextImpl.createRootContext( + executionManager, DurableConfig.builder().build(), null); + + var threadContext = executionManager.getCurrentThreadContext(); + assertNotNull(threadContext); + assertEquals(ThreadType.CONTEXT, threadContext.threadType()); + assertNull(threadContext.threadId()); + } + + @Test + void constructorWithSetCurrentThreadContextTrue_setsCurrentThreadContext() { + var executionManager = createExecutionManager(); + + // createRootContext sets thread context to root (threadId=null) + var rootContext = DurableContextImpl.createRootContext( + executionManager, DurableConfig.builder().build(), null); + assertEquals( + ThreadType.CONTEXT, executionManager.getCurrentThreadContext().threadType()); + assertNull(executionManager.getCurrentThreadContext().threadId()); + + // createChildContext (setCurrentThreadContext=true) should overwrite with child's context + rootContext.createChildContext("child-id", "child-name"); + + var threadContext = executionManager.getCurrentThreadContext(); + assertNotNull(threadContext); + assertEquals(ThreadType.CONTEXT, threadContext.threadType()); + assertEquals("child-id", threadContext.threadId()); + } + + @Test + void constructorWithSetCurrentThreadContextFalse_doesNotOverwriteThreadContext() { + var executionManager = createExecutionManager(); + + // Create root context first (it will set thread context to null/root) + var rootContext = DurableContextImpl.createRootContext( + executionManager, DurableConfig.builder().build(), null); + + // Now set a sentinel — simulating a caller thread that already has context established + var sentinel = new ThreadContext("original-context", ThreadType.CONTEXT); + executionManager.setCurrentThreadContext(sentinel); + + // createChildContextWithoutSettingThreadContext should NOT overwrite the sentinel + rootContext.createChildContextWithoutSettingThreadContext("child-id", "child-name"); + + // Thread context should still be the sentinel, not the child's context + var threadContext = executionManager.getCurrentThreadContext(); + assertNotNull(threadContext); + assertEquals("original-context", threadContext.threadId()); + } + + @Test + void createChildContextWithoutSettingThreadContext_returnsValidChildContext() { + var executionManager = createExecutionManager(); + executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT)); + var rootContext = DurableContextImpl.createRootContext( + executionManager, DurableConfig.builder().build(), null); + + var childContext = rootContext.createChildContextWithoutSettingThreadContext("child-id", "child-name"); + + assertNotNull(childContext); + assertEquals("child-id", childContext.getContextId()); + assertEquals("child-name", childContext.getContextName()); + } +} diff --git a/sdk/src/test/java/software/amazon/lambda/durable/context/DurableContextImplTest.java b/sdk/src/test/java/software/amazon/lambda/durable/context/DurableContextImplTest.java new file mode 100644 index 000000000..e3a5f732a --- /dev/null +++ b/sdk/src/test/java/software/amazon/lambda/durable/context/DurableContextImplTest.java @@ -0,0 +1,93 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.context; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState; +import software.amazon.awssdk.services.lambda.model.Operation; +import software.amazon.awssdk.services.lambda.model.OperationStatus; +import software.amazon.awssdk.services.lambda.model.OperationType; +import software.amazon.lambda.durable.DurableConfig; +import software.amazon.lambda.durable.TestUtils; +import software.amazon.lambda.durable.execution.ExecutionManager; +import software.amazon.lambda.durable.execution.ThreadContext; +import software.amazon.lambda.durable.execution.ThreadType; +import software.amazon.lambda.durable.model.DurableExecutionInput; + +class DurableContextImplTest { + + private static final String INVOCATION_ID = "20dae574-53da-37a1-bfd5-b0e2e6ec715d"; + private static final String EXECUTION_NAME = "349beff4-a89d-4bc8-a56f-af7a8af67a5f"; + + private ExecutionManager executionManager; + private DurableContextImpl rootContext; + + @BeforeEach + void setUp() { + var executionOp = Operation.builder() + .id(INVOCATION_ID) + .type(OperationType.EXECUTION) + .status(OperationStatus.STARTED) + .build(); + var client = TestUtils.createMockClient(); + var initialState = CheckpointUpdatedExecutionState.builder() + .operations(new ArrayList<>(List.of(executionOp))) + .build(); + executionManager = new ExecutionManager( + new DurableExecutionInput( + "arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/" + + EXECUTION_NAME + "/" + INVOCATION_ID, + "test-token", + initialState), + DurableConfig.builder().withDurableExecutionClient(client).build()); + // Simulate the root thread context as the executor would set it + executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT)); + rootContext = DurableContextImpl.createRootContext( + executionManager, DurableConfig.builder().build(), null); + } + + @Test + void createChildContext_setsThreadContextToChild() { + rootContext.createChildContext("child-1", "my-child"); + + var threadContext = executionManager.getCurrentThreadContext(); + assertNotNull(threadContext); + assertEquals("child-1", threadContext.threadId()); + assertEquals(ThreadType.CONTEXT, threadContext.threadType()); + } + + @Test + void createChildContextWithoutSettingThreadContext_preservesCallerThreadContext() { + var callerContext = new ThreadContext("caller-thread", ThreadType.CONTEXT); + executionManager.setCurrentThreadContext(callerContext); + + rootContext.createChildContextWithoutSettingThreadContext("child-1", "my-child"); + + // Thread context must remain unchanged + var threadContext = executionManager.getCurrentThreadContext(); + assertEquals("caller-thread", threadContext.threadId()); + } + + @Test + void createChildContextWithoutSettingThreadContext_returnsCorrectChildMetadata() { + var child = rootContext.createChildContextWithoutSettingThreadContext("child-42", "child-name"); + + assertEquals("child-42", child.getContextId()); + assertEquals("child-name", child.getContextName()); + } + + @Test + void createChildContextWithoutSettingThreadContext_whenNoThreadContextSet_leavesItNull() { + // Clear any existing thread context + executionManager.setCurrentThreadContext(null); + + rootContext.createChildContextWithoutSettingThreadContext("child-1", "my-child"); + + assertNull(executionManager.getCurrentThreadContext()); + } +} diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java index a55d1c139..bf6179c26 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java @@ -63,6 +63,8 @@ void setUp() { .withExecutorService(Executors.newCachedThreadPool()) .build()); when(durableContext.createChildContext(anyString(), anyString())).thenReturn(childContext); + when(durableContext.createChildContextWithoutSettingThreadContext(anyString(), anyString())) + .thenReturn(childContext); when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext("Root", ThreadType.CONTEXT)); mockIdGenerator = mock(OperationIdGenerator.class); when(mockIdGenerator.nextOperationId()).thenAnswer(inv -> "child-" + operationIdCounter.incrementAndGet()); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java index a9983accd..e537a54e1 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java @@ -60,6 +60,8 @@ void setUp() { .withExecutorService(Executors.newCachedThreadPool()) .build()); when(durableContext.createChildContext(anyString(), anyString())).thenReturn(childContext); + when(durableContext.createChildContextWithoutSettingThreadContext(anyString(), anyString())) + .thenReturn(childContext); when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext("Root", ThreadType.CONTEXT)); // Default: no existing operations (fresh execution) mockIdGenerator = mock(OperationIdGenerator.class); @@ -197,6 +199,100 @@ void contextHierarchy_branchesUseParallelContextAsParent() throws Exception { assertNotNull(childOp); } + // ===== Replay ===== + + @Test + void replay_doesNotSendStartCheckpoint() throws Exception { + // Simulate the parallel operation already existing in the service (STARTED status) + when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID)) + .thenReturn(Operation.builder() + .id(OPERATION_ID) + .name("test-parallel") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL.getValue()) + .status(OperationStatus.STARTED) + .build()); + // Both branches already succeeded + when(executionManager.getOperationAndUpdateReplayState("child-1")) + .thenReturn(Operation.builder() + .id("child-1") + .name("branch-1") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL_BRANCH.getValue()) + .status(OperationStatus.SUCCEEDED) + .contextDetails( + ContextDetails.builder().result("\"r1\"").build()) + .build()); + when(executionManager.getOperationAndUpdateReplayState("child-2")) + .thenReturn(Operation.builder() + .id("child-2") + .name("branch-2") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL_BRANCH.getValue()) + .status(OperationStatus.SUCCEEDED) + .contextDetails( + ContextDetails.builder().result("\"r2\"").build()) + .build()); + + var op = createOperation(-1, -1, 0); + setOperationIdGenerator(op, mockIdGenerator); + op.execute(); + op.addItem("branch-1", ctx -> "r1", TypeToken.get(String.class), SER_DES); + op.addItem("branch-2", ctx -> "r2", TypeToken.get(String.class), SER_DES); + + runJoin(op); + + verify(executionManager, never()) + .sendOperationUpdate(argThat(update -> update.action() == OperationAction.START)); + verify(executionManager, never()) + .sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED)); + } + + @Test + void replay_doesNotSendSucceedCheckpointWhenParallelAlreadySucceeded() throws Exception { + when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID)) + .thenReturn(Operation.builder() + .id(OPERATION_ID) + .name("test-parallel") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL.getValue()) + .status(OperationStatus.SUCCEEDED) + .build()); + when(executionManager.getOperationAndUpdateReplayState("child-1")) + .thenReturn(Operation.builder() + .id("child-1") + .name("branch-1") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL_BRANCH.getValue()) + .status(OperationStatus.SUCCEEDED) + .contextDetails( + ContextDetails.builder().result("\"r1\"").build()) + .build()); + when(executionManager.getOperationAndUpdateReplayState("child-2")) + .thenReturn(Operation.builder() + .id("child-2") + .name("branch-2") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL_BRANCH.getValue()) + .status(OperationStatus.SUCCEEDED) + .contextDetails( + ContextDetails.builder().result("\"r2\"").build()) + .build()); + + var op = createOperation(-1, -1, 0); + setOperationIdGenerator(op, mockIdGenerator); + op.execute(); + op.addItem("branch-1", ctx -> "r1", TypeToken.get(String.class), SER_DES); + op.addItem("branch-2", ctx -> "r2", TypeToken.get(String.class), SER_DES); + + runJoin(op); + + verify(executionManager, never()) + .sendOperationUpdate(argThat(update -> update.action() == OperationAction.START)); + verify(executionManager, never()) + .sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED)); + } + // ===== handleFailure still sends SUCCEED ===== @Test