Conversation
- Implement ConcurrencyOperation - Implement ParallelOperation, ParallelContext
| * prefixed with the parent hashed contextId (e.g. "<hash>-1", "<hash>-2" inside parent context <hash>). This | ||
| * matches the Python SDK's stepPrefix convention and prevents ID collisions in checkpoint batches. | ||
| */ | ||
| private String nextOperationId() { |
There was a problem hiding this comment.
This change will not be required with this PR: #206
| <groupId>org.slf4j</groupId> | ||
| <artifactId>slf4j-simple</artifactId> | ||
| <version>${slf4j.version}</version> | ||
| <scope>test</scope> |
There was a problem hiding this comment.
why is this needed for test?
There was a problem hiding this comment.
It's for local debugging
There was a problem hiding this comment.
Let's not add additional dependencies, especially for sdk module, unless it's very necessary.
| TypeToken<T> resultTypeToken, | ||
| SerDes resultSerDes, | ||
| DurableContext durableContext) { | ||
| this(operationIdentifier, function, resultTypeToken, resultSerDes, durableContext, null); |
There was a problem hiding this comment.
adding a callback to the completableFuture like in Map seems a cleaner solution than this.
There was a problem hiding this comment.
Agree, that's on my following up refactoring list
| return; | ||
| } | ||
| joined = true; | ||
| parallelOperation.get(); |
There was a problem hiding this comment.
Can we return a state of all branches here, like the map operation returns List<MapResultItem>. For Parallel, we can return something similar, just without the result of branches.
| import software.amazon.lambda.durable.operation.ParallelOperation; | ||
|
|
||
| /** User-facing context for managing parallel branch execution within a durable function. */ | ||
| public class ParallelContext implements AutoCloseable { |
There was a problem hiding this comment.
Let's make this a subclass of DurableFuture, so that this can be used with the other utilities such as allOf
| @Override | ||
| protected void handleFailure(ConcurrencyCompletionStatus concurrencyCompletionStatus) { | ||
| sendOperationUpdate(OperationUpdate.builder() | ||
| .action(OperationAction.FAIL) |
There was a problem hiding this comment.
I think parallel operation always succeed no matter the children succeed or fail
| case MAP -> throw new ChildContextFailedException(op); | ||
| case MAP_ITERATION -> throw new ChildContextFailedException(op); | ||
| case PARALLEL -> throw new ChildContextFailedException(op); | ||
| case PARALLEL_BRANCH -> throw new ChildContextFailedException(op); |
There was a problem hiding this comment.
We should throw parallel specific exceptions here.
| public enum ConcurrencyCompletionStatus { | ||
| ALL_COMPLETED, | ||
| MIN_SUCCESSFUL_REACHED, | ||
| MIN_SUCCESSFUL_NOT_REACHED, |
There was a problem hiding this comment.
What's the purpose of this value MIN_SUCCESSFUL_NOT_REACHED
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
Issue Link, if available
Description
Demo/Screenshots
Note that the hierarchy is not correct, will be fixed in the follow up PRs
Checklist
Testing
Unit Tests
Have unit tests been written for these changes? Yes
Integration Tests
Have integration tests been written for these changes? Not yet
Examples
Has a new example been added for the change? (if applicable) Yes