From 2c19379b13b04ab2cf222420b86631b2809f84ca Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Fri, 18 Feb 2022 08:10:17 -0600 Subject: [PATCH] Remove pending activity cancellations when activity completion occurs (#726) --- internal/internal_decision_state_machine.go | 49 +++++++++++++++++++-- internal/version.go | 2 +- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/internal/internal_decision_state_machine.go b/internal/internal_decision_state_machine.go index 96425301d..fcf1a4c8d 100644 --- a/internal/internal_decision_state_machine.go +++ b/internal/internal_decision_state_machine.go @@ -86,6 +86,10 @@ type ( cancelActivityStateMachine struct { *commandStateMachineBase attributes *commandpb.RequestCancelActivityTaskCommandAttributes + + // Value of commandsHelper.nextCommandEventIDResetCounter at the time this + // command incremented commandsHelper.commandsCancelledDuringWFCancellation. + cancelledOnEventIDResetCounter uint64 } timerCommandStateMachine struct { @@ -96,6 +100,10 @@ type ( cancelTimerCommandStateMachine struct { *commandStateMachineBase attributes *commandpb.CancelTimerCommandAttributes + + // Value of commandsHelper.nextCommandEventIDResetCounter at the time this + // command incremented commandsHelper.commandsCancelledDuringWFCancellation. + cancelledOnEventIDResetCounter uint64 } childWorkflowCommandStateMachine struct { @@ -137,6 +145,12 @@ type ( versionMarkerLookup map[int64]string commandsCancelledDuringWFCancellation int64 workflowExecutionIsCancelling bool + + // Incremented every time nextCommandEventID and + // commandsCancelledDuringWFCancellation are reset (i.e. on each new workflow + // task). Overflow won't ever happen in practice, but technically the way this + // value is compared (by equality) is safe for wrap-around.
+ nextCommandEventIDResetCounter uint64 } // panic when command state machine is in illegal state @@ -528,6 +542,10 @@ func (d *activityCommandStateMachine) cancel() { } cancelCmd := d.helper.newCancelActivityStateMachine(attribs) d.helper.addCommand(cancelCmd) + // We must mark the event ID reset counter for when we performed this + // increment so a potential decrement can only decrement if it wasn't + // reset + cancelCmd.cancelledOnEventIDResetCounter = d.helper.nextCommandEventIDResetCounter } d.commandStateMachineBase.cancel() @@ -541,6 +559,10 @@ func (d *timerCommandStateMachine) cancel() { } cancelCmd := d.helper.newCancelTimerCommandStateMachine(attribs) d.helper.addCommand(cancelCmd) + // We must mark the event ID reset counter for when we performed this + // increment so a potential decrement can only decrement if it wasn't + // reset + cancelCmd.cancelledOnEventIDResetCounter = d.helper.nextCommandEventIDResetCounter } d.commandStateMachineBase.cancel() @@ -824,6 +846,9 @@ func (h *commandsHelper) setCurrentWorkflowTaskStartedEventID(workflowTaskStarte // execution as those canceled command events will show up *after* the workflow task completed event. h.nextCommandEventID = workflowTaskStartedEventID + 2 + h.commandsCancelledDuringWFCancellation h.commandsCancelledDuringWFCancellation = 0 + // We must change the counter here so that others who mutate + // commandsCancelledDuringWFCancellation know it has since been reset + h.nextCommandEventIDResetCounter++ } func (h *commandsHelper) getNextID() int64 { @@ -877,14 +902,26 @@ func (h *commandsHelper) addCommand(command commandStateMachine) { // might be in the same workflow task. In practice this only seems to happen during unhandled command events. 
func (h *commandsHelper) removeCancelOfResolvedCommand(commandID commandID) { // Ensure this isn't misused for non-cancel commands - if commandID.commandType != commandTypeCancelTimer { - panic("removeCancelOfResolvedCommand should only be called for cancel timer") + if commandID.commandType != commandTypeCancelTimer && commandID.commandType != commandTypeRequestCancelActivityTask { + panic("removeCancelOfResolvedCommand should only be called for cancel timer / activity") } orderedCmdEl, ok := h.commands[commandID] if ok { delete(h.commands, commandID) - h.orderedCommands.Remove(orderedCmdEl) - h.commandsCancelledDuringWFCancellation-- + command := h.orderedCommands.Remove(orderedCmdEl) + // Sometimes commandsCancelledDuringWFCancellation was incremented before + // it was reset and sometimes not. We use the reset counter to see if we're + // still on the same iteration where we may have incremented it before. + switch command := command.(type) { + case *cancelActivityStateMachine: + if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter { + h.commandsCancelledDuringWFCancellation-- + } + case *cancelTimerCommandStateMachine: + if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter { + h.commandsCancelledDuringWFCancellation-- + } + } } } @@ -916,6 +953,10 @@ func (h *commandsHelper) requestCancelActivityTask(activityID string) commandSta func (h *commandsHelper) handleActivityTaskClosed(activityID string, scheduledEventID int64) commandStateMachine { command := h.getCommand(makeCommandID(commandTypeActivity, activityID)) + // If, for whatever reason, we were going to send an activity cancel request, don't do that anymore + // since we already know the activity is resolved. 
+ possibleCancelID := makeCommandID(commandTypeRequestCancelActivityTask, activityID) + h.removeCancelOfResolvedCommand(possibleCancelID) command.handleCompletionEvent() delete(h.scheduledEventIDToActivityID, scheduledEventID) return command diff --git a/internal/version.go b/internal/version.go index 5da21414b..a224bb336 100644 --- a/internal/version.go +++ b/internal/version.go @@ -30,7 +30,7 @@ package internal const ( // SDKVersion is a semver (https://semver.org/) that represents the version of this Temporal GoSDK. // Server validates if SDKVersion fits its supported range and rejects request if it doesn't. - SDKVersion = "1.13.0" + SDKVersion = "1.13.1" // SupportedServerVersions is a semver rages (https://github.com/blang/semver#ranges) of server versions that // are supported by this Temporal SDK.