Skip to content

Commit

Permalink
Rework cancellation counting (#1181)
Browse files Browse the repository at this point in the history
Rework cancellation counting
  • Loading branch information
Quinn-With-Two-Ns committed Jul 31, 2023
1 parent 6523916 commit 30c8ca2
Show file tree
Hide file tree
Showing 9 changed files with 713 additions and 65 deletions.
87 changes: 23 additions & 64 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ type (
cancelActivityStateMachine struct {
*commandStateMachineBase
attributes *commandpb.RequestCancelActivityTaskCommandAttributes

// The commandsHelper.nextCommandEventIDResetCounter when this command
// incremented commandsHelper.commandsCancelledDuringWFCancellation.
cancelledOnEventIDResetCounter uint64
}

timerCommandStateMachine struct {
Expand All @@ -99,10 +95,6 @@ type (
cancelTimerCommandStateMachine struct {
*commandStateMachineBase
attributes *commandpb.CancelTimerCommandAttributes

// The commandsHelper.nextCommandEventIDResetCounter when this command
// incremented commandsHelper.commandsCancelledDuringWFCancellation.
cancelledOnEventIDResetCounter uint64
}

childWorkflowCommandStateMachine struct {
Expand Down Expand Up @@ -150,18 +142,10 @@ type (
orderedCommands *list.List
commands map[commandID]*list.Element

scheduledEventIDToActivityID map[int64]string
scheduledEventIDToCancellationID map[int64]string
scheduledEventIDToSignalID map[int64]string
versionMarkerLookup map[int64]versionMarker
commandsCancelledDuringWFCancellation int64
workflowExecutionIsCancelling bool

// Incremented every time nextCommandEventID and
// commandsCancelledDuringWFCancellation are reset (i.e. on each new workflow
// task). Overflow won't ever happen in practice, but the way this value is
// compared is safe for wrap-around anyway.
nextCommandEventIDResetCounter uint64
scheduledEventIDToActivityID map[int64]string
scheduledEventIDToCancellationID map[int64]string
scheduledEventIDToSignalID map[int64]string
versionMarkerLookup map[int64]versionMarker
}

// panic when command state machine is in illegal state
Expand Down Expand Up @@ -477,9 +461,6 @@ func (d *commandStateMachineBase) cancel() {
case commandStateCommandSent:
d.moveState(commandStateCancellationCommandSent, eventCancel)
case commandStateInitiated:
if d.helper.workflowExecutionIsCancelling {
d.helper.commandsCancelledDuringWFCancellation++
}
d.moveState(commandStateCanceledAfterInitiated, eventCancel)
default:
d.failStateTransition(eventCancel)
Expand Down Expand Up @@ -589,10 +570,6 @@ func (d *activityCommandStateMachine) cancel() {
}
cancelCmd := d.helper.newCancelActivityStateMachine(attribs)
d.helper.addCommand(cancelCmd)
// We must mark the event ID reset counter for when we performed this
// increment so a potential decrement can only decrement if it wasn't
// reset
cancelCmd.cancelledOnEventIDResetCounter = d.helper.nextCommandEventIDResetCounter
// We also mark the schedule command as not eager if we haven't sent it yet.
// Server behavior differs on eager vs non-eager when scheduling and
// cancelling during the same task completion. If it has not been sent this
Expand All @@ -614,10 +591,6 @@ func (d *timerCommandStateMachine) cancel() {
}
cancelCmd := d.helper.newCancelTimerCommandStateMachine(attribs)
d.helper.addCommand(cancelCmd)
// We must mark the event ID reset counter for when we performed this
// increment so a potential decrement can only decrement if it wasn't
// reset
cancelCmd.cancelledOnEventIDResetCounter = d.helper.nextCommandEventIDResetCounter
}

d.commandStateMachineBase.cancel()
Expand Down Expand Up @@ -729,9 +702,6 @@ func (d *childWorkflowCommandStateMachine) handleCancelFailedEvent() {
func (d *childWorkflowCommandStateMachine) cancel() {
switch d.state {
case commandStateStarted:
if d.helper.workflowExecutionIsCancelling {
d.helper.commandsCancelledDuringWFCancellation++
}
d.moveState(commandStateCanceledAfterStarted, eventCancel)
// A child workflow may be canceled _after_ something like an activity start
// happens inside a simulated goroutine. However, since the state of the
Expand Down Expand Up @@ -888,11 +858,10 @@ func newCommandsHelper() *commandsHelper {
orderedCommands: list.New(),
commands: make(map[commandID]*list.Element),

scheduledEventIDToActivityID: make(map[int64]string),
scheduledEventIDToCancellationID: make(map[int64]string),
scheduledEventIDToSignalID: make(map[int64]string),
versionMarkerLookup: make(map[int64]versionMarker),
commandsCancelledDuringWFCancellation: 0,
scheduledEventIDToActivityID: make(map[int64]string),
scheduledEventIDToCancellationID: make(map[int64]string),
scheduledEventIDToSignalID: make(map[int64]string),
versionMarkerLookup: make(map[int64]versionMarker),
}
}

Expand All @@ -905,13 +874,20 @@ func (h *commandsHelper) setCurrentWorkflowTaskStartedEventID(workflowTaskStarte
// corresponding history event after processing. So we can use workflow task started event id + 2 as the offset as
// workflow task completed event is always the first event in the workflow task followed by events generated from
// commands. This allows client sdk to deterministically predict history event ids generated by processing of the
// command. We must also add the number of cancel commands that were spawned during cancellation of the workflow
// execution as those canceled command events will show up *after* the workflow task completed event.
h.nextCommandEventID = workflowTaskStartedEventID + 2 + h.commandsCancelledDuringWFCancellation
h.commandsCancelledDuringWFCancellation = 0
// We must change the counter here so that others who mutate
// commandsCancelledDuringWFCancellation know it has since been reset
h.nextCommandEventIDResetCounter++
// command. It is possible, notably during workflow cancellation, that commands are generated before the workflow
// task started event is processed. In this case we need to adjust the nextCommandEventID to account for these unsent
// commands.
var uncountedCommands int64
for curr := h.orderedCommands.Front(); curr != nil; {
d := curr.Value.(commandStateMachine)
command := d.getCommand()
if command != nil {
uncountedCommands += 1
}
curr = curr.Next()
}

h.nextCommandEventID = workflowTaskStartedEventID + 2 + uncountedCommands
}

func (h *commandsHelper) getNextID() int64 {
Expand Down Expand Up @@ -974,24 +950,7 @@ func (h *commandsHelper) removeCancelOfResolvedCommand(commandID commandID) {
orderedCmdEl, ok := h.commands[commandID]
if ok {
delete(h.commands, commandID)
command := h.orderedCommands.Remove(orderedCmdEl)
// Sometimes commandsCancelledDuringWFCancellation was incremented before
// it was reset and sometimes not. We make sure the workflow execution is
// actually cancelling since that's the only time we increment the counter
// in the first place. Also, we use the reset counter to see if we're still
// on the same iteration where we may have incremented it before.
if h.workflowExecutionIsCancelling {
switch command := command.(type) {
case *cancelActivityStateMachine:
if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter {
h.commandsCancelledDuringWFCancellation--
}
case *cancelTimerCommandStateMachine:
if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter {
h.commandsCancelledDuringWFCancellation--
}
}
}
_ = h.orderedCommands.Remove(orderedCmdEl)
}
}

Expand Down
1 change: 0 additions & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,6 @@ func validateAndSerializeMemo(memoMap map[string]interface{}, dc converter.DataC

func (wc *workflowEnvironmentImpl) RegisterCancelHandler(handler func()) {
wrappedHandler := func() {
wc.commandsHelper.workflowExecutionIsCancelling = true
handler()
}
wc.cancelHandler = wrappedHandler
Expand Down
17 changes: 17 additions & 0 deletions test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,23 @@ func LocalSleep(_ context.Context, delay time.Duration) error {
return nil
}

// ActivityToBeCanceled passes its own name to a.append and then records a
// heartbeat about once per second until ctx is done. When ctx is done it
// returns a fixed message and a nil error.
func (a *Activities) ActivityToBeCanceled(ctx context.Context) (string, error) {
	a.append("ActivityToBeCanceled")

	// One ticker serves the whole loop; calling time.After inside the select
	// would allocate a fresh timer on every pass and leave the last one pending
	// after the function returns.
	heartbeat := time.NewTicker(1 * time.Second)
	defer heartbeat.Stop()

	for {
		select {
		case <-heartbeat.C:
			activity.RecordHeartbeat(ctx, "")
		case <-ctx.Done():
			return "I am canceled by Done", nil
		}
	}
}

// EmptyActivity passes its own name to a.append and always reports success.
// The context argument is accepted but not used.
func (a *Activities) EmptyActivity(_ context.Context) error {
	a.append("EmptyActivity")
	return nil
}

func (a *Activities) HeartbeatAndSleep(ctx context.Context, seq int, delay time.Duration) (int, error) {
a.append("heartbeatAndSleep")
activity.GetLogger(ctx).Info("Running HeartbeatAndSleep activity")
Expand Down
12 changes: 12 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,18 @@ func (ts *IntegrationTestSuite) TestMutatingUpdateValidator() {
ts.Nil(ts.client.CancelWorkflow(ctx, "test-mutating-update-validator", ""))
}

// TestWaitForCancelWithDisconnectedContext starts
// WaitForCancelWithDisconnectedContextWorkflow, waits on its "timer-created"
// query via waitForQueryTrue, requests cancellation of that run, and asserts
// that fetching the run's result yields no error.
func (ts *IntegrationTestSuite) TestWaitForCancelWithDisconnectedContext() {
	ctx := context.Background()

	options := ts.startWorkflowOptions("test-wait-for-cancel-with-disconnected-contex")
	run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.WaitForCancelWithDisconnectedContextWorkflow)
	ts.Nil(err)

	ts.waitForQueryTrue(run, "timer-created", 1)

	// Cancel the exact run that was started, then wait for it to finish.
	workflowID, runID := run.GetID(), run.GetRunID()
	cancelErr := ts.client.CancelWorkflow(ctx, workflowID, runID)
	ts.Nil(cancelErr)
	ts.Nil(run.Get(ctx, nil))
}

func (ts *IntegrationTestSuite) TestMutatingSideEffect() {
ctx := context.Background()
err := ts.executeWorkflowWithContextAndOption(ctx, ts.startWorkflowOptions("test-mutating-side-effect"), ts.workflows.MutatingSideEffectWorkflow, nil)
Expand Down
Loading

0 comments on commit 30c8ca2

Please sign in to comment.