Merged
Conversation
98e2bca to
3928672
Compare
Implement double-check pattern across all operation types to handle synchronous checkpoint responses, preventing invalid state transitions and unnecessary suspensions. Bug fix: Callback operations now defer errors to Callback.result() instead of raising immediately in create_callback(), ensuring deterministic replay when code executes between callback creation and result retrieval. Changes: - Add OperationExecutor base class with CheckResult for status checking - Implement double-check pattern: check status before and after checkpoint - Use is_sync parameter to control checkpoint synchronization behavior - Refactor all operations to use executor pattern: * StepOperationExecutor: sync for AT_MOST_ONCE, async for AT_LEAST_ONCE * InvokeOperationExecutor: sync checkpoint, always suspends * WaitOperationExecutor: sync checkpoint, suspends if not complete * CallbackOperationExecutor: sync checkpoint, defers errors to result() * WaitForConditionOperationExecutor: async checkpoint, no second check * ChildOperationExecutor: async checkpoint, handles large payloads - Remove inline while loops, centralize logic in base class - Update all tests to expect double checkpoint checks with side_effect mocks Affected modules: - operation/base.py: New OperationExecutor and CheckResult classes - operation/step.py: StepOperationExecutor implementation - operation/invoke.py: InvokeOperationExecutor implementation - operation/wait.py: WaitOperationExecutor implementation - operation/callback.py: CallbackOperationExecutor with deferred errors - operation/wait_for_condition.py: WaitForConditionOperationExecutor - operation/child.py: ChildOperationExecutor with ReplayChildren support - All operation tests: Updated mocks for double-check pattern
3928672 to
8dd58c8
Compare
wangyb-A
approved these changes
Dec 8, 2025
Contributor
wangyb-A
left a comment
There was a problem hiding this comment.
approved.
nit: The expression of using state.create_checkpoint in sync mode is different among different operations. Should we use create_checkpoint_sync to keep it consistent and easier to read?
e.g. we are using
self.state.create_checkpoint(operation_update=create_callback_operation) for callback. And using self.state.create_checkpoint(operation_update=start_operation, is_sync=True) for invoke.
They have same semantic though
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description of changes:
Implement double-check pattern across all operation types to handle synchronous checkpoint responses, preventing invalid state transitions and unnecessary suspensions.
Bug fix: Callback operations now defer errors to Callback.result() instead of raising immediately in create_callback(), ensuring deterministic replay when code executes between callback creation and result retrieval.
Changes:
Affected modules:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.