From 34ef0a7d426e58ae09544098f3b7e1503173b21f Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 27 Feb 2024 08:27:04 -0800 Subject: [PATCH] Only set SDKPriorityUpdateHandling if workflow update is being used (#1398) Only set update flag on update --- internal/internal_event_handlers.go | 1 - internal/internal_task_handlers.go | 12 ++- internal/internal_task_pollers.go | 2 + internal/internal_workflow.go | 1 - internal/internal_workflow_client.go | 2 +- test/integration_test.go | 152 ++++++++++++++++++++++++++- test/workflow_test.go | 105 +++++++++++++++++- 7 files changed, 267 insertions(+), 8 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 3d3cb5bc3..91e87cfa5 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -1360,7 +1360,6 @@ func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionStarted( // replay sees the _final_ value of applied flags, not intermediate values // as the value varies by WFT) weh.sdkFlags.tryUse(SDKFlagProtocolMessageCommand, !weh.isReplay) - weh.sdkFlags.tryUse(SDKPriorityUpdateHandling, !weh.isReplay) // Invoke the workflow. weh.workflowDefinition.Execute(weh, attributes.Header, attributes.Input) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 9c4a09575..1357f5c12 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -191,6 +191,10 @@ type ( message string } + unknownSdkFlagError struct { + message string + } + preparedTask struct { events []*historypb.HistoryEvent markers []*historypb.HistoryEvent @@ -239,6 +243,10 @@ func (h historyMismatchError) Error() string { return h.message } +func (s unknownSdkFlagError) Error() string { + return s.message +} + // Get workflow start event. func (eh *history) GetWorkflowStartedEvent() (*historypb.HistoryEvent, error) { events := eh.workflowTask.task.History.Events @@ -278,7 +286,9 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) { f := sdkFlagFromUint(flag) if !f.isValid() { // If a flag is not recognized (value is too high or not defined), it must fail the workflow task - return finishedTask{}, errors.New("could not recognize SDK flag") + return finishedTask{}, unknownSdkFlagError{ + message: fmt.Sprintf("unknown SDK flag: %d", flag), + } } flags = append(flags, f) } diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 07b368867..2fb68d2df 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -494,6 +494,8 @@ func (wtp *workflowTaskPoller) errorToFailWorkflowTask(taskToken []byte, err err } } else if _, mismatch := err.(historyMismatchError); mismatch { cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR + } else if _, unknown := err.(unknownSdkFlagError); unknown { + cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR } builtRequest := &workflowservice.RespondWorkflowTaskFailedRequest{ diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 7d6cd2189..42335ca2f 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -525,7 +525,6 @@ func (d *syncWorkflowDefinition) Execute(env WorkflowEnvironment, header *common state.yield("yield before executing to setup state") state.unblocked() - // TODO: @shreyassrivatsan - add workflow trace span here r.workflowResult, r.error = d.workflow.Execute(d.rootCtx, input) rpp := getWorkflowResultPointerPointer(ctx) *rpp = r diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 2afa5dcd7..a4d2ac45e 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -787,7 +787,7 @@ type UpdateWorkflowWithOptionsRequest struct { // WorkflowUpdateHandle is a handle to a workflow execution update process. The // update may or may not have completed so an instance of this type functions -// simlar to a Future with respect to the outcome of the update. If the update +// similar to a Future with respect to the outcome of the update. If the update // is rejected or returns an error, the Get function on this type will return // that error through the output valuePtr. // NOTE: Experimental diff --git a/test/integration_test.go b/test/integration_test.go index a1230137f..179509ea4 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -50,6 +50,7 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" + updatepb "go.temporal.io/api/update/v1" workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" "go.uber.org/goleak" @@ -1460,7 +1461,7 @@ func (ts *IntegrationTestSuite) TestUpdateValidatorRejected() { run, err := ts.client.ExecuteWorkflow(ctx, wfOptions, ts.workflows.UpdateWithValidatorWorkflow) ts.Nil(err) - _, err = ts.client.QueryWorkflow(ctx, run.GetID(), run.GetRunID(), "__stack_trace") + _, err = ts.client.QueryWorkflow(ctx, run.GetID(), run.GetRunID(), client.QueryTypeStackTrace) ts.NoError(err) // Send a bad update request that will get rejected handler, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update", "") @@ -1472,6 +1473,37 @@ func (ts *IntegrationTestSuite) TestUpdateValidatorRejected() { ts.NoError(run.Get(ctx, nil)) } +func (ts *IntegrationTestSuite) TestUpdateWorkflowCancelled() { + ctx := context.Background() + wfOptions := ts.startWorkflowOptions("test-update-workflow-cancelled") + run, err := ts.client.ExecuteWorkflow(ctx, + wfOptions, ts.workflows.UpdateCancelableWorkflow) + ts.Nil(err) + + // Send a few updates to the workflow + handles := make([]client.WorkflowUpdateHandle, 0, 5) + for i := 0; i < 5; i++ { + handler, err := ts.client.UpdateWorkflowWithOptions(ctx, &client.UpdateWorkflowWithOptionsRequest{ + UpdateID: fmt.Sprintf("test-update-%d", i), + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + UpdateName: "update", + WaitPolicy: &updatepb.WaitPolicy{ + LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED, + }, + }) + ts.NoError(err) + handles = append(handles, handler) + } + // All updates should complete with a cancellation error + ts.NoError(ts.client.CancelWorkflow(ctx, run.GetID(), run.GetRunID())) + for _, handle := range handles { + err = handle.Get(ctx, nil) + ts.NotNil(err.(*temporal.CanceledError)) + } + ts.NoError(run.Get(ctx, nil)) +} + func (ts *IntegrationTestSuite) TestBasicSession() { var expected []string err := ts.executeWorkflow("test-basic-session", ts.workflows.BasicSession, &expected) @@ -2620,6 +2652,74 @@ func (ts *IntegrationTestSuite) TestWaitOnUpdate() { ts.Equal(1, result) } +func (ts *IntegrationTestSuite) TestUpdateHandlerRegisteredLate() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-update-handler-registered-late") + run, err := ts.client.ExecuteWorkflow(ctx, + options, + ts.workflows.UpdateHandlerRegisteredLate) + ts.NoError(err) + // Wait for the workflow to be blocked + ts.waitForQueryTrue(run, "state", 0) + // Send an update before the handler is registered + updateHandle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update") + ts.NoError(err) + ts.Error(updateHandle.Get(ctx, nil)) + // Unblock the workflow so it can register the handler + ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "unblock", nil) + // Send an update after the handler is registered + updateHandle, err = ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update") + ts.NoError(err) + ts.NoError(updateHandle.Get(ctx, nil)) + // Unblock the workflow so it can complete + ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "unblock", nil) + // Get the result + var result int + ts.NoError(run.Get(ctx, &result)) + ts.Equal(1, result) +} + +func (ts *IntegrationTestSuite) TestUpdateSDKFlag() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-update-SDK-flag") + run, err := ts.client.ExecuteWorkflow(ctx, + options, + ts.workflows.UpdateHandlerRegisteredLate) + ts.NoError(err) + // Wait for the workflow to be blocked + ts.waitForQueryTrue(run, "state", 0) + // Unblock the workflow so it can register the handler + ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "unblock", nil) + // Send an update after the handler is registered + updateHandle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update") + ts.NoError(err) + ts.NoError(updateHandle.Get(ctx, nil)) + // Unblock the workflow so it can complete + ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "unblock", nil) + // Get the result + var result int + ts.NoError(run.Get(ctx, &result)) + ts.Equal(1, result) + // Now test the SDK flag + iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + flagsSet := make([][]uint32, 0) + for iter.HasNext() { + event, err := iter.Next() + ts.NoError(err) + taskCompleted := event.GetWorkflowTaskCompletedEventAttributes() + if taskCompleted != nil { + flagsSet = append(flagsSet, taskCompleted.GetSdkMetadata().GetLangUsedFlags()) + } + } + priorityUpdateHandlingFlag := 4 + // The first workflow task should not have the flag set + ts.NotContains(flagsSet[0], priorityUpdateHandlingFlag) + // The second workflow task should have the flag set + ts.NotContains(flagsSet[1], priorityUpdateHandlingFlag) +} + func (ts *IntegrationTestSuite) TestUpdateOrdering() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -2715,6 +2815,56 @@ func (ts *IntegrationTestSuite) TestUpdateAlwaysHandled() { ts.Equal(1, result) } +func (ts *IntegrationTestSuite) TestUpdateRejected() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-update-rejected") + options.StartDelay = time.Hour + run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.UpdateRejectedWithOtherGoRoutine) + ts.NoError(err) + // Send an update we expect to be rejected before the first workflow task + handle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update") + ts.NoError(err) + ts.Error(handle.Get(ctx, nil)) + ts.NoError(run.Get(ctx, nil)) +} + +func (ts *IntegrationTestSuite) TestUpdateSettingHandlerInGoroutine() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-update-setting-handler-in-goroutine") + options.StartDelay = time.Hour + run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.UpdateSettingHandlerInGoroutine) + ts.NoError(err) + // Send an update handler in a workflow goroutine, this should be accepted + handle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update") + ts.NoError(err) + ts.NoError(handle.Get(ctx, nil)) + ts.NoError(run.Get(ctx, nil)) +} + +func (ts *IntegrationTestSuite) TestUpdateSettingHandlerInHandler() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-update-setting-handler-in-handler") + options.StartDelay = time.Hour + run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.UpdateSettingHandlerInHandler) + ts.NoError(err) + // Expect this to fail because the handler is not set yet + handle, err := ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "inner update") + ts.NoError(err) + ts.Error(handle.Get(ctx, nil)) + // Send an update that should register a new handler for "inner update" + handle, err = ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "update") + ts.NoError(err) + ts.NoError(handle.Get(ctx, nil)) + // Expect this to succeed because the handler is set now + handle, err = ts.client.UpdateWorkflow(ctx, run.GetID(), run.GetRunID(), "inner update") + ts.NoError(err) + ts.NoError(handle.Get(ctx, nil)) + ts.NoError(run.Get(ctx, nil)) +} + func (ts *IntegrationTestSuite) TestSessionOnWorkerFailure() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() diff --git a/test/workflow_test.go b/test/workflow_test.go index a8763cbda..5888ef769 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -304,6 +304,17 @@ func (w *Workflows) ActivityRetryOnHBTimeout(ctx workflow.Context) ([]string, er return []string{"heartbeatAndSleep", "heartbeatAndSleep", "heartbeatAndSleep"}, nil } +func (w *Workflows) UpdateCancelableWorkflow(ctx workflow.Context) error { + err := workflow.SetUpdateHandler(ctx, "update", func(ctx workflow.Context) error { + return workflow.Sleep(ctx, time.Hour) + }) + if err != nil { + return errors.New("failed to register update handler") + } + ctx.Done().Receive(ctx, nil) + return nil +} + func (w *Workflows) UpdateInfoWorkflow(ctx workflow.Context) error { err := workflow.SetUpdateHandlerWithOptions(ctx, "update", func(ctx workflow.Context) (string, error) { return workflow.GetUpdateInfo(ctx).ID, nil @@ -2393,27 +2404,31 @@ func (w *Workflows) WaitOnUpdate(ctx workflow.Context) (int, error) { updatesRan := 0 sleepHandle := func(ctx workflow.Context) error { inflightUpdates++ + defer func() { + inflightUpdates-- + }() updatesRan++ err := workflow.Sleep(ctx, time.Second) - inflightUpdates-- return err } echoHandle := func(ctx workflow.Context) error { inflightUpdates++ + defer func() { + inflightUpdates-- + }() updatesRan++ ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) var a Activities err := workflow.ExecuteActivity(ctx, a.Echo, 1, 1).Get(ctx, nil) - inflightUpdates-- return err } emptyHandle := func(ctx workflow.Context) error { inflightUpdates++ - updatesRan++ defer func() { inflightUpdates-- }() + updatesRan++ return ctx.Err() } // Register multiple update handles in the first workflow task to make sure we process an @@ -2438,6 +2453,46 @@ func (w *Workflows) UpdateSetHandlerOnly(ctx workflow.Context) (int, error) { return updatesRan, nil } +func (w *Workflows) UpdateHandlerRegisteredLate(ctx workflow.Context) (int, error) { + updatesRan := 0 + state := 0 + workflow.SetQueryHandler(ctx, "state", func(expectedState int) (bool, error) { + return state == expectedState, nil + }) + workflow.GetSignalChannel(ctx, "unblock").Receive(ctx, nil) + state++ + updateHandle := func(ctx workflow.Context) error { + updatesRan++ + return nil + } + workflow.SetUpdateHandler(ctx, "update", updateHandle) + workflow.GetSignalChannel(ctx, "unblock").Receive(ctx, nil) + state++ + return updatesRan, nil +} + +func (w *Workflows) UpdateRejectedWithOtherGoRoutine(ctx workflow.Context) error { + unblock := false + workflow.Go(ctx, func(ctx workflow.Context) { + workflow.Sleep(ctx, 100*time.Millisecond) + unblock = true + }) + + workflow.SetUpdateHandlerWithOptions(ctx, "update", + func(ctx workflow.Context) error { + return nil + }, workflow.UpdateHandlerOptions{ + Validator: func(ctx workflow.Context) error { + // Always reject the update + return errors.New("update rejected") + }, + }) + workflow.Await(ctx, func() bool { + return unblock + }) + return nil +} + func (w *Workflows) UpdateOrdering(ctx workflow.Context) (int, error) { updatesRan := 0 updateHandle := func(ctx workflow.Context) error { @@ -2455,6 +2510,45 @@ func (w *Workflows) UpdateOrdering(ctx workflow.Context) (int, error) { return updatesRan, nil } +func (w *Workflows) UpdateSettingHandlerInGoroutine(ctx workflow.Context) (int, error) { + updatesRan := 0 + inflightUpdates := 0 + wg := workflow.NewWaitGroup(ctx) + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + defer wg.Done() + workflow.SetUpdateHandler(ctx, "update", func(ctx workflow.Context) error { + updatesRan++ + inflightUpdates++ + defer func() { + inflightUpdates-- + }() + return workflow.Sleep(ctx, 100*time.Millisecond) + }) + }) + wg.Wait(ctx) + // Wait for no inflight updates + workflow.Await(ctx, func() bool { + return inflightUpdates == 0 + }) + return updatesRan, nil +} + +func (w *Workflows) UpdateSettingHandlerInHandler(ctx workflow.Context) (int, error) { + innerUpdatesRan := 0 + workflow.SetUpdateHandler(ctx, "update", func(ctx workflow.Context) error { + return workflow.SetUpdateHandler(ctx, "inner update", func(ctx workflow.Context) error { + innerUpdatesRan++ + return nil + }) + }) + currentTime := workflow.Now(ctx) + workflow.Await(ctx, func() bool { + return workflow.Now(ctx).After(currentTime) + }) + return innerUpdatesRan, nil +} + var forcedNonDeterminismCounter int func (w *Workflows) ForcedNonDeterminism(ctx workflow.Context, sameCommandButDiffName bool) (err error) { @@ -2880,6 +2974,11 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.VersionLoopWorkflow) worker.RegisterWorkflow(w.RaceOnCacheEviction) worker.RegisterWorkflow(w.UpdateWithValidatorWorkflow) + worker.RegisterWorkflow(w.UpdateRejectedWithOtherGoRoutine) + worker.RegisterWorkflow(w.UpdateSettingHandlerInGoroutine) + worker.RegisterWorkflow(w.UpdateSettingHandlerInHandler) + worker.RegisterWorkflow(w.UpdateCancelableWorkflow) + worker.RegisterWorkflow(w.UpdateHandlerRegisteredLate) worker.RegisterWorkflow(w.child) worker.RegisterWorkflow(w.childWithRetryPolicy)