diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index 4c79139121f..b959e5ed0fd 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -177,10 +177,17 @@ type WorkflowExecutionInfo struct { // Stamp represents the "version" of the workflow's internal state. // It increases monotonically when the workflow's options are modified. // It is used to check if a workflow task is still relevant to the corresponding workflow state machine. - WorkflowTaskStamp int32 `protobuf:"varint,109,opt,name=workflow_task_stamp,json=workflowTaskStamp,proto3" json:"workflow_task_stamp,omitempty"` - CancelRequested bool `protobuf:"varint,29,opt,name=cancel_requested,json=cancelRequested,proto3" json:"cancel_requested,omitempty"` - CancelRequestId string `protobuf:"bytes,32,opt,name=cancel_request_id,json=cancelRequestId,proto3" json:"cancel_request_id,omitempty"` - StickyTaskQueue string `protobuf:"bytes,33,opt,name=sticky_task_queue,json=stickyTaskQueue,proto3" json:"sticky_task_queue,omitempty"` + WorkflowTaskStamp int32 `protobuf:"varint,109,opt,name=workflow_task_stamp,json=workflowTaskStamp,proto3" json:"workflow_task_stamp,omitempty"` + // AttemptsSinceLastSuccess tracks the number of workflow task attempts since the last successful workflow task. + // This is carried over when buffered events are applied after workflow task failures. + // Used by the TemporalReportedProblems search attribute to track continuous failure count. + // (-- api-linter: core::0140::prepositions=disabled + // + // aip.dev/not-precedent: "since" is needed here. 
--) + WorkflowTaskAttemptsSinceLastSuccess int32 `protobuf:"varint,110,opt,name=workflow_task_attempts_since_last_success,json=workflowTaskAttemptsSinceLastSuccess,proto3" json:"workflow_task_attempts_since_last_success,omitempty"` + CancelRequested bool `protobuf:"varint,29,opt,name=cancel_requested,json=cancelRequested,proto3" json:"cancel_requested,omitempty"` + CancelRequestId string `protobuf:"bytes,32,opt,name=cancel_request_id,json=cancelRequestId,proto3" json:"cancel_request_id,omitempty"` + StickyTaskQueue string `protobuf:"bytes,33,opt,name=sticky_task_queue,json=stickyTaskQueue,proto3" json:"sticky_task_queue,omitempty"` // (-- api-linter: core::0140::prepositions=disabled // // aip.dev/not-precedent: "to" is used to indicate interval. --) @@ -602,6 +609,13 @@ func (x *WorkflowExecutionInfo) GetWorkflowTaskStamp() int32 { return 0 } +func (x *WorkflowExecutionInfo) GetWorkflowTaskAttemptsSinceLastSuccess() int32 { + if x != nil { + return x.WorkflowTaskAttemptsSinceLastSuccess + } + return 0 +} + func (x *WorkflowExecutionInfo) GetCancelRequested() bool { if x != nil { return x.CancelRequested @@ -4497,7 +4511,7 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x03key\x18\x01 \x01(\x05R\x03key\x12D\n" + "\x05value\x18\x02 \x01(\v2..temporal.server.api.persistence.v1.QueueStateR\x05value:\x028\x01J\x04\b\x04\x10\x05J\x04\b\x05\x10\x06J\x04\b\b\x10\tJ\x04\b\t\x10\n" + "J\x04\b\n" + - "\x10\vJ\x04\b\v\x10\fJ\x04\b\f\x10\rJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10J\x04\b\x10\x10\x11\"\xfe=\n" + + "\x10\vJ\x04\b\v\x10\fJ\x04\b\f\x10\rJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10J\x04\b\x10\x10\x11\"\xd7>\n" + "\x15WorkflowExecutionInfo\x12!\n" + "\fnamespace_id\x18\x01 \x01(\tR\vnamespaceId\x12\x1f\n" + "\vworkflow_id\x18\x02 \x01(\tR\n" + @@ -4534,7 +4548,8 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + " workflow_task_history_size_bytes\x18F \x01(\x03R\x1cworkflowTaskHistorySizeBytes\x123\n" + 
"\x16workflow_task_build_id\x18X \x01(\tR\x13workflowTaskBuildId\x12S\n" + "'workflow_task_build_id_redirect_counter\x18Y \x01(\x03R\"workflowTaskBuildIdRedirectCounter\x12.\n" + - "\x13workflow_task_stamp\x18m \x01(\x05R\x11workflowTaskStamp\x12)\n" + + "\x13workflow_task_stamp\x18m \x01(\x05R\x11workflowTaskStamp\x12W\n" + + ")workflow_task_attempts_since_last_success\x18n \x01(\x05R$workflowTaskAttemptsSinceLastSuccess\x12)\n" + "\x10cancel_requested\x18\x1d \x01(\bR\x0fcancelRequested\x12*\n" + "\x11cancel_request_id\x18 \x01(\tR\x0fcancelRequestId\x12*\n" + "\x11sticky_task_queue\x18! \x01(\tR\x0fstickyTaskQueue\x12a\n" + diff --git a/chasm/lib/callback/executors_test.go b/chasm/lib/callback/executors_test.go index 2c95d4a8160..9bb856b3df2 100644 --- a/chasm/lib/callback/executors_test.go +++ b/chasm/lib/callback/executors_test.go @@ -184,7 +184,7 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { } chasmRegistry := chasm.NewRegistry(logger) - err := chasmRegistry.Register(&Library{ + err = chasmRegistry.Register(&Library{ InvocationTaskExecutor: executor, }) require.NoError(t, err) @@ -587,7 +587,7 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) { } chasmRegistry := chasm.NewRegistry(logger) - err := chasmRegistry.Register(&Library{ + err = chasmRegistry.Register(&Library{ InvocationTaskExecutor: executor, }) require.NoError(t, err) diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto b/proto/internal/temporal/server/api/persistence/v1/executions.proto index 444599d5cdb..611558a16e1 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -104,6 +104,12 @@ message WorkflowExecutionInfo { // It increases monotonically when the workflow's options are modified. // It is used to check if a workflow task is still relevant to the corresponding workflow state machine. 
int32 workflow_task_stamp = 109; + // AttemptsSinceLastSuccess tracks the number of workflow task attempts since the last successful workflow task. + // This is carried over when buffered events are applied after workflow task failures. + // Used by the TemporalReportedProblems search attribute to track continuous failure count. + // (-- api-linter: core::0140::prepositions=disabled + // aip.dev/not-precedent: "since" is needed here. --) + int32 workflow_task_attempts_since_last_success = 110; bool cancel_requested = 29; string cancel_request_id = 32; diff --git a/service/history/interfaces/workflow_task_info.go b/service/history/interfaces/workflow_task_info.go index 781d82b8d77..27a5e361899 100644 --- a/service/history/interfaces/workflow_task_info.go +++ b/service/history/interfaces/workflow_task_info.go @@ -17,7 +17,16 @@ type WorkflowTaskInfo struct { WorkflowTaskTimeout time.Duration // This is only needed to communicate task queue used after AddWorkflowTaskScheduledEvent. TaskQueue *taskqueuepb.TaskQueue - Attempt int32 + + // Attempt is the number of attempts for this workflow task. + Attempt int32 + // AttemptsSinceLastSuccess is the number of attempts since the last successful workflow task. + // This is used by the `TemporalReportedProblems` search attribute to check latest WFT failure count, + // this will only differ from attempts above when the previous workflow tasks failed and there was a + // buffered event (like a signal or activity finishing) applied to the workflow which causes the + // new workflow task to have an attempt of 1 again. + AttemptsSinceLastSuccess int32 + // Scheduled and Started timestamps are useful for transient workflow task: when transient workflow task finally completes, // use these Timestamp to create scheduled/started events. 
// Also used for recording latency metrics diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index a8839b3e8c2..5144904c6aa 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -8444,15 +8444,16 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx var workflowTaskVersionUpdated bool if transitionhistory.Compare(current.WorkflowTaskLastUpdateVersionedTransition, incoming.WorkflowTaskLastUpdateVersionedTransition) != 0 { ms.workflowTaskManager.UpdateWorkflowTask(&historyi.WorkflowTaskInfo{ - Version: incoming.WorkflowTaskVersion, - ScheduledEventID: incoming.WorkflowTaskScheduledEventId, - StartedEventID: incoming.WorkflowTaskStartedEventId, - RequestID: incoming.WorkflowTaskRequestId, - WorkflowTaskTimeout: incoming.WorkflowTaskTimeout.AsDuration(), - Attempt: incoming.WorkflowTaskAttempt, - Stamp: incoming.WorkflowTaskStamp, - StartedTime: incoming.WorkflowTaskStartedTime.AsTime(), - ScheduledTime: incoming.WorkflowTaskScheduledTime.AsTime(), + Version: incoming.WorkflowTaskVersion, + ScheduledEventID: incoming.WorkflowTaskScheduledEventId, + StartedEventID: incoming.WorkflowTaskStartedEventId, + RequestID: incoming.WorkflowTaskRequestId, + WorkflowTaskTimeout: incoming.WorkflowTaskTimeout.AsDuration(), + Attempt: incoming.WorkflowTaskAttempt, + AttemptsSinceLastSuccess: incoming.WorkflowTaskAttemptsSinceLastSuccess, + Stamp: incoming.WorkflowTaskStamp, + StartedTime: incoming.WorkflowTaskStartedTime.AsTime(), + ScheduledTime: incoming.WorkflowTaskScheduledTime.AsTime(), OriginalScheduledTime: incoming.WorkflowTaskOriginalScheduledTime.AsTime(), Type: incoming.WorkflowTaskType, diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index bbe602f70ef..023cb5b9ebe 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ 
b/service/history/workflow/workflow_task_state_machine.go @@ -81,20 +81,21 @@ func (m *workflowTaskStateMachine) ApplyWorkflowTaskScheduledEvent( } workflowTask := &historyi.WorkflowTaskInfo{ - Version: version, - ScheduledEventID: scheduledEventID, - StartedEventID: common.EmptyEventID, - RequestID: emptyUUID, - WorkflowTaskTimeout: startToCloseTimeout.AsDuration(), - TaskQueue: taskQueue, - Attempt: attempt, - ScheduledTime: scheduledTime.AsTime(), - StartedTime: time.Time{}, - OriginalScheduledTime: originalScheduledTimestamp.AsTime(), - Type: workflowTaskType, - SuggestContinueAsNew: false, // reset, will be recomputed on workflow task started - HistorySizeBytes: 0, // reset, will be recomputed on workflow task started - Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), + Version: version, + ScheduledEventID: scheduledEventID, + StartedEventID: common.EmptyEventID, + RequestID: emptyUUID, + WorkflowTaskTimeout: startToCloseTimeout.AsDuration(), + TaskQueue: taskQueue, + Attempt: attempt, + AttemptsSinceLastSuccess: m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess, + ScheduledTime: scheduledTime.AsTime(), + StartedTime: time.Time{}, + OriginalScheduledTime: originalScheduledTimestamp.AsTime(), + Type: workflowTaskType, + SuggestContinueAsNew: false, // reset, will be recomputed on workflow task started + HistorySizeBytes: 0, // reset, will be recomputed on workflow task started + Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), } m.retainWorkflowTaskBuildIdInfo(workflowTask) @@ -145,14 +146,15 @@ func (m *workflowTaskStateMachine) ApplyTransientWorkflowTaskScheduled() (*histo WorkflowTaskTimeout: m.ms.GetExecutionInfo().DefaultWorkflowTaskTimeout.AsDuration(), // Task queue is always normal (not sticky) because transient workflow task is created only for // failed/timed out workflow task and fail/timeout clears sticky task queue. 
- TaskQueue: m.ms.CurrentTaskQueue(), - Attempt: m.ms.GetExecutionInfo().WorkflowTaskAttempt, - ScheduledTime: timestamppb.New(m.ms.timeSource.Now()).AsTime(), - StartedTime: time.Unix(0, 0).UTC(), - Type: enumsspb.WORKFLOW_TASK_TYPE_NORMAL, - SuggestContinueAsNew: false, // reset, will be recomputed on workflow task started - HistorySizeBytes: 0, // reset, will be recomputed on workflow task started - Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), + TaskQueue: m.ms.CurrentTaskQueue(), + Attempt: m.ms.GetExecutionInfo().WorkflowTaskAttempt, + AttemptsSinceLastSuccess: m.ms.GetExecutionInfo().WorkflowTaskAttemptsSinceLastSuccess, + ScheduledTime: timestamppb.New(m.ms.timeSource.Now()).AsTime(), + StartedTime: time.Unix(0, 0).UTC(), + Type: enumsspb.WORKFLOW_TASK_TYPE_NORMAL, + SuggestContinueAsNew: false, // reset, will be recomputed on workflow task started + HistorySizeBytes: 0, // reset, will be recomputed on workflow task started + Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), } m.retainWorkflowTaskBuildIdInfo(workflowTask) @@ -195,21 +197,22 @@ func (m *workflowTaskStateMachine) ApplyWorkflowTaskStartedEvent( } workflowTask = &historyi.WorkflowTaskInfo{ - Version: version, - ScheduledEventID: scheduledEventID, - StartedEventID: startedEventID, - RequestID: requestID, - WorkflowTaskTimeout: workflowTask.WorkflowTaskTimeout, - Attempt: workflowTask.Attempt, - StartedTime: startedTime, - ScheduledTime: workflowTask.ScheduledTime, - TaskQueue: workflowTask.TaskQueue, - OriginalScheduledTime: workflowTask.OriginalScheduledTime, - Type: workflowTask.Type, - SuggestContinueAsNew: suggestContinueAsNew, - HistorySizeBytes: historySizeBytes, - BuildIdRedirectCounter: redirectCounter, - Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), + Version: version, + ScheduledEventID: scheduledEventID, + StartedEventID: startedEventID, + RequestID: requestID, + WorkflowTaskTimeout: workflowTask.WorkflowTaskTimeout, + Attempt: workflowTask.Attempt, + 
AttemptsSinceLastSuccess: workflowTask.AttemptsSinceLastSuccess, + StartedTime: startedTime, + ScheduledTime: workflowTask.ScheduledTime, + TaskQueue: workflowTask.TaskQueue, + OriginalScheduledTime: workflowTask.OriginalScheduledTime, + Type: workflowTask.Type, + SuggestContinueAsNew: suggestContinueAsNew, + HistorySizeBytes: historySizeBytes, + BuildIdRedirectCounter: redirectCounter, + Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), } if buildId := worker_versioning.BuildIdIfUsingVersioning(versioningStamp); buildId != "" { @@ -924,34 +927,40 @@ func (m *workflowTaskStateMachine) failWorkflowTask( m.ms.ClearStickyTaskQueue() } - failWorkflowTaskInfo := &historyi.WorkflowTaskInfo{ - Version: common.EmptyVersion, - ScheduledEventID: common.EmptyEventID, - StartedEventID: common.EmptyEventID, - RequestID: emptyUUID, - WorkflowTaskTimeout: time.Duration(0), - StartedTime: time.Unix(0, 0).UTC(), - TaskQueue: nil, - OriginalScheduledTime: time.Unix(0, 0).UTC(), - Attempt: 1, - Type: enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED, - SuggestContinueAsNew: false, - HistorySizeBytes: 0, - // need to retain Build ID of failed WF task to compare it with the build ID of next attempt - BuildId: m.ms.executionInfo.WorkflowTaskBuildId, - } + newAttemptsSinceLastSuccess := m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess + newAttempt := int32(1) if incrementAttempt { - failWorkflowTaskInfo.Attempt = m.ms.executionInfo.WorkflowTaskAttempt + 1 - failWorkflowTaskInfo.ScheduledTime = m.ms.timeSource.Now().UTC() + // Increment the failure counter for this WFT failure + newAttemptsSinceLastSuccess = m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess + 1 + // Also increment Attempt for transient workflow task tracking + newAttempt = m.ms.executionInfo.WorkflowTaskAttempt + 1 if m.ms.config.EnableWorkflowTaskStampIncrementOnFailure() { m.ms.executionInfo.WorkflowTaskStamp += 1 } } + + failWorkflowTaskInfo := &historyi.WorkflowTaskInfo{ + Version: common.EmptyVersion, + 
ScheduledEventID: common.EmptyEventID, + StartedEventID: common.EmptyEventID, + RequestID: emptyUUID, + WorkflowTaskTimeout: time.Duration(0), + StartedTime: time.Unix(0, 0).UTC(), + TaskQueue: nil, + OriginalScheduledTime: time.Unix(0, 0).UTC(), + Attempt: newAttempt, + AttemptsSinceLastSuccess: newAttemptsSinceLastSuccess, + Type: enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED, + SuggestContinueAsNew: false, + HistorySizeBytes: 0, + // need to retain Build ID of failed WF task to compare it with the build ID of next attempt + BuildId: m.ms.executionInfo.WorkflowTaskBuildId, + } m.retainWorkflowTaskBuildIdInfo(failWorkflowTaskInfo) m.UpdateWorkflowTask(failWorkflowTaskInfo) consecutiveFailuresRequired := m.ms.config.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute(m.ms.GetNamespaceEntry().Name().String()) - if consecutiveFailuresRequired > 0 && failWorkflowTaskInfo.Attempt >= int32(consecutiveFailuresRequired) { + if consecutiveFailuresRequired > 0 && failWorkflowTaskInfo.AttemptsSinceLastSuccess >= int32(consecutiveFailuresRequired) { if err := m.ms.UpdateReportedProblemsSearchAttribute(); err != nil { return err } @@ -1016,6 +1025,7 @@ func (m *workflowTaskStateMachine) UpdateWorkflowTask( m.ms.executionInfo.WorkflowTaskRequestId = workflowTask.RequestID m.ms.executionInfo.WorkflowTaskTimeout = durationpb.New(workflowTask.WorkflowTaskTimeout) m.ms.executionInfo.WorkflowTaskAttempt = workflowTask.Attempt + m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess = workflowTask.AttemptsSinceLastSuccess if !workflowTask.StartedTime.IsZero() { m.ms.executionInfo.WorkflowTaskStartedTime = timestamppb.New(workflowTask.StartedTime) } @@ -1141,6 +1151,7 @@ func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *historyi.WorkflowTaskI RequestID: m.ms.executionInfo.WorkflowTaskRequestId, WorkflowTaskTimeout: m.ms.executionInfo.WorkflowTaskTimeout.AsDuration(), Attempt: m.ms.executionInfo.WorkflowTaskAttempt, + AttemptsSinceLastSuccess: 
m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess, StartedTime: m.ms.executionInfo.WorkflowTaskStartedTime.AsTime(), ScheduledTime: m.ms.executionInfo.WorkflowTaskScheduledTime.AsTime(), TaskQueue: m.ms.CurrentTaskQueue(), diff --git a/tests/workflow_task_reported_problems_test.go b/tests/workflow_task_reported_problems_test.go index 0646df34ff0..d1c968f6ea1 100644 --- a/tests/workflow_task_reported_problems_test.go +++ b/tests/workflow_task_reported_problems_test.go @@ -19,9 +19,7 @@ import ( type WFTFailureReportedProblemsTestSuite struct { testcore.FunctionalTestBase - shouldFail atomic.Bool - failureCount atomic.Int32 - failureType atomic.Int32 // 0 = panic, 1 = non-deterministic error + shouldFail atomic.Bool } func TestWFTFailureReportedProblemsTestSuite(t *testing.T) { @@ -45,6 +43,25 @@ func (s *WFTFailureReportedProblemsTestSuite) simpleActivity() (string, error) { return "done!", nil } +// workflowWithSignalsThatFails is a workflow that, while shouldFail is set, signals itself and then panics on every workflow task. +// This is used to test that the TemporalReportedProblems search attribute is not incorrectly removed +// when signals keep coming in despite continuous workflow task failures. +func (s *WFTFailureReportedProblemsTestSuite) workflowWithSignalsThatFails(ctx workflow.Context) (string, error) { + // If we should fail, signal ourselves (creating a side effect) and immediately panic. + // This will create buffered events. + if s.shouldFail.Load() { + // Signal ourselves to create buffered events + err := s.SdkClient().SignalWorkflow(context.Background(), workflow.GetInfo(ctx).WorkflowExecution.ID, "", "test-signal", "self-signal") + if err != nil { + return "", err + } + panic("forced-panic-after-self-signal") + } + + // If we reach here, shouldFail is false, so we can complete + return "done!", nil +} + // workflowWithActivity creates a workflow that executes an activity before potentially failing. 
// This is used to test workflow task failure scenarios in a more realistic context where the workflow // has already executed some operations (activities) before encountering a workflow task failure. @@ -95,7 +112,7 @@ func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_Set execution, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) require.NoError(t, err) require.GreaterOrEqual(t, execution.PendingWorkflowTask.Attempt, int32(2)) - }, 5*time.Second, 500*time.Millisecond) + }, 20*time.Second, 500*time.Millisecond) // Unblock the workflow s.shouldFail.Store(false) @@ -110,6 +127,49 @@ func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_Set s.False(ok) } +func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_NotClearedBySignals() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + s.shouldFail.Store(true) + + s.Worker().RegisterWorkflow(s.workflowWithSignalsThatFails) + + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), + TaskQueue: s.TaskQueue(), + } + + workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, workflowOptions, s.workflowWithSignalsThatFails) + s.NoError(err) + + // The workflow will signal itself and panic on each WFT, creating buffered events naturally. 
+ // Wait for the search attribute to be set due to consecutive failures + s.EventuallyWithT(func(t *assert.CollectT) { + description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + saVal, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) + require.True(t, ok) + require.NotEmpty(t, saVal) + require.Contains(t, saVal, "category=WorkflowTaskFailed") + require.Contains(t, saVal, "cause=WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure") + }, 20*time.Second, 500*time.Millisecond) + + // Verify the search attribute persists even as the workflow continues to fail and create buffered events + // This is the key part of the test - buffered events should not cause the search attribute to be cleared. Assertions go through the CollectT (t) so EventuallyWithT can observe and retry them. + s.EventuallyWithT(func(t *assert.CollectT) { + description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + saVal, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) + require.True(t, ok, "Search attribute should still be present during continued failures") + require.NotEmpty(t, saVal, "Search attribute should not be empty during continued failures") + }, 5*time.Second, 500*time.Millisecond) + + // Terminate the workflow for cleanup + err = s.SdkClient().TerminateWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test cleanup") + s.NoError(err) +} + func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_SetAndClear_FailAfterActivity() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -137,7 +197,7 @@ func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_Set require.Len(t, saValues, 2) require.Contains(t, saValues, "category=WorkflowTaskFailed") require.Contains(t, saValues, 
"cause=WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure") - }, 5*time.Second, 500*time.Millisecond) + }, 20*time.Second, 500*time.Millisecond) // Unblock the workflow s.shouldFail.Store(false) diff --git a/tests/xdc/workflow_task_reported_problems_test.go b/tests/xdc/workflow_task_reported_problems_test.go index fbedbf3c20f..821fd922f21 100644 --- a/tests/xdc/workflow_task_reported_problems_test.go +++ b/tests/xdc/workflow_task_reported_problems_test.go @@ -27,8 +27,9 @@ import ( type ( WorkflowTaskReportedProblemsReplicationSuite struct { xdcBaseSuite - shouldFail atomic.Bool - failureCount atomic.Int32 + shouldFail atomic.Bool + activeSDKClient sdkclient.Client + standbySDKClient sdkclient.Client } ) @@ -203,7 +204,8 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro defer cancel() ns := s.createGlobalNamespace() - activeSDKClient, err := sdkclient.Dial(sdkclient.Options{ + var err error + s.activeSDKClient, err = sdkclient.Dial(sdkclient.Options{ HostPort: s.clusters[0].Host().FrontendGRPCAddress(), Namespace: ns, Logger: log.NewSdkLogger(s.logger), @@ -211,7 +213,7 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro s.NoError(err) taskQueue := testcore.RandomizeStr("tq") - worker1 := sdkworker.New(activeSDKClient, taskQueue, sdkworker.Options{}) + worker1 := sdkworker.New(s.activeSDKClient, taskQueue, sdkworker.Options{}) worker1.RegisterWorkflow(s.simpleWorkflow) @@ -228,12 +230,12 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro TaskQueue: taskQueue, } - workflowRun, err := activeSDKClient.ExecuteWorkflow(ctx, workflowOptions, s.simpleWorkflow) + workflowRun, err := s.activeSDKClient.ExecuteWorkflow(ctx, workflowOptions, s.simpleWorkflow) s.NoError(err) // Verify search attributes are NOT set in cluster0 when config is 0 s.EventuallyWithT(func(t *assert.CollectT) { - description, err := activeSDKClient.DescribeWorkflow(ctx, workflowRun.GetID(), 
workflowRun.GetRunID()) + description, err := s.activeSDKClient.DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) require.NoError(t, err) _, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) require.False(t, ok) @@ -241,7 +243,7 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro // Verify workflow task attempts are accumulating s.EventuallyWithT(func(t *assert.CollectT) { - exec, err := activeSDKClient.DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + exec, err := s.activeSDKClient.DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) require.NoError(t, err) require.GreaterOrEqual(t, exec.PendingWorkflowTask.Attempt, int32(2)) }, 5*time.Second, 500*time.Millisecond) @@ -252,7 +254,7 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro // Verify search attributes are now set in cluster0 s.EventuallyWithT(func(t *assert.CollectT) { - description, err := activeSDKClient.DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + description, err := s.activeSDKClient.DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) require.NoError(t, err) saValues, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) require.True(t, ok) @@ -263,17 +265,17 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro }, 15*time.Second, 500*time.Millisecond) // get standby client - standbyClient, err := sdkclient.Dial(sdkclient.Options{ + s.standbySDKClient, err = sdkclient.Dial(sdkclient.Options{ HostPort: s.clusters[1].Host().FrontendGRPCAddress(), Namespace: ns, }) s.NoError(err) - s.NotNil(standbyClient) + s.NotNil(s.standbySDKClient) // verify search attributes are replicated to cluster1 using the helper function 
s.checkReportedProblemsSearchAttribute( s.clusters[1].Host().AdminClient(), - standbyClient, + s.standbySDKClient, workflowRun.GetID(), workflowRun.GetRunID(), ns, @@ -292,7 +294,7 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro // verify search attributes are cleared in cluster1 using the helper function s.checkReportedProblemsSearchAttribute( s.clusters[1].Host().AdminClient(), - standbyClient, + s.standbySDKClient, workflowRun.GetID(), workflowRun.GetRunID(), ns, @@ -301,3 +303,95 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro false, ) } + +// workflowWithSignalsThatFails is a workflow that, while shouldFail is set, signals itself via the active cluster client and then panics on every workflow task. +func (s *WorkflowTaskReportedProblemsReplicationSuite) workflowWithSignalsThatFails(ctx workflow.Context) (string, error) { // If we should fail, signal ourselves (creating a side effect) and immediately panic. + // This will create buffered events. + if s.shouldFail.Load() { + // Signal ourselves to create buffered events + err := s.activeSDKClient.SignalWorkflow(context.Background(), workflow.GetInfo(ctx).WorkflowExecution.ID, "", "test-signal", "self-signal") + if err != nil { + return "", err + } + panic("forced-panic-after-self-signal") + } + + // If we reach here, shouldFail is false, so we can complete + return "done!", nil +} + +func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedProblems_NotClearedBySignals() { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + ns := s.createGlobalNamespace() + var err error + s.activeSDKClient, err = sdkclient.Dial(sdkclient.Options{ + HostPort: s.clusters[0].Host().FrontendGRPCAddress(), + Namespace: ns, + Logger: log.NewSdkLogger(s.logger), + }) + s.NoError(err) + + s.shouldFail.Store(true) + + taskQueue := testcore.RandomizeStr("tq") + worker1 := sdkworker.New(s.activeSDKClient, taskQueue, sdkworker.Options{}) + 
worker1.RegisterWorkflow(s.workflowWithSignalsThatFails) + s.NoError(worker1.Start()) + defer worker1.Stop() + + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wfid-" + s.T().Name()), + TaskQueue: taskQueue, + } + + workflowRun, err := s.activeSDKClient.ExecuteWorkflow(ctx, workflowOptions, s.workflowWithSignalsThatFails) + s.NoError(err) + + // The workflow will signal itself and panic on each WFT, creating buffered events naturally. + // Wait for the search attribute to be set due to consecutive failures + s.checkReportedProblemsSearchAttribute( + s.clusters[0].Host().AdminClient(), + s.activeSDKClient, + workflowRun.GetID(), + workflowRun.GetRunID(), + ns, + "WorkflowTaskFailed", + "WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure", + true, + ) + + // Verify the search attribute persists even as the workflow continues to fail and create buffered events. Assertions go through the CollectT (t) so EventuallyWithT can observe and retry them. + s.EventuallyWithT(func(t *assert.CollectT) { + description, err := s.activeSDKClient.DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + saVal, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) + require.True(t, ok, "Search attribute should still be present during continued failures") + require.NotEmpty(t, saVal, "Search attribute should not be empty during continued failures") + }, 5*time.Second, 500*time.Millisecond) + + // get standby client + s.standbySDKClient, err = sdkclient.Dial(sdkclient.Options{ + HostPort: s.clusters[1].Host().FrontendGRPCAddress(), + Namespace: ns, + }) + s.NoError(err) + s.NotNil(s.standbySDKClient) + + // verify search attributes are replicated to cluster1 + s.checkReportedProblemsSearchAttribute( + s.clusters[1].Host().AdminClient(), + s.standbySDKClient, + workflowRun.GetID(), + workflowRun.GetRunID(), + ns, + "WorkflowTaskFailed", + "WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure", + true, + ) + + // Terminate the workflow for cleanup + 
+ err = s.activeSDKClient.TerminateWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test cleanup") + s.NoError(err) +}