diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go
index c67d4ece1..3743c3dbc 100644
--- a/internal/internal_task_handlers.go
+++ b/internal/internal_task_handlers.go
@@ -217,7 +217,6 @@ func (eh *history) IsReplayEvent(event *historypb.HistoryEvent) bool {
 }
 
 func (eh *history) IsNextWorkflowTaskFailed() (isFailed bool, binaryChecksum string, err error) {
-
 	nextIndex := eh.currentIndex + 1
 	if nextIndex >= len(eh.loadedEvents) && eh.hasMoreEvents() { // current page ends and there is more pages
 		if err := eh.loadMoreEvents(); err != nil {
@@ -667,6 +666,19 @@ func (w *workflowExecutionContextImpl) resetStateIfDestroyed(task *workflowservi
 				return err
 			}
 		}
+		if w.workflowInfo != nil {
+			// Reset the search attributes and memo from the WorkflowExecutionStartedEvent.
+			// The search attributes and memo may have been modified by calls like UpsertMemo
+			// or UpsertSearchAttributes. They must be reset to avoid non-determinism on replay.
+			h := task.History
+			startedEvent := h.Events[0]
+			attributes := startedEvent.GetWorkflowExecutionStartedEventAttributes()
+			if attributes == nil {
+				return errors.New("first history event is not WorkflowExecutionStarted")
+			}
+			w.workflowInfo.SearchAttributes = attributes.SearchAttributes
+			w.workflowInfo.Memo = attributes.Memo
+		}
 	}
 	return nil
 }
@@ -1156,8 +1168,8 @@ func (w *workflowExecutionContextImpl) ResetIfStale(task *workflowservice.PollWo
 			tagCachedPreviousStartedEventID, w.previousStartedEventID,
 			tagTaskFirstEventID, task.History.Events[0].GetEventId(),
 			tagTaskStartedEventID, task.GetStartedEventId(),
-			tagPreviousStartedEventID, task.GetPreviousStartedEventId())
-
+			tagPreviousStartedEventID, task.GetPreviousStartedEventId(),
+		)
 		w.clearState()
 		return w.resetStateIfDestroyed(task, historyIterator)
 	}
@@ -1185,7 +1197,7 @@ func skipDeterministicCheckForEvent(e *historypb.HistoryEvent) bool {
 		if markerName == versionMarkerName || markerName == mutableSideEffectMarkerName {
 			return true
 		}
-	//case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED:
+	// case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED:
 	//	return true
 	}
 	return false
@@ -1501,8 +1513,8 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
 	task *workflowservice.PollWorkflowTaskQueueResponse,
 	workflowContext *workflowExecutionContextImpl,
 	commands []*commandpb.Command,
-	forceNewWorkflowTask bool) interface{} {
-
+	forceNewWorkflowTask bool,
+) interface{} {
 	// for query task
 	if task.Query != nil {
 		queryCompletedRequest := &workflowservice.RespondQueryTaskCompletedRequest{
@@ -1613,8 +1625,8 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
 }
 
 func errorToFailWorkflowTask(taskToken []byte, err error, identity string, dataConverter converter.DataConverter,
-	failureConverter converter.FailureConverter, namespace string) *workflowservice.RespondWorkflowTaskFailedRequest {
-
+	failureConverter converter.FailureConverter, namespace string,
+) *workflowservice.RespondWorkflowTaskFailedRequest {
 	cause := enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE
 	// If it was a panic due to a bad state machine or if it was a history
 	// mismatch error, mark as non-deterministic
@@ -2002,7 +2014,8 @@ func createNewCommand(commandType enumspb.CommandType) *commandpb.Command {
 }
 
 func recordActivityHeartbeat(ctx context.Context, service workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler,
-	identity string, taskToken []byte, details *commonpb.Payloads) error {
+	identity string, taskToken []byte, details *commonpb.Payloads,
+) error {
 	namespace := getNamespaceFromActivityCtx(ctx)
 	request := &workflowservice.RecordActivityTaskHeartbeatRequest{
 		TaskToken: taskToken,
@@ -2025,14 +2038,16 @@ func recordActivityHeartbeat(ctx context.Context, service workflowservice.Workfl
 }
 
 func recordActivityHeartbeatByID(ctx context.Context, service workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler,
-	identity, namespace, workflowID, runID, activityID string, details *commonpb.Payloads) error {
+	identity, namespace, workflowID, runID, activityID string, details *commonpb.Payloads,
+) error {
 	request := &workflowservice.RecordActivityTaskHeartbeatByIdRequest{
 		Namespace:  namespace,
 		WorkflowId: workflowID,
 		RunId:      runID,
 		ActivityId: activityID,
 		Details:    details,
-		Identity:   identity}
+		Identity:   identity,
+	}
 
 	var heartbeatResponse *workflowservice.RecordActivityTaskHeartbeatByIdResponse
 	grpcCtx, cancel := newGRPCContext(ctx,
diff --git a/test/integration_test.go b/test/integration_test.go
index f3023904b..ff56157c6 100644
--- a/test/integration_test.go
+++ b/test/integration_test.go
@@ -2324,6 +2324,68 @@ func (ts *IntegrationTestSuite) testNonDeterminismFailureCause(historyMismatch b
 	ts.Equal(enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR, taskFailed.Cause)
 }
 
+func (ts *IntegrationTestSuite) TestDeterminismUpsertSearchAttributesConditional() {
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	maxTicks := 3
+	options := ts.startWorkflowOptions("test-determinism-upsert-search-attributes-conditional-" + uuid.New())
+	options.SearchAttributes = map[string]interface{}{
+		"CustomKeywordField": "unset",
+	}
+	run, err := ts.client.ExecuteWorkflow(
+		ctx,
+		options,
+		ts.workflows.UpsertSearchAttributesConditional,
+		maxTicks,
+	)
+	ts.NoError(err)
+
+	ts.testStaleCacheReplayDeterminism(ctx, run, maxTicks)
+}
+
+func (ts *IntegrationTestSuite) TestDeterminismUpsertMemoConditional() {
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	maxTicks := 3
+	options := ts.startWorkflowOptions("test-determinism-upsert-memo-conditional-" + uuid.New())
+	options.Memo = map[string]interface{}{
+		"TestMemo": "unset",
+	}
+	run, err := ts.client.ExecuteWorkflow(
+		ctx,
+		options,
+		ts.workflows.UpsertMemoConditional,
+		maxTicks,
+	)
+	ts.NoError(err)
+
+	ts.testStaleCacheReplayDeterminism(ctx, run, maxTicks)
+}
+
+func (ts *IntegrationTestSuite) testStaleCacheReplayDeterminism(ctx context.Context, run client.WorkflowRun, maxTicks int) {
+	// clean up if test fails
+	defer func() { _ = ts.client.TerminateWorkflow(ctx, run.GetID(), run.GetRunID(), "", nil) }()
+	ts.waitForQueryTrue(run, "is-wait-tick-count", 1)
+
+	ts.workerStopped = true
+	currentWorker := ts.worker
+	currentWorker.Stop()
+	for i := 0; i < maxTicks-1; i++ {
+		func() {
+			ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "tick", nil))
+			currentWorker = worker.New(ts.client, ts.taskQueueName, worker.Options{})
+			defer currentWorker.Stop()
+			ts.registerWorkflowsAndActivities(currentWorker)
+			ts.NoError(currentWorker.Start())
+			ts.waitForQueryTrue(run, "is-wait-tick-count", 2+i)
+		}()
+	}
+	err := run.Get(ctx, nil)
+	ts.NoError(err)
+}
+
 func (ts *IntegrationTestSuite) TestClientGetNotFollowingRuns() {
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
diff --git a/test/workflow_test.go b/test/workflow_test.go
index 624f2adba..4f455d8a4 100644
--- a/test/workflow_test.go
+++ b/test/workflow_test.go
@@ -28,6 +28,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"log"
 	"math/rand"
 	"reflect"
 	"strconv"
@@ -78,7 +79,6 @@ func (w *Workflows) Deadlocked(ctx workflow.Context) ([]string, error) {
 var isDeadlockedWithLocalActivityFirstAttempt bool = true
 
 func (w *Workflows) DeadlockedWithLocalActivity(ctx workflow.Context) ([]string, error) {
-
 	laCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
 		ScheduleToCloseTimeout: 5 * time.Second,
 	})
@@ -364,8 +364,8 @@ func (w *Workflows) IDReusePolicy(
 	childWFID string,
 	policy enumspb.WorkflowIdReusePolicy,
 	parallel bool,
-	failFirstChild bool) (string, error) {
-
+	failFirstChild bool,
+) (string, error) {
 	ctx = workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
 		WorkflowID:               childWFID,
 		WorkflowExecutionTimeout: 9 * time.Second,
@@ -486,6 +486,7 @@ func (w *Workflows) ChildWorkflowSuccessWithParentClosePolicyAbandon(ctx workflo
 	err = ft.GetChildWorkflowExecution().Get(ctx, &childWE)
 	return childWE.ID, err
 }
+
 func (w *Workflows) childWorkflowWaitOnSignal(ctx workflow.Context) error {
 	workflow.GetSignalChannel(ctx, "unblock").Receive(ctx, nil)
 	return nil
@@ -766,7 +767,6 @@ func (w *Workflows) LargeQueryResultWorkflow(ctx workflow.Context) (string, erro
 		rand.Read(result)
 		return result, nil
 	})
-
 	if err != nil {
 		return "", errors.New("failed to register query handler")
 	}
@@ -1794,6 +1794,93 @@ func (w *Workflows) ForcedNonDeterminism(ctx workflow.Context, sameCommandButDif
 	return
 }
 
+func (w *Workflows) UpsertSearchAttributesConditional(ctx workflow.Context, maxTicks int) error {
+	var waitTickCount int
+	tickCh := workflow.GetSignalChannel(ctx, "tick")
+	err := workflow.SetQueryHandler(
+		ctx,
+		"is-wait-tick-count",
+		func(v int) (bool, error) { return waitTickCount == v, nil },
+	)
+	if err != nil {
+		return err
+	}
+	currentPayload, exists := workflow.GetInfo(ctx).SearchAttributes.GetIndexedFields()["CustomKeywordField"]
+	if !exists {
+		return errors.New("search attribute not present")
+	}
+	var searchAttr string
+	err = converter.GetDefaultDataConverter().FromPayload(currentPayload, &searchAttr)
+	log.Printf("Search attribute: %s. Replaying? %v.", searchAttr, workflow.IsReplaying(ctx))
+	if err != nil {
+		return errors.New("error when getting search attribute")
+	}
+	// Search attribute should always be "unset".
+	if searchAttr == "set" {
+		err = workflow.Sleep(ctx, 100*time.Millisecond)
+	} else if searchAttr == "unset" {
+		err = workflow.UpsertSearchAttributes(ctx, map[string]interface{}{"CustomKeywordField": "set"})
+	} else {
+		return errors.New("unknown search attribute value")
+	}
+	if err != nil {
+		return err
+	}
+	// Now just wait for signals over and over
+	for {
+		waitTickCount++
+		if waitTickCount >= maxTicks {
+			return nil
+		}
+		tickCh.Receive(ctx, nil)
+		log.Printf("Signal received (replaying? %v)", workflow.IsReplaying(ctx))
+	}
+}
+
+func (w *Workflows) UpsertMemoConditional(ctx workflow.Context, maxTicks int) error {
+	var waitTickCount int
+	tickCh := workflow.GetSignalChannel(ctx, "tick")
+	err := workflow.SetQueryHandler(
+		ctx,
+		"is-wait-tick-count",
+		func(v int) (bool, error) { return waitTickCount == v, nil },
+	)
+	if err != nil {
+		return err
+	}
+	// Get current memo value
+	currentPayload, ok := workflow.GetInfo(ctx).Memo.GetFields()["TestMemo"]
+	if !ok {
+		return errors.New("no memo value")
+	}
+	var memoValue string
+	err = converter.GetDefaultDataConverter().FromPayload(currentPayload, &memoValue)
+	if err != nil {
+		return err
+	}
+	// Memo should always be "unset".
+	log.Printf("Memo value: %s. Replaying? %v.", memoValue, workflow.IsReplaying(ctx))
+	if memoValue == "set" {
+		err = workflow.Sleep(ctx, 100*time.Millisecond)
+	} else if memoValue == "unset" {
+		err = workflow.UpsertMemo(ctx, map[string]interface{}{"TestMemo": "set"})
+	} else {
+		return errors.New("unknown memo value")
+	}
+	if err != nil {
+		return err
+	}
+	// Now just wait for signals over and over
+	for {
+		waitTickCount++
+		if waitTickCount >= maxTicks {
+			return nil
+		}
+		tickCh.Receive(ctx, nil)
+		log.Printf("Signal received (replaying? %v)", workflow.IsReplaying(ctx))
+	}
+}
+
 func (w *Workflows) MutableSideEffect(ctx workflow.Context, startVal int) (currVal int, err error) {
 	// Make some mutable side effect calls with timers in between
 	sideEffector := func(retVal int) (newVal int, err error) {
@@ -1860,7 +1947,7 @@ func (w *Workflows) HeartbeatSpecificCount(ctx workflow.Context, interval time.D
 func (w *Workflows) UpsertMemo(ctx workflow.Context, memo map[string]interface{}) (*commonpb.Memo, error) {
 	err := workflow.UpsertMemo(ctx, memo)
 	if err != nil {
-		return nil, err;
+		return nil, err
 	}
 	return workflow.GetInfo(ctx).Memo, nil
 }
@@ -1900,6 +1987,8 @@ func (w *Workflows) register(worker worker.Worker) {
 	worker.RegisterWorkflow(w.ConsistentQueryWorkflow)
 	worker.RegisterWorkflow(w.ContextPropagator)
 	worker.RegisterWorkflow(w.ContinueAsNew)
+	worker.RegisterWorkflow(w.UpsertSearchAttributesConditional)
+	worker.RegisterWorkflow(w.UpsertMemoConditional)
 	worker.RegisterWorkflow(w.ContinueAsNewWithOptions)
 	worker.RegisterWorkflow(w.IDReusePolicy)
 	worker.RegisterWorkflow(w.InspectActivityInfo)