Skip to content

Commit

Permalink
Fix child workflow cancel invalid fsm transition (#344)
Browse files Browse the repository at this point in the history
We just were missing an acceptable transition
  • Loading branch information
Sushisource committed Jan 21, 2021
1 parent 3e18a77 commit 75a589d
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 2 deletions.
7 changes: 5 additions & 2 deletions internal/internal_decision_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,10 @@ func (d commandState) String() string {
return "Completed"
case commandStateCanceledBeforeSent:
return "CanceledBeforeSent"
case commandStateCancellationCommandAccepted:
return "CancellationCommandAccepted"
default:
return "Unknown"
return fmt.Sprintf("Unknown: %d", int32(d))
}
}

Expand Down Expand Up @@ -667,7 +669,8 @@ func (d *childWorkflowCommandStateMachine) handleCanceledEvent() {

func (d *childWorkflowCommandStateMachine) handleCompletionEvent() {
switch d.state {
case commandStateStarted, commandStateCanceledAfterStarted, commandStateCompletedAfterCancellationCommandSent:
case commandStateStarted, commandStateCanceledAfterStarted,
commandStateCompletedAfterCancellationCommandSent, commandStateCancellationCommandAccepted:
d.moveState(commandStateCompleted, eventCompletion)
default:
d.commandStateMachineBase.handleCompletionEvent()
Expand Down
41 changes: 41 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,47 @@ func (ts *IntegrationTestSuite) TestCancelChildWorkflow() {
ts.EqualValues(expected, ts.activities.invoked())
}

func (ts *IntegrationTestSuite) TestCancelChildWorkflowUnusualTransitions() {
wfid := "test-cancel-child-workflow-unusual-transitions"
run, err := ts.client.ExecuteWorkflow(context.Background(),
ts.startWorkflowOptions(wfid),
ts.workflows.ChildWorkflowCancelUnusualTransitionsRepro)
ts.NoError(err)

// Give it a sec to populate the query
<-time.After(1 * time.Second)

v, err := ts.client.QueryWorkflow(context.Background(), run.GetID(), "", "child-workflow-id")
ts.NoError(err)

var childWorkflowID string
err = v.Get(&childWorkflowID)
ts.NoError(err)
ts.NotNil(childWorkflowID)
ts.NotEmpty(childWorkflowID)

err = ts.client.CancelWorkflow(context.Background(), childWorkflowID, "")
ts.NoError(err)

err = ts.client.CancelWorkflow(context.Background(), run.GetID(), "")
ts.NoError(err)

err = ts.client.SignalWorkflow(
context.Background(),
childWorkflowID,
"",
"unblock",
nil,
)
ts.NoError(err)

// Synchronously wait for the workflow completion. Behind the scenes the SDK performs a long poll operation.
// If you need to wait for the workflow completion from another process use
// Client.GetWorkflow API to get an instance of a WorkflowRun.
err = run.Get(context.Background(), nil)
ts.NoError(err)
}

func (ts *IntegrationTestSuite) TestCancelActivityImmediately() {
ts.T().Skip(`Currently fails with "PanicError": "unknown command internal.commandID{commandType:0, id:"5"}, possible causes are nondeterministic workflow definition code or incompatible change in the workflow definition`)
var expected []string
Expand Down
35 changes: 35 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,39 @@ func (w *Workflows) ChildWorkflowSuccessWithParentClosePolicyAbandon(ctx workflo
err = ft.GetChildWorkflowExecution().Get(ctx, &childWE)
return childWE.ID, err
}
func (w *Workflows) childWorkflowWaitOnSignal(ctx workflow.Context) error {
workflow.GetSignalChannel(ctx, "unblock").Receive(ctx, nil)
return nil
}

func (w *Workflows) ChildWorkflowCancelUnusualTransitionsRepro(ctx workflow.Context) error {
var childWorkflowID string
err := workflow.SetQueryHandler(ctx, "child-workflow-id", func(input []byte) (string, error) {
return childWorkflowID, nil
})
if err != nil {
return err
}

cwo := workflow.ChildWorkflowOptions{WorkflowRunTimeout: time.Second * 2}
ctx = workflow.WithChildOptions(ctx, cwo)

childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, w.childWorkflowWaitOnSignal)

var childWorkflowExecution workflow.Execution
err = childWorkflowFuture.GetChildWorkflowExecution().Get(ctx, &childWorkflowExecution)
if err != nil {
return err
}
childWorkflowID = childWorkflowExecution.ID

var result string
err = childWorkflowFuture.Get(ctx, &result)
if err != nil {
return err
}
return nil
}

func (w *Workflows) ActivityCancelRepro(ctx workflow.Context) ([]string, error) {
ctx, cancelFunc := workflow.WithCancel(ctx)
Expand Down Expand Up @@ -1087,6 +1120,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.ChildWorkflowSuccess)
worker.RegisterWorkflow(w.ChildWorkflowSuccessWithParentClosePolicyTerminate)
worker.RegisterWorkflow(w.ChildWorkflowSuccessWithParentClosePolicyAbandon)
worker.RegisterWorkflow(w.ChildWorkflowCancelUnusualTransitionsRepro)
worker.RegisterWorkflow(w.ConsistentQueryWorkflow)
worker.RegisterWorkflow(w.ContextPropagator)
worker.RegisterWorkflow(w.ContinueAsNew)
Expand All @@ -1111,6 +1145,7 @@ func (w *Workflows) register(worker worker.Worker) {

worker.RegisterWorkflow(w.child)
worker.RegisterWorkflow(w.childForMemoAndSearchAttr)
worker.RegisterWorkflow(w.childWorkflowWaitOnSignal)
worker.RegisterWorkflow(w.sleep)
worker.RegisterWorkflow(w.timer)
}
Expand Down

0 comments on commit 75a589d

Please sign in to comment.