From 75a589d83340b7c5a904343e03681523acfd8557 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 20 Jan 2021 16:31:46 -0800 Subject: [PATCH] Fix child workflow cancel invalid fsm transition (#344) We just were missing an acceptable transition --- internal/internal_decision_state_machine.go | 7 +++- test/integration_test.go | 41 +++++++++++++++++++++ test/workflow_test.go | 35 ++++++++++++++++++ 3 files changed, 81 insertions(+), 2 deletions(-) diff --git a/internal/internal_decision_state_machine.go b/internal/internal_decision_state_machine.go index 830800568..2d35e5252 100644 --- a/internal/internal_decision_state_machine.go +++ b/internal/internal_decision_state_machine.go @@ -221,8 +221,10 @@ func (d commandState) String() string { return "Completed" case commandStateCanceledBeforeSent: return "CanceledBeforeSent" + case commandStateCancellationCommandAccepted: + return "CancellationCommandAccepted" default: - return "Unknown" + return fmt.Sprintf("Unknown: %d", int32(d)) } } @@ -667,7 +669,8 @@ func (d *childWorkflowCommandStateMachine) handleCanceledEvent() { func (d *childWorkflowCommandStateMachine) handleCompletionEvent() { switch d.state { - case commandStateStarted, commandStateCanceledAfterStarted, commandStateCompletedAfterCancellationCommandSent: + case commandStateStarted, commandStateCanceledAfterStarted, + commandStateCompletedAfterCancellationCommandSent, commandStateCancellationCommandAccepted: d.moveState(commandStateCompleted, eventCompletion) default: d.commandStateMachineBase.handleCompletionEvent() diff --git a/test/integration_test.go b/test/integration_test.go index 5bb3d7db2..1e14211f3 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -600,6 +600,47 @@ func (ts *IntegrationTestSuite) TestCancelChildWorkflow() { ts.EqualValues(expected, ts.activities.invoked()) } +func (ts *IntegrationTestSuite) TestCancelChildWorkflowUnusualTransitions() { + wfid := "test-cancel-child-workflow-unusual-transitions" + run, err := ts.client.ExecuteWorkflow(context.Background(), + ts.startWorkflowOptions(wfid), + ts.workflows.ChildWorkflowCancelUnusualTransitionsRepro) + ts.NoError(err) + + // Give it a sec to populate the query + <-time.After(1 * time.Second) + + v, err := ts.client.QueryWorkflow(context.Background(), run.GetID(), "", "child-workflow-id") + ts.NoError(err) + + var childWorkflowID string + err = v.Get(&childWorkflowID) + ts.NoError(err) + ts.NotNil(childWorkflowID) + ts.NotEmpty(childWorkflowID) + + err = ts.client.CancelWorkflow(context.Background(), childWorkflowID, "") + ts.NoError(err) + + err = ts.client.CancelWorkflow(context.Background(), run.GetID(), "") + ts.NoError(err) + + err = ts.client.SignalWorkflow( + context.Background(), + childWorkflowID, + "", + "unblock", + nil, + ) + ts.NoError(err) + + // Synchronously wait for the workflow completion. Behind the scenes the SDK performs a long poll operation. + // If you need to wait for the workflow completion from another process use + // Client.GetWorkflow API to get an instance of a WorkflowRun. + err = run.Get(context.Background(), nil) + ts.NoError(err) +} + func (ts *IntegrationTestSuite) TestCancelActivityImmediately() { ts.T().Skip(`Currently fails with "PanicError": "unknown command internal.commandID{commandType:0, id:"5"}, possible causes are nondeterministic workflow definition code or incompatible change in the workflow definition`) var expected []string diff --git a/test/workflow_test.go b/test/workflow_test.go index fa547ead4..88713e3d1 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -397,6 +397,39 @@ func (w *Workflows) ChildWorkflowSuccessWithParentClosePolicyAbandon(ctx workflo err = ft.GetChildWorkflowExecution().Get(ctx, &childWE) return childWE.ID, err } +func (w *Workflows) childWorkflowWaitOnSignal(ctx workflow.Context) error { + workflow.GetSignalChannel(ctx, "unblock").Receive(ctx, nil) + return nil +} + +func (w *Workflows) ChildWorkflowCancelUnusualTransitionsRepro(ctx workflow.Context) error { + var childWorkflowID string + err := workflow.SetQueryHandler(ctx, "child-workflow-id", func(input []byte) (string, error) { + return childWorkflowID, nil + }) + if err != nil { + return err + } + + cwo := workflow.ChildWorkflowOptions{WorkflowRunTimeout: time.Second * 2} + ctx = workflow.WithChildOptions(ctx, cwo) + + childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, w.childWorkflowWaitOnSignal) + + var childWorkflowExecution workflow.Execution + err = childWorkflowFuture.GetChildWorkflowExecution().Get(ctx, &childWorkflowExecution) + if err != nil { + return err + } + childWorkflowID = childWorkflowExecution.ID + + var result string + err = childWorkflowFuture.Get(ctx, &result) + if err != nil { + return err + } + return nil +} func (w *Workflows) ActivityCancelRepro(ctx workflow.Context) ([]string, error) { ctx, cancelFunc := workflow.WithCancel(ctx) @@ -1087,6 +1120,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.ChildWorkflowSuccess) worker.RegisterWorkflow(w.ChildWorkflowSuccessWithParentClosePolicyTerminate) worker.RegisterWorkflow(w.ChildWorkflowSuccessWithParentClosePolicyAbandon) + worker.RegisterWorkflow(w.ChildWorkflowCancelUnusualTransitionsRepro) worker.RegisterWorkflow(w.ConsistentQueryWorkflow) worker.RegisterWorkflow(w.ContextPropagator) worker.RegisterWorkflow(w.ContinueAsNew) @@ -1111,6 +1145,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.child) worker.RegisterWorkflow(w.childForMemoAndSearchAttr) + worker.RegisterWorkflow(w.childWorkflowWaitOnSignal) worker.RegisterWorkflow(w.sleep) worker.RegisterWorkflow(w.timer) }