diff --git a/internal/internal_command_state_machine.go b/internal/internal_command_state_machine.go index d758d8ce3..e1b7cc2a9 100644 --- a/internal/internal_command_state_machine.go +++ b/internal/internal_command_state_machine.go @@ -85,10 +85,6 @@ type ( cancelActivityStateMachine struct { *commandStateMachineBase attributes *commandpb.RequestCancelActivityTaskCommandAttributes - - // The commandsHelper.nextCommandEventIDResetCounter when this command - // incremented commandsHelper.commandsCancelledDuringWFCancellation. - cancelledOnEventIDResetCounter uint64 } timerCommandStateMachine struct { @@ -99,10 +95,6 @@ type ( cancelTimerCommandStateMachine struct { *commandStateMachineBase attributes *commandpb.CancelTimerCommandAttributes - - // The commandsHelper.nextCommandEventIDResetCounter when this command - // incremented commandsHelper.commandsCancelledDuringWFCancellation. - cancelledOnEventIDResetCounter uint64 } childWorkflowCommandStateMachine struct { @@ -150,18 +142,10 @@ type ( orderedCommands *list.List commands map[commandID]*list.Element - scheduledEventIDToActivityID map[int64]string - scheduledEventIDToCancellationID map[int64]string - scheduledEventIDToSignalID map[int64]string - versionMarkerLookup map[int64]versionMarker - commandsCancelledDuringWFCancellation int64 - workflowExecutionIsCancelling bool - - // Incremented everytime nextCommandEventID and - // commandsCancelledDuringWFCancellation is reset (i.e. on new workflow - // task). Won't ever happen, but technically the way this value is compared - // is safe for overflow wrap around. - nextCommandEventIDResetCounter uint64 + scheduledEventIDToActivityID map[int64]string + scheduledEventIDToCancellationID map[int64]string + scheduledEventIDToSignalID map[int64]string + versionMarkerLookup map[int64]versionMarker } // panic when command state machine is in illegal state @@ -477,9 +461,6 @@ func (d *commandStateMachineBase) cancel() { case commandStateCommandSent: d.moveState(commandStateCancellationCommandSent, eventCancel) case commandStateInitiated: - if d.helper.workflowExecutionIsCancelling { - d.helper.commandsCancelledDuringWFCancellation++ - } d.moveState(commandStateCanceledAfterInitiated, eventCancel) default: d.failStateTransition(eventCancel) @@ -589,10 +570,6 @@ func (d *activityCommandStateMachine) cancel() { } cancelCmd := d.helper.newCancelActivityStateMachine(attribs) d.helper.addCommand(cancelCmd) - // We must mark the event ID reset counter for when we performed this - // increment so a potential decrement can only decrement if it wasn't - // reset - cancelCmd.cancelledOnEventIDResetCounter = d.helper.nextCommandEventIDResetCounter // We also mark the schedule command as not eager if we haven't sent it yet. // Server behavior differs on eager vs non-eager when scheduling and // cancelling during the same task completion. If it has not been sent this @@ -614,10 +591,6 @@ func (d *timerCommandStateMachine) cancel() { } cancelCmd := d.helper.newCancelTimerCommandStateMachine(attribs) d.helper.addCommand(cancelCmd) - // We must mark the event ID reset counter for when we performed this - // increment so a potential decrement can only decrement if it wasn't - // reset - cancelCmd.cancelledOnEventIDResetCounter = d.helper.nextCommandEventIDResetCounter } d.commandStateMachineBase.cancel() @@ -729,9 +702,6 @@ func (d *childWorkflowCommandStateMachine) handleCancelFailedEvent() { func (d *childWorkflowCommandStateMachine) cancel() { switch d.state { case commandStateStarted: - if d.helper.workflowExecutionIsCancelling { - d.helper.commandsCancelledDuringWFCancellation++ - } d.moveState(commandStateCanceledAfterStarted, eventCancel) // A child workflow may be canceled _after_ something like an activity start // happens inside a simulated goroutine. However, since the state of the @@ -888,11 +858,10 @@ func newCommandsHelper() *commandsHelper { orderedCommands: list.New(), commands: make(map[commandID]*list.Element), - scheduledEventIDToActivityID: make(map[int64]string), - scheduledEventIDToCancellationID: make(map[int64]string), - scheduledEventIDToSignalID: make(map[int64]string), - versionMarkerLookup: make(map[int64]versionMarker), - commandsCancelledDuringWFCancellation: 0, + scheduledEventIDToActivityID: make(map[int64]string), + scheduledEventIDToCancellationID: make(map[int64]string), + scheduledEventIDToSignalID: make(map[int64]string), + versionMarkerLookup: make(map[int64]versionMarker), } } @@ -905,13 +874,20 @@ func (h *commandsHelper) setCurrentWorkflowTaskStartedEventID(workflowTaskStarte // corresponding history event after processing. So we can use workflow task started event id + 2 as the offset as // workflow task completed event is always the first event in the workflow task followed by events generated from // commands. This allows client sdk to deterministically predict history event ids generated by processing of the - // command. We must also add the number of cancel commands that were spawned during cancellation of the workflow - // execution as those canceled command events will show up *after* the workflow task completed event. - h.nextCommandEventID = workflowTaskStartedEventID + 2 + h.commandsCancelledDuringWFCancellation - h.commandsCancelledDuringWFCancellation = 0 - // We must change the counter here so that others who mutate - // commandsCancelledDuringWFCancellation know it has since been reset - h.nextCommandEventIDResetCounter++ + // command. It is possible, notably during workflow cancellation, that commands are generated before the workflow + // task started event is processed. In this case we need to adjust the nextCommandEventID to account for these unsent + // commands.git + var uncountedCommands int64 + for curr := h.orderedCommands.Front(); curr != nil; { + d := curr.Value.(commandStateMachine) + command := d.getCommand() + if command != nil { + uncountedCommands += 1 + } + curr = curr.Next() + } + + h.nextCommandEventID = workflowTaskStartedEventID + 2 + uncountedCommands } func (h *commandsHelper) getNextID() int64 { @@ -974,24 +950,7 @@ func (h *commandsHelper) removeCancelOfResolvedCommand(commandID commandID) { orderedCmdEl, ok := h.commands[commandID] if ok { delete(h.commands, commandID) - command := h.orderedCommands.Remove(orderedCmdEl) - // Sometimes commandsCancelledDuringWFCancellation was incremented before - // it was reset and sometimes not. We make sure the workflow execution is - // actually cancelling since that's the only time we increment the counter - // in the first place. Also, we use the reset counter to see if we're still - // on the same iteration where we may have incremented it before. - if h.workflowExecutionIsCancelling { - switch command := command.(type) { - case *cancelActivityStateMachine: - if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter { - h.commandsCancelledDuringWFCancellation-- - } - case *cancelTimerCommandStateMachine: - if command.cancelledOnEventIDResetCounter == h.nextCommandEventIDResetCounter { - h.commandsCancelledDuringWFCancellation-- - } - } - } + _ = h.orderedCommands.Remove(orderedCmdEl) } } diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index fc9c92fc4..a9613c1b3 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -494,7 +494,6 @@ func validateAndSerializeMemo(memoMap map[string]interface{}, dc converter.DataC func (wc *workflowEnvironmentImpl) RegisterCancelHandler(handler func()) { wrappedHandler := func() { - wc.commandsHelper.workflowExecutionIsCancelling = true handler() } wc.cancelHandler = wrappedHandler diff --git a/test/activity_test.go b/test/activity_test.go index 5f902c736..2d822ebbf 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -76,6 +76,23 @@ func LocalSleep(_ context.Context, delay time.Duration) error { return nil } +func (a *Activities) ActivityToBeCanceled(ctx context.Context) (string, error) { + a.append("ActivityToBeCanceled") + for { + select { + case <-time.After(1 * time.Second): + activity.RecordHeartbeat(ctx, "") + case <-ctx.Done(): + return "I am canceled by Done", nil + } + } +} + +func (a *Activities) EmptyActivity(ctx context.Context) error { + a.append("EmptyActivity") + return nil +} + func (a *Activities) HeartbeatAndSleep(ctx context.Context, seq int, delay time.Duration) (int, error) { a.append("heartbeatAndSleep") activity.GetLogger(ctx).Info("Running HeartbeatAndSleep activity") diff --git a/test/integration_test.go b/test/integration_test.go index b06d36c7a..29c3af179 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -1205,6 +1205,18 @@ func (ts *IntegrationTestSuite) TestMutatingUpdateValidator() { ts.Nil(ts.client.CancelWorkflow(ctx, "test-mutating-update-validator", "")) } +func (ts *IntegrationTestSuite) TestWaitForCancelWithDisconnectedContext() { + ctx := context.Background() + run, err := ts.client.ExecuteWorkflow(ctx, + ts.startWorkflowOptions("test-wait-for-cancel-with-disconnected-contex"), ts.workflows.WaitForCancelWithDisconnectedContextWorkflow) + ts.Nil(err) + + ts.waitForQueryTrue(run, "timer-created", 1) + + ts.Nil(ts.client.CancelWorkflow(ctx, run.GetID(), run.GetRunID())) + ts.Nil(run.Get(ctx, nil)) +} + func (ts *IntegrationTestSuite) TestMutatingSideEffect() { ctx := context.Background() err := ts.executeWorkflowWithContextAndOption(ctx, ts.startWorkflowOptions("test-mutating-side-effect"), ts.workflows.MutatingSideEffectWorkflow, nil) diff --git a/test/replaytests/replay-tests-cancel-order-timer-resolved.json b/test/replaytests/replay-tests-cancel-order-timer-resolved.json new file mode 100644 index 000000000..1f407e370 --- /dev/null +++ b/test/replaytests/replay-tests-cancel-order-timer-resolved.json @@ -0,0 +1,281 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2023-07-27T13:17:07.829982170Z", + "eventType": "WorkflowExecutionStarted", + "taskId": "1160915", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "CancelOrderSelectWorkflow" + }, + "taskQueue": { + "name": "replay-test", + "kind": "Normal" + }, + "workflowExecutionTimeout": "0s", + "workflowRunTimeout": "0s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "981183a2-7b46-44de-8f70-189b8c7aa8c1", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "firstExecutionRunId": "981183a2-7b46-44de-8f70-189b8c7aa8c1", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "header": { + + } + } + }, + { + "eventId": "2", + "eventTime": "2023-07-27T13:17:07.830006003Z", + "eventType": "WorkflowTaskScheduled", + "taskId": "1160916", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "replay-test", + "kind": "Normal" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2023-07-27T13:17:07.844309045Z", + "eventType": "WorkflowTaskStarted", + "taskId": "1160923", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "1bcfea99-c161-40b5-8a3c-2d698594994a", + "historySizeBytes": "572" + } + }, + { + "eventId": "4", + "eventTime": "2023-07-27T13:17:07.855022837Z", + "eventType": "WorkflowTaskCompleted", + "taskId": "1160927", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "workerVersioningId": { + "workerBuildId": "a6293ecc93d55fa71eb0d9687e852156" + }, + "sdkMetadata": { + "langUsedFlags": [ + 3 + ] + }, + "meteringMetadata": { + + } + } + }, + { + "eventId": "5", + "eventTime": "2023-07-27T13:17:07.855038212Z", + "eventType": "TimerStarted", + "taskId": "1160928", + "timerStartedEventAttributes": { + "timerId": "5", + "startToFireTimeout": "300s", + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "6", + "eventTime": "2023-07-27T13:17:09.854134879Z", + "eventType": "WorkflowExecutionCancelRequested", + "taskId": "1160932", + "workflowExecutionCancelRequestedEventAttributes": { + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@" + } + }, + { + "eventId": "7", + "eventTime": "2023-03-09T07:03:19.936924001Z", + "eventType": "TimerFired", + "taskId": "1054628", + "timerFiredEventAttributes": { + "timerId": "5", + "startedEventId": "5" + } + }, + { + "eventId": "8", + "eventTime": "2023-07-27T13:17:09.854143713Z", + "eventType": "WorkflowTaskScheduled", + "taskId": "1160933", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "Quinn-Klassens-MacBook-Pro.local:c3129203-345c-45c1-ae50-ae9efd5f4c66", + "kind": "Sticky" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "9", + "eventTime": "2023-07-27T13:17:09.867791379Z", + "eventType": "WorkflowTaskStarted", + "taskId": "1160937", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "8", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "9d569699-7b95-4074-bf5f-0574e85f84a3", + "historySizeBytes": "1021" + } + }, + { + "eventId": "10", + "eventTime": "2023-07-27T13:17:09.879198421Z", + "eventType": "WorkflowTaskCompleted", + "taskId": "1160942", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "8", + "startedEventId": "9", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "workerVersioningId": { + "workerBuildId": "a6293ecc93d55fa71eb0d9687e852156" + }, + "sdkMetadata": { + + }, + "meteringMetadata": { + + } + } + }, + { + "eventId": "11", + "eventTime": "2023-07-27T13:17:09.879409629Z", + "eventType": "ActivityTaskScheduled", + "taskId": "1160944", + "activityTaskScheduledEventAttributes": { + "activityId": "11", + "activityType": { + "name": "helloworldActivity" + }, + "taskQueue": { + "name": "replay-test", + "kind": "Normal" + }, + "header": { + + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IndvcmxkIg==" + } + ] + }, + "scheduleToCloseTimeout": "0s", + "scheduleToStartTimeout": "60s", + "startToCloseTimeout": "60s", + "heartbeatTimeout": "20s", + "workflowTaskCompletedEventId": "9", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s" + } + } + }, + { + "eventId": "12", + "eventTime": "2023-07-27T13:17:09.879424171Z", + "eventType": "ActivityTaskStarted", + "taskId": "1160948", + "activityTaskStartedEventAttributes": { + "scheduledEventId": "11", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "358dbaf6-0b82-47bc-86e8-d46ad928db73", + "attempt": 1 + } + }, + { + "eventId": "13", + "eventTime": "2023-07-27T13:17:09.886071254Z", + "eventType": "ActivityTaskCompleted", + "taskId": "1160949", + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkhlbGxvIHdvcmxkISI=" + } + ] + }, + "scheduledEventId": "11", + "startedEventId": "12", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@" + } + }, + { + "eventId": "14", + "eventTime": "2023-07-27T13:17:09.886075379Z", + "eventType": "WorkflowTaskScheduled", + "taskId": "1160950", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "Quinn-Klassens-MacBook-Pro.local:c3129203-345c-45c1-ae50-ae9efd5f4c66", + "kind": "Sticky" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "15", + "eventTime": "2023-07-27T13:17:09.894641588Z", + "eventType": "WorkflowTaskStarted", + "taskId": "1160954", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "14", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "f3805eb0-f98e-47b8-b1b1-128868619b4f", + "historySizeBytes": "1809" + } + }, + { + "eventId": "16", + "eventTime": "2023-07-27T13:17:09.905497754Z", + "eventType": "WorkflowTaskCompleted", + "taskId": "1160958", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "14", + "startedEventId": "15", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "workerVersioningId": { + "workerBuildId": "a6293ecc93d55fa71eb0d9687e852156" + }, + "sdkMetadata": { + + }, + "meteringMetadata": { + + } + } + }, + { + "eventId": "17", + "eventTime": "2023-07-27T13:17:09.905521629Z", + "eventType": "WorkflowExecutionCompleted", + "taskId": "1160959", + "workflowExecutionCompletedEventAttributes": { + "workflowTaskCompletedEventId": "16" + } + } + ] +} \ No newline at end of file diff --git a/test/replaytests/replay-tests-cancel-order.json b/test/replaytests/replay-tests-cancel-order.json new file mode 100644 index 000000000..1b5f174f3 --- /dev/null +++ b/test/replaytests/replay-tests-cancel-order.json @@ -0,0 +1,283 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2023-07-27T13:17:07.829982170Z", + "eventType": "WorkflowExecutionStarted", + "taskId": "1160915", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "CancelOrderSelectWorkflow" + }, + "taskQueue": { + "name": "replay-test", + "kind": "Normal" + }, + "workflowExecutionTimeout": "0s", + "workflowRunTimeout": "0s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "981183a2-7b46-44de-8f70-189b8c7aa8c1", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "firstExecutionRunId": "981183a2-7b46-44de-8f70-189b8c7aa8c1", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "header": { + + } + } + }, + { + "eventId": "2", + "eventTime": "2023-07-27T13:17:07.830006003Z", + "eventType": "WorkflowTaskScheduled", + "taskId": "1160916", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "replay-test", + "kind": "Normal" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2023-07-27T13:17:07.844309045Z", + "eventType": "WorkflowTaskStarted", + "taskId": "1160923", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "1bcfea99-c161-40b5-8a3c-2d698594994a", + "historySizeBytes": "572" + } + }, + { + "eventId": "4", + "eventTime": "2023-07-27T13:17:07.855022837Z", + "eventType": "WorkflowTaskCompleted", + "taskId": "1160927", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "workerVersioningId": { + "workerBuildId": "a6293ecc93d55fa71eb0d9687e852156" + }, + "sdkMetadata": { + "langUsedFlags": [ + 3 + ] + }, + "meteringMetadata": { + + } + } + }, + { + "eventId": "5", + "eventTime": "2023-07-27T13:17:07.855038212Z", + "eventType": "TimerStarted", + "taskId": "1160928", + "timerStartedEventAttributes": { + "timerId": "5", + "startToFireTimeout": "300s", + "workflowTaskCompletedEventId": "4" + } + }, + { + "eventId": "6", + "eventTime": "2023-07-27T13:17:09.854134879Z", + "eventType": "WorkflowExecutionCancelRequested", + "taskId": "1160932", + "workflowExecutionCancelRequestedEventAttributes": { + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@" + } + }, + { + "eventId": "7", + "eventTime": "2023-07-27T13:17:09.854143713Z", + "eventType": "WorkflowTaskScheduled", + "taskId": "1160933", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "Quinn-Klassens-MacBook-Pro.local:c3129203-345c-45c1-ae50-ae9efd5f4c66", + "kind": "Sticky" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "8", + "eventTime": "2023-07-27T13:17:09.867791379Z", + "eventType": "WorkflowTaskStarted", + "taskId": "1160937", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "7", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "9d569699-7b95-4074-bf5f-0574e85f84a3", + "historySizeBytes": "1021" + } + }, + { + "eventId": "9", + "eventTime": "2023-07-27T13:17:09.879198421Z", + "eventType": "WorkflowTaskCompleted", + "taskId": "1160942", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "7", + "startedEventId": "8", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "workerVersioningId": { + "workerBuildId": "a6293ecc93d55fa71eb0d9687e852156" + }, + "sdkMetadata": { + + }, + "meteringMetadata": { + + } + } + }, + { + "eventId": "10", + "eventTime": "2023-07-27T13:17:09.879390796Z", + "eventType": "TimerCanceled", + "taskId": "1160943", + "timerCanceledEventAttributes": { + "timerId": "5", + "startedEventId": "5", + "workflowTaskCompletedEventId": "9", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@" + } + }, + { + "eventId": "11", + "eventTime": "2023-07-27T13:17:09.879409629Z", + "eventType": "ActivityTaskScheduled", + "taskId": "1160944", + "activityTaskScheduledEventAttributes": { + "activityId": "11", + "activityType": { + "name": "helloworldActivity" + }, + "taskQueue": { + "name": "replay-test", + "kind": "Normal" + }, + "header": { + + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IndvcmxkIg==" + } + ] + }, + "scheduleToCloseTimeout": "0s", + "scheduleToStartTimeout": "60s", + "startToCloseTimeout": "60s", + "heartbeatTimeout": "20s", + "workflowTaskCompletedEventId": "9", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s" + } + } + }, + { + "eventId": "12", + "eventTime": "2023-07-27T13:17:09.879424171Z", + "eventType": "ActivityTaskStarted", + "taskId": "1160948", + "activityTaskStartedEventAttributes": { + "scheduledEventId": "11", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "358dbaf6-0b82-47bc-86e8-d46ad928db73", + "attempt": 1 + } + }, + { + "eventId": "13", + "eventTime": "2023-07-27T13:17:09.886071254Z", + "eventType": "ActivityTaskCompleted", + "taskId": "1160949", + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkhlbGxvIHdvcmxkISI=" + } + ] + }, + "scheduledEventId": "11", + "startedEventId": "12", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@" + } + }, + { + "eventId": "14", + "eventTime": "2023-07-27T13:17:09.886075379Z", + "eventType": "WorkflowTaskScheduled", + "taskId": "1160950", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "Quinn-Klassens-MacBook-Pro.local:c3129203-345c-45c1-ae50-ae9efd5f4c66", + "kind": "Sticky" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "15", + "eventTime": "2023-07-27T13:17:09.894641588Z", + "eventType": "WorkflowTaskStarted", + "taskId": "1160954", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "14", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "requestId": "f3805eb0-f98e-47b8-b1b1-128868619b4f", + "historySizeBytes": "1809" + } + }, + { + "eventId": "16", + "eventTime": "2023-07-27T13:17:09.905497754Z", + "eventType": "WorkflowTaskCompleted", + "taskId": "1160958", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "14", + "startedEventId": "15", + "identity": "99286@Quinn-Klassens-MacBook-Pro.local@", + "workerVersioningId": { + "workerBuildId": "a6293ecc93d55fa71eb0d9687e852156" + }, + "sdkMetadata": { + + }, + "meteringMetadata": { + + } + } + }, + { + "eventId": "17", + "eventTime": "2023-07-27T13:17:09.905521629Z", + "eventType": "WorkflowExecutionCompleted", + "taskId": "1160959", + "workflowExecutionCompletedEventAttributes": { + "workflowTaskCompletedEventId": "16" + } + } + ] +} \ No newline at end of file diff --git a/test/replaytests/replay_test.go b/test/replaytests/replay_test.go index d090dd41e..fb61dec3f 100644 --- a/test/replaytests/replay_test.go +++ b/test/replaytests/replay_test.go @@ -367,6 +367,17 @@ func (s *replayTestSuite) TestVersionAndMutableSideEffect() { s.NoError(err) } +func (s *replayTestSuite) TestCancelOrder() { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflow(CancelOrderSelectWorkflow) + + err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "replay-tests-cancel-order.json") + s.NoError(err) + + err = replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "replay-tests-cancel-order-timer-resolved.json") + s.NoError(err) +} + type captureConverter struct { converter.DataConverter toPayloads []interface{} diff --git a/test/replaytests/workflows.go b/test/replaytests/workflows.go index 5516c9aec..89834c30a 100644 --- a/test/replaytests/workflows.go +++ b/test/replaytests/workflows.go @@ -437,3 +437,36 @@ func generateUUID(ctx workflow.Context) (string, error) { return generatedUUID, nil } + +func CancelOrderSelectWorkflow(ctx workflow.Context) error { + timerf := workflow.NewTimer(ctx, 5*time.Minute) + + var err error + disCtx, _ := workflow.NewDisconnectedContext(ctx) + selector := workflow.NewSelector(ctx) + + selector.AddFuture(timerf, func(f workflow.Future) { + err = timerf.Get(ctx, nil) + // do something different on cancel error + if !temporal.IsCanceledError(err) { + _ = workflow.UpsertSearchAttributes(ctx, map[string]interface{}{"CustomKeywordField": "testkey"}) + } else { + var result string + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: time.Second * 20, + } + disCtx = workflow.WithActivityOptions(disCtx, ao) + err = workflow.ExecuteActivity(disCtx, helloworldActivity, "world").Get(ctx, &result) + } + + }) + selector.AddReceive(ctx.Done(), func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, nil) + err = workflow.Sleep(disCtx, 1*time.Second) + + }) + selector.Select(ctx) + return err +} diff --git a/test/workflow_test.go b/test/workflow_test.go index 8319b17f0..48d093e78 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -1575,6 +1575,58 @@ func (w *Workflows) CronWorkflow(ctx workflow.Context) (int, error) { return retme, nil } +func (w *Workflows) WaitForCancelWithDisconnectedContextWorkflow(ctx workflow.Context) (err error) { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Minute, + HeartbeatTimeout: 5 * time.Second, + WaitForCancellation: true, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + var activities *Activities + defer func() { + if !errors.Is(ctx.Err(), workflow.ErrCanceled) { + return + } + // When the Workflow is canceled, it has to get a new disconnected context to execute any Activities + newCtx, _ := workflow.NewDisconnectedContext(ctx) + err = workflow.ExecuteActivity(newCtx, activities.EmptyActivity).Get(newCtx, nil) + }() + + s := workflow.NewSelector(ctx) + + newCtx, _ := workflow.NewDisconnectedContext(ctx) + newCtx, cancel := workflow.WithCancel(newCtx) + + timer1 := workflow.NewTimer(newCtx, 5*time.Minute) + + err = workflow.SetQueryHandler(newCtx, "timer-created", func() (bool, error) { + return true, nil + }) + if err != nil { + return err + } + + s.AddFuture(timer1, func(f workflow.Future) { + err = f.Get(newCtx, nil) + if !errors.Is(ctx.Err(), workflow.ErrCanceled) { + panic("error is not canceled error") + } + }) + + s.AddReceive(ctx.Done(), func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, nil) + cancel() + s.Select(ctx) + }) + + s.Select(ctx) + + var result string + err = workflow.ExecuteActivity(ctx, activities.EmptyActivity).Get(ctx, &result) + return +} + func (w *Workflows) CancelTimerConcurrentWithOtherCommandWorkflow(ctx workflow.Context) (int, error) { ao := workflow.ActivityOptions{ ScheduleToStartTimeout: time.Minute, @@ -2251,6 +2303,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.CancelTimerAfterActivity) worker.RegisterWorkflow(w.CancelTimerViaDeferAfterWFTFailure) worker.RegisterWorkflow(w.CascadingCancellation) + worker.RegisterWorkflow(w.WaitForCancelWithDisconnectedContextWorkflow) worker.RegisterWorkflow(w.ChildWorkflowRetryOnError) worker.RegisterWorkflow(w.ChildWorkflowRetryOnTimeout) worker.RegisterWorkflow(w.ChildWorkflowSuccess)