Fix ErrStateMachineNotFound handling in HSM state replication #7032

Draft
wants to merge 3 commits into
base: main
17 changes: 13 additions & 4 deletions service/history/ndc/hsm_state_replicator.go
@@ -183,10 +183,19 @@ func (r *HSMStateReplicatorImpl) syncHSMNode(
 	incomingNodePath := incomingNode.Path()
 	currentNode, err := currentHSM.Child(incomingNodePath)
 	if err != nil {
-		// 1. Already done history resend if needed before,
-		// and node creation today always associated with an event
-		// 2. Node deletion is not supported right now.
-		// Based on 1 and 2, node should always be found here.
+		// The node may not be found if:
+		// 1. The state machine was deleted (e.g. terminal state cleanup)
+		// 2. We're missing events that created this node
Member

That's not true based on the comment that you deleted.

> Already done history resend if needed before,
> // and node creation today always associated with an event

I would also clarify that creation and deletion are always associated with an event.

+		if errors.Is(err, hsm.ErrStateMachineNotFound) {
+			// In terminal state, nodes can be deleted
+			// Ignore the error and continue processing other nodes
+			r.logger.Debug("State machine not found - likely deleted in terminal state",
+				tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().NamespaceId),
+				tag.WorkflowID(mutableState.GetExecutionInfo().WorkflowId),
+				tag.WorkflowRunID(mutableState.GetExecutionInfo().OriginalExecutionRunId),
+			)
+			return nil
Member

nit: I'd add a sanity check that the version history in the mutable state is > the one in the request (same as the check on L265, or just return that info from compareVersionHistory), and return an error otherwise.

Contributor Author

@justinp-tt justinp-tt Dec 25, 2024

Don't we already check this in compareVersionHistory? In other words, compareVersionHistory returns an error if the condition you mention holds, so we won't even get to the point of an ErrStateMachineNotFound error.

Member

Hmm, not sure I follow. compareVersionHistory returns an error if the last version history item of the (local) mutable state is < that in the request. The check I mentioned is for > (which is also not the same as the >= checked in compareVersionHistory).

+		}
 		return err
 	}
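
The sanity check requested in the review thread can be sketched in isolation. This is a minimal illustration and not code from this PR: `errStateMachineNotFound` stands in for `hsm.ErrStateMachineNotFound`, while `versionHistoryItem`, `isStrictlyNewer`, and `handleChildLookupError` are hypothetical names. The ordering used here (failover version first, then event ID) is an assumption about how "newer" would be decided; the real comparison lives in compareVersionHistory and may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// errStateMachineNotFound is a hypothetical stand-in for hsm.ErrStateMachineNotFound.
var errStateMachineNotFound = errors.New("state machine not found")

// versionHistoryItem is a simplified, hypothetical version history item.
type versionHistoryItem struct {
	EventID int64
	Version int64
}

// isStrictlyNewer reports whether local is strictly ahead of incoming.
// Assumption: items are ordered by failover version, then by event ID.
func isStrictlyNewer(local, incoming versionHistoryItem) bool {
	if local.Version != incoming.Version {
		return local.Version > incoming.Version
	}
	return local.EventID > incoming.EventID
}

// handleChildLookupError swallows a not-found error only when the local
// history is strictly ahead of the incoming one, which is the only case in
// which a later local event could have deleted the node. Any other error,
// or a missing node without a newer local history, is returned to the caller.
func handleChildLookupError(err error, local, incoming versionHistoryItem) error {
	if err == nil {
		return nil
	}
	// errors.Is also matches when the sentinel has been wrapped with %w.
	if !errors.Is(err, errStateMachineNotFound) {
		return err
	}
	if isStrictlyNewer(local, incoming) {
		return nil
	}
	return fmt.Errorf("node missing but local history is not ahead of incoming: %w", err)
}

func main() {
	lookupErr := fmt.Errorf("child lookup: %w", errStateMachineNotFound)
	local := versionHistoryItem{EventID: 20, Version: 1}

	// Local history is ahead: the missing node is tolerated.
	fmt.Println(handleChildLookupError(lookupErr, local, versionHistoryItem{EventID: 10, Version: 1}))
	// Histories are equal: the missing node is reported as an error.
	fmt.Println(handleChildLookupError(lookupErr, local, versionHistoryItem{EventID: 20, Version: 1}))
}
```

The point of the strict comparison is that an equal history (which a >= check would accept) gives no room for a local deletion event, so a missing node in that case indicates a real inconsistency rather than cleanup.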

41 changes: 41 additions & 0 deletions service/history/ndc/hsm_state_replicator_test.go
@@ -702,6 +702,47 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_IncomingStateNewer_WorkflowClosed(
 	s.NoError(err)
 }
 
+func (s *hsmStateReplicatorSuite) TestSyncHSM_StateMachineNotFound() {
+	persistedState := s.buildWorkflowMutableState()
+	// Remove the child1 state machine so it doesn't exist
+	delete(persistedState.ExecutionInfo.SubStateMachinesByType[s.stateMachineDef.Type()].MachinesById, "child1")
+
+	s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), &persistence.GetWorkflowExecutionRequest{
+		ShardID:     s.mockShard.GetShardID(),
+		NamespaceID: s.workflowKey.NamespaceID,
+		WorkflowID:  s.workflowKey.WorkflowID,
+		RunID:       s.workflowKey.RunID,
+	}).Return(&persistence.GetWorkflowExecutionResponse{
+		State:           persistedState,
+		DBRecordVersion: 777,
+	}, nil).Times(1)
+
+	err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{
+		WorkflowKey:         s.workflowKey,
+		EventVersionHistory: persistedState.ExecutionInfo.VersionHistories.Histories[0],
+		StateMachineNode: &persistencespb.StateMachineNode{
+			Children: map[string]*persistencespb.StateMachineMap{
+				s.stateMachineDef.Type(): {
+					MachinesById: map[string]*persistencespb.StateMachineNode{
+						"child1": {
+							Data: []byte(hsmtest.State3),
+							InitialVersionedTransition: &persistencespb.VersionedTransition{
+								NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion(),
+							},
+							LastUpdateVersionedTransition: &persistencespb.VersionedTransition{
+								NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion() + 100,
+							},
+							TransitionCount: 50,
+						},
+					},
+				},
+			},
+		},
+	})
+
+	s.NoError(err) // Expect no error as we should gracefully handle missing state machines
+}
+
 func (s *hsmStateReplicatorSuite) buildWorkflowMutableState() *persistencespb.WorkflowMutableState {
 
 	info := &persistencespb.WorkflowExecutionInfo{