Skip to content

feat(map): Fix map replay, wait-inside-map, and concurrency race conditions#229

Merged
ayushiahjolia merged 7 commits intomainfrom
map_bug_fixes
Mar 20, 2026
Merged

feat(map): Fix map replay, wait-inside-map, and concurrency race conditions#229
ayushiahjolia merged 7 commits intomainfrom
map_bug_fixes

Conversation

@ayushiahjolia
Copy link
Contributor

@ayushiahjolia ayushiahjolia commented Mar 18, 2026

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

Issue Link, if available

#39

Description

  • Updated Map implementation to rely on ConcurrencyOperation.
  • Added tests for all map usages.

Follow-ups/Remaining Work

  • If function returns null output, are we allowing checkpointing that?
  • Is it okay to enqueue all items? or we want to start executing item as soon as possible.
  • Doc updates in next CR, will remove dead code from Test as well.
  • Thread issues during replay in LocalDurableTestRunner.
  • If minSuccessful is reached then remaining iterations should return NOT_STARTED or SUCCESSFUL?

Demo/Screenshots

Screenshot 2026-03-19 at 4 36 00 PM

Checklist

  • I have filled out every section of the PR template
  • I have thoroughly tested this change

Testing

Unit Tests

Have unit tests been written for these changes? Updated.

Integration Tests

Have integration tests been written for these changes? Yes

Examples

Has a new example been added for the change? (if applicable) Yes

@ayushiahjolia ayushiahjolia marked this pull request as ready for review March 18, 2026 22:08
@ayushiahjolia ayushiahjolia requested a review from a team March 18, 2026 22:08
@zhongkechen
Copy link
Contributor

zhongkechen commented Mar 18, 2026

unit tests failed:

[ERROR] Errors: 
[ERROR]   ParallelOperationTest.branchCreation_multipleBranchesAllCreated:106 » IllegalState Cannot add items to a completed operation

This is due to the race condition introduced in the following change:

-        // Block until operation completes. No-op if the future is already completed.
-        completionFuture.join();
+        // Block until operation completes or execution suspends.
+        // Using runUntilCompleteOrSuspend races completionFuture against executionExceptionFuture,
+        // so when all active threads suspend (e.g., wait inside map branches), the
+        // SuspendExecutionException propagates and this thread is freed — preventing thread leaks
+        // on shared executor pools across invocations.
+        executionManager.runUntilCompleteOrSuspend(completionFuture).join();

Removing this change should fix the tests

@ayushiahjolia
Copy link
Contributor Author

ayushiahjolia commented Mar 19, 2026

When all map branches call wait(), every branch thread deregisters and suspendExecution() fires. But the parent thread sitting on completionFuture.join() in BaseConcurrentOperation.get() is stuck forever - no branch will ever complete to trigger onChildContextComplete and finalize the map's completionFuture. runUntilCompleteOrSuspend races completionFuture against executionExceptionFuture, so when suspension fires, the parent thread gets freed via SuspendExecutionException instead of blocking indefinitely.

@zhongkechen zhongkechen self-requested a review March 19, 2026 23:40

private void addAllItems() {
// Enqueue all items first, then start execution. This prevents early termination
// criteria (e.g., minSuccessful) from completing the operation mid-loop on replay,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering the expected behavior of early termination for map, do we skip the rest of the items?

ChildOperation will not send checkpoint if parent operation is completed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we skip remaining items with NOT_STARTED status.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if there were some items already started, they will complete.

* after all items have been enqueued. This prevents early termination from blocking item creation when all items
* are known upfront (e.g., map operations).
*/
protected <R> ChildContextOperation<R> enqueueItem(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calling enqueueItem for all items, followed by startPendingItems, seems no difference with calling addItem for all items. Just for calculation of failure percentile?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The calculation of failure numbers can also be calculated for eager start, because when user create Map they already pass the items as a list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addItem calls executeNextItemIfAllowed() immediately, and on replay child execution is synchronous - so early termination (e.g., minSuccessful=1) can mark the operation completed before the remaining items are even registered. The next addItem would then throw IllegalStateException. Enqueue-then-start ensures all items are registered before any execution begins.

} else {
handleFailure(completionStatus);
synchronized (this) {
if (isOperationCompleted()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As soon as this is done, we need to prevent more children from checkpointing their results so that we can keep the result consistent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ChildContextOperation.checkpointSuccess checks if (parentOperation != null && parentOperation.isOperationCompleted()) before sending any checkpoint. So once handleComplete marks the parent as completed, any in-flight children that finish afterwards will skip their checkpoint and just call markAlreadyCompleted() locally.

Copy link
Contributor

@wangyb-A wangyb-A left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will have some minor refactors and fix in the future.

@zhongkechen Please also take a look 😀

@zhongkechen
Copy link
Contributor

We will have some minor refactors and fix in the future.

@zhongkechen Please also take a look 😀

We still need some major fixes 😅

@ayushiahjolia ayushiahjolia merged commit b625cb8 into main Mar 20, 2026
11 checks passed
@ayushiahjolia ayushiahjolia deleted the map_bug_fixes branch March 20, 2026 18:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants