From aa6df49126d6470346394a569d255edab8be5dcd Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Fri, 5 Dec 2025 17:08:27 -0700 Subject: [PATCH 01/15] Preserve workflow task attempt when applying buffered events --- .../workflow/workflow_task_state_machine.go | 25 +++- tests/workflow_task_reported_problems_test.go | 110 +++++++++++++++++- 2 files changed, 129 insertions(+), 6 deletions(-) diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index bbe602f70ef..fc9ff2e0b31 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -318,8 +318,17 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( // If while scheduling a workflow task and new events has come, then this workflow task cannot be a transient/speculative. // Flush any buffered events before creating the workflow task, otherwise it will result in invalid IDs for // transient/speculative workflow task and will cause in timeout processing to not work for transient workflow tasks. + preservedAttemptDueToBufferedEvents := false if m.ms.HasBufferedEvents() { - m.ms.executionInfo.WorkflowTaskAttempt = 1 + // Only reset attempt if we're NOT in a transient workflow task (i.e., not continuously failing). + // When workflow tasks are failing continuously and signals arrive, we should preserve the attempt + // count to properly trigger the TemporalReportedProblems search attribute. + wasTransient := m.ms.IsTransientWorkflowTask() + if !wasTransient { + m.ms.executionInfo.WorkflowTaskAttempt = 1 + } else { + preservedAttemptDueToBufferedEvents = true + } workflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL createWorkflowTaskScheduledEvent = true m.ms.updatePendingEventIDs(m.ms.hBuilder.FlushBufferToCurrentBatch()) @@ -335,10 +344,14 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( // If failover happened during transient workflow task, // then reset the attempt to 1, and not use transient workflow task. + // However, if we just preserved the attempt due to buffered events during failures, + // don't reset it here even if versions differ (the version change is from flushing buffered events). if m.ms.GetCurrentVersion() != lastEventVersion { - m.ms.executionInfo.WorkflowTaskAttempt = 1 - workflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL - createWorkflowTaskScheduledEvent = true + if !preservedAttemptDueToBufferedEvents { + m.ms.executionInfo.WorkflowTaskAttempt = 1 + workflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL + createWorkflowTaskScheduledEvent = true + } } } @@ -490,7 +503,9 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( if !workflowTaskScheduledEventCreated && (workflowTask.ScheduledEventID != m.ms.GetNextEventID() || workflowTask.Version != m.ms.GetCurrentVersion()) { - workflowTask.Attempt = 1 + // Only reset attempt to 1 if this is NOT a transient workflow task (attempt was 1). + // If workflow tasks are continuously failing (attempt > 1) and new events arrive (e.g., buffered signals), + // preserve the attempt count to properly trigger the TemporalReportedProblems search attribute. workflowTask.Type = enumsspb.WORKFLOW_TASK_TYPE_NORMAL workflowTaskScheduledEventCreated = true scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent( diff --git a/tests/workflow_task_reported_problems_test.go b/tests/workflow_task_reported_problems_test.go index 0646df34ff0..b4320417a77 100644 --- a/tests/workflow_task_reported_problems_test.go +++ b/tests/workflow_task_reported_problems_test.go @@ -22,6 +22,7 @@ type WFTFailureReportedProblemsTestSuite struct { shouldFail atomic.Bool failureCount atomic.Int32 failureType atomic.Int32 // 0 = panic, 1 = non-deterministic error + stopSignals atomic.Bool } func TestWFTFailureReportedProblemsTestSuite(t *testing.T) { @@ -31,7 +32,7 @@ func TestWFTFailureReportedProblemsTestSuite(t *testing.T) { func (s *WFTFailureReportedProblemsTestSuite) SetupTest() { s.FunctionalTestBase.SetupTest() - s.OverrideDynamicConfig(dynamicconfig.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute, 2) + s.OverrideDynamicConfig(dynamicconfig.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute, 4) } func (s *WFTFailureReportedProblemsTestSuite) simpleWorkflowWithShouldFail(ctx workflow.Context) (string, error) { @@ -45,6 +46,31 @@ func (s *WFTFailureReportedProblemsTestSuite) simpleActivity() (string, error) { return "done!", nil } +// workflowWithSignalsThatFails creates a workflow that listens for signals and fails on each workflow task. +// This is used to test that the TemporalReportedProblems search attribute is not incorrectly removed +// when signals keep coming in despite continuous workflow task failures. +func (s *WFTFailureReportedProblemsTestSuite) workflowWithSignalsThatFails(ctx workflow.Context) (string, error) { + signalChan := workflow.GetSignalChannel(ctx, "test-signal") + + for { + var signalValue string + more := signalChan.Receive(ctx, &signalValue) + if !more { + break + } + + // Always fail after receiving a signal + if s.shouldFail.Load() { + panic("forced-panic-after-signal") + } + + // If we reach here, shouldFail is false, so we can complete + return "done!", nil + } + + return "done!", nil +} + // workflowWithActivity creates a workflow that executes an activity before potentially failing. // This is used to test workflow task failure scenarios in a more realistic context where the workflow // has already executed some operations (activities) before encountering a workflow task failure. @@ -110,6 +136,88 @@ func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_Set s.False(ok) } +func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_NotClearedBySignals() { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + s.shouldFail.Store(true) + s.stopSignals.Store(false) + + s.Worker().RegisterWorkflow(s.workflowWithSignalsThatFails) + + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), + TaskQueue: s.TaskQueue(), + } + + workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, workflowOptions, s.workflowWithSignalsThatFails) + s.NoError(err) + + // Start sending signals every second in a goroutine + signalDone := make(chan struct{}) + go func() { + defer close(signalDone) + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if s.stopSignals.Load() { + return + } + _ = s.SdkClient().SignalWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test-signal", "ping") + case <-ctx.Done(): + return + } + } + }() + + // Wait for the search attribute to be set due to consecutive failures + s.EventuallyWithT(func(t *assert.CollectT) { + description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + saVal, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) + require.True(t, ok) + require.NotEmpty(t, saVal) + require.Contains(t, saVal, "category=WorkflowTaskFailed") + require.Contains(t, saVal, "cause=WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure") + }, 30*time.Second, 500*time.Millisecond) + + // // Continue sending signals for a few more seconds and verify the search attribute is NOT removed + // // This is the key part of the test - signals should not cause the search attribute to be cleared + // time.Sleep(5 * time.Second) + + s.EventuallyWithT(func(t *assert.CollectT) { + description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + saVal, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) + s.True(ok, "Search attribute should still be present after receiving signals") + s.NotEmpty(saVal, "Search attribute should not be empty after receiving signals") + }, 5*time.Second, 500*time.Millisecond) + + // Stop signals and unblock the workflow for cleanup + s.stopSignals.Store(true) + s.shouldFail.Store(false) + + // Send one final signal to trigger workflow completion + err = s.SdkClient().SignalWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test-signal", "final") + s.NoError(err) + + var out string + s.NoError(workflowRun.Get(ctx, &out)) + + // Wait for signal goroutine to finish + <-signalDone + + // Verify search attribute is cleared after successful completion + description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + s.NotNil(description.TypedSearchAttributes) + _, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) + s.False(ok) +} + func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_SetAndClear_FailAfterActivity() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() From 344422884f71bfd64784018d5b89ee5d8bfd6fd9 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Mon, 8 Dec 2025 14:05:02 -0700 Subject: [PATCH 02/15] tests will fail, also adding more debugging information --- .../history/workflow/mutable_state_impl.go | 18 ++++ .../workflow/workflow_task_state_machine.go | 101 +++++++++++++++--- 2 files changed, 103 insertions(+), 16 deletions(-) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index a8839b3e8c2..77508efed78 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -3222,8 +3222,26 @@ func (ms *MutableStateImpl) GetTransientWorkflowTaskInfo( identity string, ) *historyspb.TransientWorkflowTaskInfo { if !ms.IsTransientWorkflowTask() && workflowTask.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { + ms.logger.Info("DEBUG WFT: GetTransientWorkflowTaskInfo returning nil - not transient", + tag.NewInt64("workflow-task-attempt", int64(ms.executionInfo.WorkflowTaskAttempt)), + tag.NewInt64("scheduled-event-id", workflowTask.ScheduledEventID), + tag.NewInt64("next-event-id", ms.GetNextEventID())) return nil } + + // If real scheduled event has already been created (not transient anymore), don't return synthetic events + if workflowTask.ScheduledEventID < ms.GetNextEventID() { + ms.logger.Info("DEBUG WFT: GetTransientWorkflowTaskInfo returning nil - real event exists", + tag.NewInt64("workflow-task-attempt", int64(ms.executionInfo.WorkflowTaskAttempt)), + tag.NewInt64("scheduled-event-id", workflowTask.ScheduledEventID), + tag.NewInt64("next-event-id", ms.GetNextEventID())) + return nil + } + + ms.logger.Info("DEBUG WFT: GetTransientWorkflowTaskInfo returning synthetic events", + tag.NewInt64("workflow-task-attempt", int64(ms.executionInfo.WorkflowTaskAttempt)), + tag.NewInt64("scheduled-event-id", workflowTask.ScheduledEventID), + tag.NewInt64("next-event-id", ms.GetNextEventID())) return ms.workflowTaskManager.GetTransientWorkflowTaskInfo(workflowTask, identity) } diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index fc9ff2e0b31..911cf2fb281 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -319,15 +319,21 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( // Flush any buffered events before creating the workflow task, otherwise it will result in invalid IDs for // transient/speculative workflow task and will cause in timeout processing to not work for transient workflow tasks. preservedAttemptDueToBufferedEvents := false + preservedAttemptForScheduledEvent := int32(0) if m.ms.HasBufferedEvents() { - // Only reset attempt if we're NOT in a transient workflow task (i.e., not continuously failing). - // When workflow tasks are failing continuously and signals arrive, we should preserve the attempt - // count to properly trigger the TemporalReportedProblems search attribute. + // When buffered events (signals, activity completions, etc.) arrive during continuous workflow task + // failures, we must reset WorkflowTaskAttempt to 1 to convert from transient to normal workflow task. + // However, we preserve the original attempt count to pass to the scheduled event so that: + // 1. The history event records the correct attempt number + // 2. workflowTask.Attempt preserves the value for search attribute checking wasTransient := m.ms.IsTransientWorkflowTask() - if !wasTransient { - m.ms.executionInfo.WorkflowTaskAttempt = 1 - } else { + if wasTransient { + preservedAttemptForScheduledEvent = m.ms.executionInfo.WorkflowTaskAttempt preservedAttemptDueToBufferedEvents = true + m.ms.logger.Info("DEBUG WFT: Buffered events during transient WFT, preserving attempt", + tag.Attempt(preservedAttemptForScheduledEvent)) + // Reset to 1 BEFORE flushing to ensure IsTransientWorkflowTask() returns false + m.ms.executionInfo.WorkflowTaskAttempt = 1 } workflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL createWorkflowTaskScheduledEvent = true @@ -342,10 +348,9 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( return nil, err } - // If failover happened during transient workflow task, - // then reset the attempt to 1, and not use transient workflow task. - // However, if we just preserved the attempt due to buffered events during failures, - // don't reset it here even if versions differ (the version change is from flushing buffered events). + // If failover happened during transient workflow task, reset the attempt to 1. + // However, if we just preserved the attempt due to buffered events during continuous + // failures, don't reset it here (the version change is from flushing buffered events). if m.ms.GetCurrentVersion() != lastEventVersion { if !preservedAttemptDueToBufferedEvents { m.ms.executionInfo.WorkflowTaskAttempt = 1 @@ -355,12 +360,25 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( } } + // Restore WorkflowTaskAttempt if we preserved it due to buffered events. + // We had to reset it temporarily to make IsTransientWorkflowTask() return false + // for the failover check, but now we need the correct value for attempt tracking. + if preservedAttemptDueToBufferedEvents { + m.ms.executionInfo.WorkflowTaskAttempt = preservedAttemptForScheduledEvent + } + scheduleTime := m.ms.timeSource.Now().UTC() attempt := m.ms.executionInfo.WorkflowTaskAttempt + // Use preserved attempt for the scheduled event if we converted from transient to normal due to buffered events. + // This ensures the history event and workflowTask.Attempt record the correct consecutive failure count. + attemptForScheduledEvent := attempt + if preservedAttemptForScheduledEvent > 0 { + attemptForScheduledEvent = preservedAttemptForScheduledEvent + } // TaskQueue should already be set from workflow execution started event. taskQueue := m.ms.CurrentTaskQueue() // DefaultWorkflowTaskTimeout should already be set from workflow execution started event. - startToCloseTimeout := m.getStartToCloseTimeout(m.ms.executionInfo.DefaultWorkflowTaskTimeout, attempt) + startToCloseTimeout := m.getStartToCloseTimeout(m.ms.executionInfo.DefaultWorkflowTaskTimeout, attemptForScheduledEvent) var scheduledEvent *historypb.HistoryEvent var scheduledEventID int64 @@ -369,7 +387,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( scheduledEvent = m.ms.hBuilder.AddWorkflowTaskScheduledEvent( taskQueue, startToCloseTimeout, - attempt, + attemptForScheduledEvent, scheduleTime, ) scheduledEventID = scheduledEvent.GetEventId() @@ -378,6 +396,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( scheduledEventID = m.ms.GetNextEventID() } + // Pass the correct attempt value (which may have been restored from preservation) to the workflow task state machine. + // For workflow tasks with buffered events, this will be the preserved attempt count for proper failure tracking. workflowTask, err := m.ApplyWorkflowTaskScheduledEvent( m.ms.GetCurrentVersion(), scheduledEventID, @@ -392,6 +412,16 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( return nil, err } + // Verify workflowTask.Attempt matches the preserved value. This should already be correct + // since we restored WorkflowTaskAttempt before calling ApplyWorkflowTaskScheduledEvent, + // but we keep this check for safety and consistency with the scheduled event. + if preservedAttemptForScheduledEvent > 0 && workflowTask.Attempt != preservedAttemptForScheduledEvent { + workflowTask.Attempt = preservedAttemptForScheduledEvent + m.ms.logger.Warn("DEBUG WFT: Correcting workflowTask.Attempt mismatch", + tag.Attempt(preservedAttemptForScheduledEvent), + tag.NewInt64("workflow-task-attempt-in-ms", int64(m.ms.executionInfo.WorkflowTaskAttempt))) + } + // TODO merge active & passive task generation if !bypassTaskGeneration { if workflowTask.Type == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { @@ -503,9 +533,10 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( if !workflowTaskScheduledEventCreated && (workflowTask.ScheduledEventID != m.ms.GetNextEventID() || workflowTask.Version != m.ms.GetCurrentVersion()) { - // Only reset attempt to 1 if this is NOT a transient workflow task (attempt was 1). - // If workflow tasks are continuously failing (attempt > 1) and new events arrive (e.g., buffered signals), - // preserve the attempt count to properly trigger the TemporalReportedProblems search attribute. + // Do not reset workflowTask.Attempt here (previously was unconditionally set to 1). + // For transient workflow tasks (attempt > 1 from continuous failures), preserving the + // attempt count ensures the TemporalReportedProblems search attribute is set correctly + // when failures reach the configured threshold. workflowTask.Type = enumsspb.WORKFLOW_TASK_TYPE_NORMAL workflowTaskScheduledEventCreated = true scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent( @@ -956,6 +987,9 @@ func (m *workflowTaskStateMachine) failWorkflowTask( BuildId: m.ms.executionInfo.WorkflowTaskBuildId, } if incrementAttempt { + m.ms.logger.Info("DEBUG WFT: Incrementing attempt in failWorkflowTask", + tag.NewInt64("current-attempt", int64(m.ms.executionInfo.WorkflowTaskAttempt)), + tag.NewInt64("new-attempt", int64(m.ms.executionInfo.WorkflowTaskAttempt+1))) failWorkflowTaskInfo.Attempt = m.ms.executionInfo.WorkflowTaskAttempt + 1 failWorkflowTaskInfo.ScheduledTime = m.ms.timeSource.Now().UTC() if m.ms.config.EnableWorkflowTaskStampIncrementOnFailure() { @@ -966,7 +1000,13 @@ func (m *workflowTaskStateMachine) failWorkflowTask( m.UpdateWorkflowTask(failWorkflowTaskInfo) consecutiveFailuresRequired := m.ms.config.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute(m.ms.GetNamespaceEntry().Name().String()) + m.ms.logger.Info("DEBUG WFT: failWorkflowTask checking search attribute", + tag.Attempt(failWorkflowTaskInfo.Attempt), + tag.NewInt32("consecutive-failures-required", int32(consecutiveFailuresRequired)), + tag.NewBoolTag("increment-attempt", incrementAttempt)) if consecutiveFailuresRequired > 0 && failWorkflowTaskInfo.Attempt >= int32(consecutiveFailuresRequired) { + m.ms.logger.Info("DEBUG WFT: Adding TemporalReportedProblems search attribute", + tag.Attempt(failWorkflowTaskInfo.Attempt)) if err := m.ms.UpdateReportedProblemsSearchAttribute(); err != nil { return err } @@ -1025,12 +1065,41 @@ func (m *workflowTaskStateMachine) UpdateWorkflowTask( m.ms.workflowCloseAttempted = false } + m.ms.logger.Info("DEBUG WFT: ApplyWorkflowTaskStartedEvent setting attempt", + tag.NewInt64("prev-mutable-state-attempt", int64(m.ms.executionInfo.WorkflowTaskAttempt)), + tag.Attempt(workflowTask.Attempt), + tag.NewInt64("scheduled-event-id", workflowTask.ScheduledEventID), + tag.NewInt64("next-event-id", m.ms.GetNextEventID())) + m.ms.executionInfo.WorkflowTaskVersion = workflowTask.Version m.ms.executionInfo.WorkflowTaskScheduledEventId = workflowTask.ScheduledEventID m.ms.executionInfo.WorkflowTaskStartedEventId = workflowTask.StartedEventID m.ms.executionInfo.WorkflowTaskRequestId = workflowTask.RequestID m.ms.executionInfo.WorkflowTaskTimeout = durationpb.New(workflowTask.WorkflowTaskTimeout) - m.ms.executionInfo.WorkflowTaskAttempt = workflowTask.Attempt + + // Sync WorkflowTaskAttempt for transient/speculative workflow tasks (scheduled event not yet in history). + // A workflow task is transient/speculative if: + // 1. ScheduledEventID >= NextEventID (scheduled event will be created in future), OR + // 2. ScheduledEventID == EmptyEventID (transient workflow task after failure) + // For normal workflow tasks with real scheduled events already in history, we only sync if started event + // doesn't exist yet, to preserve attempt tracking through the workflow task lifecycle. + isTransientOrSpeculative := workflowTask.ScheduledEventID >= m.ms.GetNextEventID() || + workflowTask.ScheduledEventID == common.EmptyEventID + isNormalNotStarted := workflowTask.ScheduledEventID > 0 && + workflowTask.ScheduledEventID < m.ms.GetNextEventID() && + workflowTask.StartedEventID == common.EmptyEventID + + if isTransientOrSpeculative || isNormalNotStarted { + m.ms.executionInfo.WorkflowTaskAttempt = workflowTask.Attempt + m.ms.logger.Info("DEBUG WFT: Syncing attempt", + tag.Attempt(workflowTask.Attempt), + tag.NewBoolTag("is-transient-or-speculative", isTransientOrSpeculative), + tag.NewBoolTag("is-normal-not-started", isNormalNotStarted)) + } else { + m.ms.logger.Info("DEBUG WFT: NOT syncing attempt - workflow task already started", + tag.NewInt64("keeping-attempt", int64(m.ms.executionInfo.WorkflowTaskAttempt)), + tag.NewInt64("workflowTask-attempt", int64(workflowTask.Attempt))) + } if !workflowTask.StartedTime.IsZero() { m.ms.executionInfo.WorkflowTaskStartedTime = timestamppb.New(workflowTask.StartedTime) } From feb640ea1f0d090997fed3624e28526ed9de8de6 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Mon, 8 Dec 2025 16:24:29 -0700 Subject: [PATCH 03/15] adding a new field to wft for tracking all failures --- api/persistence/v1/executions.pb.go | 15 +- .../api/persistence/v1/executions.proto | 4 + .../history/interfaces/workflow_task_info.go | 9 + .../history/workflow/mutable_state_impl.go | 18 -- .../workflow/workflow_task_state_machine.go | 154 +++++------------- tests/workflow_task_reported_problems_test.go | 7 +- 6 files changed, 69 insertions(+), 138 deletions(-) diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index 4c79139121f..e5da9068a41 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -177,8 +177,12 @@ type WorkflowExecutionInfo struct { // Stamp represents the "version" of the workflow's internal state. // It increases monotonically when the workflow's options are modified. // It is used to check if a workflow task is still relevant to the corresponding workflow state machine. - WorkflowTaskStamp int32 `protobuf:"varint,109,opt,name=workflow_task_stamp,json=workflowTaskStamp,proto3" json:"workflow_task_stamp,omitempty"` - CancelRequested bool `protobuf:"varint,29,opt,name=cancel_requested,json=cancelRequested,proto3" json:"cancel_requested,omitempty"` + WorkflowTaskStamp int32 `protobuf:"varint,109,opt,name=workflow_task_stamp,json=workflowTaskStamp,proto3" json:"workflow_task_stamp,omitempty"` + // AttemptsSinceLastSuccess tracks the number of workflow task attempts since the last successful workflow task. + // This is carried over when buffered events are applied after workflow task failures. + // Used by the TemporalReportedProblems search attribute to track continuous failure count. + WorkflowTaskAttemptsSinceLastSuccess int32 `protobuf:"varint,110,opt,name=workflow_task_attempts_since_last_success,json=workflowTaskAttemptsSinceLastSuccess,proto3" json:"workflow_task_attempts_since_last_success,omitempty"` + CancelRequested bool `protobuf:"varint,29,opt,name=cancel_requested,json=cancelRequested,proto3" json:"cancel_requested,omitempty"` CancelRequestId string `protobuf:"bytes,32,opt,name=cancel_request_id,json=cancelRequestId,proto3" json:"cancel_request_id,omitempty"` StickyTaskQueue string `protobuf:"bytes,33,opt,name=sticky_task_queue,json=stickyTaskQueue,proto3" json:"sticky_task_queue,omitempty"` // (-- api-linter: core::0140::prepositions=disabled @@ -602,6 +606,13 @@ func (x *WorkflowExecutionInfo) GetWorkflowTaskStamp() int32 { return 0 } +func (x *WorkflowExecutionInfo) GetWorkflowTaskAttemptsSinceLastSuccess() int32 { + if x != nil { + return x.WorkflowTaskAttemptsSinceLastSuccess + } + return 0 +} + func (x *WorkflowExecutionInfo) GetCancelRequested() bool { if x != nil { return x.CancelRequested diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto b/proto/internal/temporal/server/api/persistence/v1/executions.proto index 444599d5cdb..6e671630dbb 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -104,6 +104,10 @@ message WorkflowExecutionInfo { // It increases monotonically when the workflow's options are modified. // It is used to check if a workflow task is still relevant to the corresponding workflow state machine. int32 workflow_task_stamp = 109; + // AttemptsSinceLastSuccess tracks the number of workflow task attempts since the last successful workflow task. + // This is carried over when buffered events are applied after workflow task failures. + // Used by the TemporalReportedProblems search attribute to track continuous failure count. + int32 workflow_task_attempts_since_last_success = 110; bool cancel_requested = 29; string cancel_request_id = 32; diff --git a/service/history/interfaces/workflow_task_info.go b/service/history/interfaces/workflow_task_info.go index 781d82b8d77..a77a157ce9b 100644 --- a/service/history/interfaces/workflow_task_info.go +++ b/service/history/interfaces/workflow_task_info.go @@ -17,7 +17,16 @@ type WorkflowTaskInfo struct { WorkflowTaskTimeout time.Duration // This is only needed to communicate task queue used after AddWorkflowTaskScheduledEvent. TaskQueue *taskqueuepb.TaskQueue + + // Attempt is the number of attempts for this workflow task. Attempt int32 + // AttemptsSinceLastSuccess is the number of attempts since the last successful workflow task. + // This is used by the `TemporalReportedProblems` search attribute to check latest WFT failure count, + // this will only differ from attempts above when the previous workflow tasks failed and there was a + // buffered event (like a signal or activity finishing) applied to the workflow which causes the + // new workflow task to have an attempt of 1 again. + AttemptsSinceLastSuccess int32 + // Scheduled and Started timestamps are useful for transient workflow task: when transient workflow task finally completes, // use these Timestamp to create scheduled/started events. // Also used for recording latency metrics diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 77508efed78..a8839b3e8c2 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -3222,26 +3222,8 @@ func (ms *MutableStateImpl) GetTransientWorkflowTaskInfo( identity string, ) *historyspb.TransientWorkflowTaskInfo { if !ms.IsTransientWorkflowTask() && workflowTask.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { - ms.logger.Info("DEBUG WFT: GetTransientWorkflowTaskInfo returning nil - not transient", - tag.NewInt64("workflow-task-attempt", int64(ms.executionInfo.WorkflowTaskAttempt)), - tag.NewInt64("scheduled-event-id", workflowTask.ScheduledEventID), - tag.NewInt64("next-event-id", ms.GetNextEventID())) return nil } - - // If real scheduled event has already been created (not transient anymore), don't return synthetic events - if workflowTask.ScheduledEventID < ms.GetNextEventID() { - ms.logger.Info("DEBUG WFT: GetTransientWorkflowTaskInfo returning nil - real event exists", - tag.NewInt64("workflow-task-attempt", int64(ms.executionInfo.WorkflowTaskAttempt)), - tag.NewInt64("scheduled-event-id", workflowTask.ScheduledEventID), - tag.NewInt64("next-event-id", ms.GetNextEventID())) - return nil - } - - ms.logger.Info("DEBUG WFT: GetTransientWorkflowTaskInfo returning synthetic events", - tag.NewInt64("workflow-task-attempt", int64(ms.executionInfo.WorkflowTaskAttempt)), - tag.NewInt64("scheduled-event-id", workflowTask.ScheduledEventID), - tag.NewInt64("next-event-id", ms.GetNextEventID())) return ms.workflowTaskManager.GetTransientWorkflowTaskInfo(workflowTask, identity) } diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 911cf2fb281..c440214d755 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -318,23 +318,13 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( // If while scheduling a workflow task and new events has come, then this workflow task cannot be a transient/speculative. // Flush any buffered events before creating the workflow task, otherwise it will result in invalid IDs for // transient/speculative workflow task and will cause in timeout processing to not work for transient workflow tasks. - preservedAttemptDueToBufferedEvents := false - preservedAttemptForScheduledEvent := int32(0) if m.ms.HasBufferedEvents() { - // When buffered events (signals, activity completions, etc.) arrive during continuous workflow task - // failures, we must reset WorkflowTaskAttempt to 1 to convert from transient to normal workflow task. - // However, we preserve the original attempt count to pass to the scheduled event so that: - // 1. The history event records the correct attempt number - // 2. workflowTask.Attempt preserves the value for search attribute checking - wasTransient := m.ms.IsTransientWorkflowTask() - if wasTransient { - preservedAttemptForScheduledEvent = m.ms.executionInfo.WorkflowTaskAttempt - preservedAttemptDueToBufferedEvents = true - m.ms.logger.Info("DEBUG WFT: Buffered events during transient WFT, preserving attempt", - tag.Attempt(preservedAttemptForScheduledEvent)) - // Reset to 1 BEFORE flushing to ensure IsTransientWorkflowTask() returns false - m.ms.executionInfo.WorkflowTaskAttempt = 1 + // When buffered events are applied, preserve the current attempt count in AttemptsSinceLastSuccess + // before resetting Attempt to 1. This ensures we track continuous failures across workflow task resets. + if m.ms.IsTransientWorkflowTask() { + m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess = m.ms.executionInfo.WorkflowTaskAttempt } + m.ms.executionInfo.WorkflowTaskAttempt = 1 workflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL createWorkflowTaskScheduledEvent = true m.ms.updatePendingEventIDs(m.ms.hBuilder.FlushBufferToCurrentBatch()) @@ -348,37 +338,21 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( return nil, err } - // If failover happened during transient workflow task, reset the attempt to 1. - // However, if we just preserved the attempt due to buffered events during continuous - // failures, don't reset it here (the version change is from flushing buffered events). + // If failover happened during transient workflow task, + // then reset the attempt to 1, and not use transient workflow task. if m.ms.GetCurrentVersion() != lastEventVersion { - if !preservedAttemptDueToBufferedEvents { - m.ms.executionInfo.WorkflowTaskAttempt = 1 - workflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL - createWorkflowTaskScheduledEvent = true - } + m.ms.executionInfo.WorkflowTaskAttempt = 1 + workflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL + createWorkflowTaskScheduledEvent = true } } - // Restore WorkflowTaskAttempt if we preserved it due to buffered events. - // We had to reset it temporarily to make IsTransientWorkflowTask() return false - // for the failover check, but now we need the correct value for attempt tracking. - if preservedAttemptDueToBufferedEvents { - m.ms.executionInfo.WorkflowTaskAttempt = preservedAttemptForScheduledEvent - } - scheduleTime := m.ms.timeSource.Now().UTC() attempt := m.ms.executionInfo.WorkflowTaskAttempt - // Use preserved attempt for the scheduled event if we converted from transient to normal due to buffered events. - // This ensures the history event and workflowTask.Attempt record the correct consecutive failure count. - attemptForScheduledEvent := attempt - if preservedAttemptForScheduledEvent > 0 { - attemptForScheduledEvent = preservedAttemptForScheduledEvent - } // TaskQueue should already be set from workflow execution started event. taskQueue := m.ms.CurrentTaskQueue() // DefaultWorkflowTaskTimeout should already be set from workflow execution started event. - startToCloseTimeout := m.getStartToCloseTimeout(m.ms.executionInfo.DefaultWorkflowTaskTimeout, attemptForScheduledEvent) + startToCloseTimeout := m.getStartToCloseTimeout(m.ms.executionInfo.DefaultWorkflowTaskTimeout, attempt) var scheduledEvent *historypb.HistoryEvent var scheduledEventID int64 @@ -387,7 +361,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( scheduledEvent = m.ms.hBuilder.AddWorkflowTaskScheduledEvent( taskQueue, startToCloseTimeout, - attemptForScheduledEvent, + attempt, scheduleTime, ) scheduledEventID = scheduledEvent.GetEventId() @@ -396,8 +370,6 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( scheduledEventID = m.ms.GetNextEventID() } - // Pass the correct attempt value (which may have been restored from preservation) to the workflow task state machine. - // For workflow tasks with buffered events, this will be the preserved attempt count for proper failure tracking. workflowTask, err := m.ApplyWorkflowTaskScheduledEvent( m.ms.GetCurrentVersion(), scheduledEventID, @@ -412,16 +384,6 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( return nil, err } - // Verify workflowTask.Attempt matches the preserved value. This should already be correct - // since we restored WorkflowTaskAttempt before calling ApplyWorkflowTaskScheduledEvent, - // but we keep this check for safety and consistency with the scheduled event. - if preservedAttemptForScheduledEvent > 0 && workflowTask.Attempt != preservedAttemptForScheduledEvent { - workflowTask.Attempt = preservedAttemptForScheduledEvent - m.ms.logger.Warn("DEBUG WFT: Correcting workflowTask.Attempt mismatch", - tag.Attempt(preservedAttemptForScheduledEvent), - tag.NewInt64("workflow-task-attempt-in-ms", int64(m.ms.executionInfo.WorkflowTaskAttempt))) - } - // TODO merge active & passive task generation if !bypassTaskGeneration { if workflowTask.Type == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { @@ -533,10 +495,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent( if !workflowTaskScheduledEventCreated && (workflowTask.ScheduledEventID != m.ms.GetNextEventID() || workflowTask.Version != m.ms.GetCurrentVersion()) { - // Do not reset workflowTask.Attempt here (previously was unconditionally set to 1). - // For transient workflow tasks (attempt > 1 from continuous failures), preserving the - // attempt count ensures the TemporalReportedProblems search attribute is set correctly - // when failures reach the configured threshold. + workflowTask.Attempt = 1 workflowTask.Type = enumsspb.WORKFLOW_TASK_TYPE_NORMAL workflowTaskScheduledEventCreated = true scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent( @@ -971,26 +930,25 @@ func (m *workflowTaskStateMachine) failWorkflowTask( } failWorkflowTaskInfo := &historyi.WorkflowTaskInfo{ - Version: common.EmptyVersion, - ScheduledEventID: common.EmptyEventID, - StartedEventID: common.EmptyEventID, - RequestID: emptyUUID, - WorkflowTaskTimeout: time.Duration(0), - StartedTime: time.Unix(0, 0).UTC(), - TaskQueue: nil, - OriginalScheduledTime: time.Unix(0, 0).UTC(), - Attempt: 1, - Type: enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED, - SuggestContinueAsNew: false, - HistorySizeBytes: 0, + Version: common.EmptyVersion, + ScheduledEventID: common.EmptyEventID, + StartedEventID: common.EmptyEventID, + RequestID: emptyUUID, + WorkflowTaskTimeout: time.Duration(0), + StartedTime: time.Unix(0, 0).UTC(), + TaskQueue: nil, + OriginalScheduledTime: time.Unix(0, 0).UTC(), + Attempt: 1, + AttemptsSinceLastSuccess: m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess, + Type: enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED, + SuggestContinueAsNew: false, + HistorySizeBytes: 0, // need to retain Build ID of failed WF task to compare it with the build ID of next attempt BuildId: m.ms.executionInfo.WorkflowTaskBuildId, } if incrementAttempt { - m.ms.logger.Info("DEBUG WFT: Incrementing attempt in failWorkflowTask", - tag.NewInt64("current-attempt", int64(m.ms.executionInfo.WorkflowTaskAttempt)), - tag.NewInt64("new-attempt", int64(m.ms.executionInfo.WorkflowTaskAttempt+1))) failWorkflowTaskInfo.Attempt = m.ms.executionInfo.WorkflowTaskAttempt + 1 + failWorkflowTaskInfo.AttemptsSinceLastSuccess = m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess + 1 failWorkflowTaskInfo.ScheduledTime = m.ms.timeSource.Now().UTC() if m.ms.config.EnableWorkflowTaskStampIncrementOnFailure() { m.ms.executionInfo.WorkflowTaskStamp += 1 @@ -1000,13 +958,7 @@ func (m *workflowTaskStateMachine) failWorkflowTask( m.UpdateWorkflowTask(failWorkflowTaskInfo) consecutiveFailuresRequired := m.ms.config.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute(m.ms.GetNamespaceEntry().Name().String()) - m.ms.logger.Info("DEBUG WFT: failWorkflowTask checking search attribute", - tag.Attempt(failWorkflowTaskInfo.Attempt), - tag.NewInt32("consecutive-failures-required", int32(consecutiveFailuresRequired)), - tag.NewBoolTag("increment-attempt", incrementAttempt)) - if consecutiveFailuresRequired > 0 && failWorkflowTaskInfo.Attempt >= int32(consecutiveFailuresRequired) { - m.ms.logger.Info("DEBUG WFT: Adding TemporalReportedProblems search attribute", - tag.Attempt(failWorkflowTaskInfo.Attempt)) + if consecutiveFailuresRequired > 0 && failWorkflowTaskInfo.AttemptsSinceLastSuccess >= int32(consecutiveFailuresRequired) { if err := m.ms.UpdateReportedProblemsSearchAttribute(); err != nil { return err } @@ -1025,14 +977,15 @@ func (m *workflowTaskStateMachine) deleteWorkflowTask() { m.ms.SetWorkflowTaskStartToCloseTimeoutTask(nil) resetWorkflowTaskInfo := &historyi.WorkflowTaskInfo{ - Version: common.EmptyVersion, - ScheduledEventID: common.EmptyEventID, - StartedEventID: common.EmptyEventID, - RequestID: emptyUUID, - WorkflowTaskTimeout: time.Duration(0), - Attempt: 1, - StartedTime: time.Unix(0, 0).UTC(), - ScheduledTime: time.Unix(0, 0).UTC(), + Version: common.EmptyVersion, + ScheduledEventID: common.EmptyEventID, + StartedEventID: common.EmptyEventID, + RequestID: emptyUUID, + WorkflowTaskTimeout: time.Duration(0), + Attempt: 1, + AttemptsSinceLastSuccess: 0, + StartedTime: time.Unix(0, 0).UTC(), + ScheduledTime: time.Unix(0, 0).UTC(), TaskQueue: nil, // Keep the last original scheduled Timestamp, so that AddWorkflowTaskScheduledEventAsHeartbeat can continue with it. @@ -1065,41 +1018,13 @@ func (m *workflowTaskStateMachine) UpdateWorkflowTask( m.ms.workflowCloseAttempted = false } - m.ms.logger.Info("DEBUG WFT: ApplyWorkflowTaskStartedEvent setting attempt", - tag.NewInt64("prev-mutable-state-attempt", int64(m.ms.executionInfo.WorkflowTaskAttempt)), - tag.Attempt(workflowTask.Attempt), - tag.NewInt64("scheduled-event-id", workflowTask.ScheduledEventID), - tag.NewInt64("next-event-id", m.ms.GetNextEventID())) - m.ms.executionInfo.WorkflowTaskVersion = workflowTask.Version m.ms.executionInfo.WorkflowTaskScheduledEventId = workflowTask.ScheduledEventID m.ms.executionInfo.WorkflowTaskStartedEventId = workflowTask.StartedEventID m.ms.executionInfo.WorkflowTaskRequestId = workflowTask.RequestID m.ms.executionInfo.WorkflowTaskTimeout = durationpb.New(workflowTask.WorkflowTaskTimeout) - - // Sync WorkflowTaskAttempt for transient/speculative workflow tasks (scheduled event not yet in history). - // A workflow task is transient/speculative if: - // 1. ScheduledEventID >= NextEventID (scheduled event will be created in future), OR - // 2. ScheduledEventID == EmptyEventID (transient workflow task after failure) - // For normal workflow tasks with real scheduled events already in history, we only sync if started event - // doesn't exist yet, to preserve attempt tracking through the workflow task lifecycle. - isTransientOrSpeculative := workflowTask.ScheduledEventID >= m.ms.GetNextEventID() || - workflowTask.ScheduledEventID == common.EmptyEventID - isNormalNotStarted := workflowTask.ScheduledEventID > 0 && - workflowTask.ScheduledEventID < m.ms.GetNextEventID() && - workflowTask.StartedEventID == common.EmptyEventID - - if isTransientOrSpeculative || isNormalNotStarted { - m.ms.executionInfo.WorkflowTaskAttempt = workflowTask.Attempt - m.ms.logger.Info("DEBUG WFT: Syncing attempt", - tag.Attempt(workflowTask.Attempt), - tag.NewBoolTag("is-transient-or-speculative", isTransientOrSpeculative), - tag.NewBoolTag("is-normal-not-started", isNormalNotStarted)) - } else { - m.ms.logger.Info("DEBUG WFT: NOT syncing attempt - workflow task already started", - tag.NewInt64("keeping-attempt", int64(m.ms.executionInfo.WorkflowTaskAttempt)), - tag.NewInt64("workflowTask-attempt", int64(workflowTask.Attempt))) - } + m.ms.executionInfo.WorkflowTaskAttempt = workflowTask.Attempt + m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess = workflowTask.AttemptsSinceLastSuccess if !workflowTask.StartedTime.IsZero() { m.ms.executionInfo.WorkflowTaskStartedTime = timestamppb.New(workflowTask.StartedTime) } @@ -1225,6 +1150,7 @@ func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *historyi.WorkflowTaskI RequestID: m.ms.executionInfo.WorkflowTaskRequestId, WorkflowTaskTimeout: m.ms.executionInfo.WorkflowTaskTimeout.AsDuration(), Attempt: m.ms.executionInfo.WorkflowTaskAttempt, + AttemptsSinceLastSuccess: m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess, StartedTime: m.ms.executionInfo.WorkflowTaskStartedTime.AsTime(), ScheduledTime: m.ms.executionInfo.WorkflowTaskScheduledTime.AsTime(), TaskQueue: m.ms.CurrentTaskQueue(), diff --git a/tests/workflow_task_reported_problems_test.go b/tests/workflow_task_reported_problems_test.go index b4320417a77..6f20f13bd64 100644 --- a/tests/workflow_task_reported_problems_test.go +++ b/tests/workflow_task_reported_problems_test.go @@ -32,7 +32,7 @@ func TestWFTFailureReportedProblemsTestSuite(t *testing.T) { func (s *WFTFailureReportedProblemsTestSuite) SetupTest() { s.FunctionalTestBase.SetupTest() - s.OverrideDynamicConfig(dynamicconfig.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute, 4) + s.OverrideDynamicConfig(dynamicconfig.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute, 3) } func (s *WFTFailureReportedProblemsTestSuite) simpleWorkflowWithShouldFail(ctx workflow.Context) (string, error) { @@ -184,9 +184,8 @@ func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_Not require.Contains(t, saVal, "cause=WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure") }, 30*time.Second, 500*time.Millisecond) - // // Continue sending signals for a few more seconds and verify the search attribute is NOT removed - // // This is the key part of the test - signals should not cause the search attribute to be cleared - // time.Sleep(5 * time.Second) + // Continue sending signals for a few more seconds and verify the search attribute is NOT removed + // This is the key part of the test - signals should not cause the search attribute to be cleared s.EventuallyWithT(func(t *assert.CollectT) { description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) From 8c3a2d555be9bd72756828773662fb3dd08f6f5d Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Mon, 8 Dec 2025 17:23:53 -0700 Subject: [PATCH 04/15] trying a different field on workflow execution info --- .../history/workflow/mutable_state_impl.go | 19 +-- .../workflow/workflow_task_state_machine.go | 111 +++++++++++++++++- tests/workflow_task_reported_problems_test.go | 4 +- 3 files changed, 119 insertions(+), 15 deletions(-) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index a8839b3e8c2..5144904c6aa 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -8444,15 +8444,16 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx var workflowTaskVersionUpdated bool if transitionhistory.Compare(current.WorkflowTaskLastUpdateVersionedTransition, incoming.WorkflowTaskLastUpdateVersionedTransition) != 0 { ms.workflowTaskManager.UpdateWorkflowTask(&historyi.WorkflowTaskInfo{ - Version: incoming.WorkflowTaskVersion, - ScheduledEventID: incoming.WorkflowTaskScheduledEventId, - StartedEventID: incoming.WorkflowTaskStartedEventId, - RequestID: incoming.WorkflowTaskRequestId, - WorkflowTaskTimeout: incoming.WorkflowTaskTimeout.AsDuration(), - Attempt: incoming.WorkflowTaskAttempt, - Stamp: incoming.WorkflowTaskStamp, - StartedTime: incoming.WorkflowTaskStartedTime.AsTime(), - ScheduledTime: incoming.WorkflowTaskScheduledTime.AsTime(), + Version: incoming.WorkflowTaskVersion, + ScheduledEventID: incoming.WorkflowTaskScheduledEventId, + StartedEventID: incoming.WorkflowTaskStartedEventId, + RequestID: incoming.WorkflowTaskRequestId, + WorkflowTaskTimeout: incoming.WorkflowTaskTimeout.AsDuration(), + Attempt: incoming.WorkflowTaskAttempt, + AttemptsSinceLastSuccess: incoming.WorkflowTaskAttemptsSinceLastSuccess, + Stamp: incoming.WorkflowTaskStamp, + StartedTime: incoming.WorkflowTaskStartedTime.AsTime(), + ScheduledTime: incoming.WorkflowTaskScheduledTime.AsTime(), OriginalScheduledTime: incoming.WorkflowTaskOriginalScheduledTime.AsTime(), Type: incoming.WorkflowTaskType, diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index c440214d755..8cad372078b 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -319,10 +319,28 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( // Flush any buffered events before creating the workflow task, otherwise it will result in invalid IDs for // transient/speculative workflow task and will cause in timeout processing to not work for transient workflow tasks. if m.ms.HasBufferedEvents() { - // When buffered events are applied, preserve the current attempt count in AttemptsSinceLastSuccess + // When buffered events are applied, preserve the total failure count from the previous workflow task // before resetting Attempt to 1. This ensures we track continuous failures across workflow task resets. if m.ms.IsTransientWorkflowTask() { - m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess = m.ms.executionInfo.WorkflowTaskAttempt + // Calculate total failures from previous WFT: AttemptsSinceLastSuccess + Attempt + // This becomes the new AttemptsSinceLastSuccess for the next WFT + previousTotalFailures := m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess + m.ms.executionInfo.WorkflowTaskAttempt + m.ms.logger.Info("DEBUG WFT: buffered events - preserving total failures in AttemptsSinceLastSuccess", + tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), + tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(m.ms.GetExecutionState().RunId), + tag.NewInt32("current-attempt", m.ms.executionInfo.WorkflowTaskAttempt), + tag.NewInt32("previous-attempts-since-last-success", m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess), + tag.NewInt32("previous-total-failures", previousTotalFailures), + tag.NewInt32("new-attempts-since-last-success", previousTotalFailures)) + m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess = previousTotalFailures + } else { + m.ms.logger.Info("DEBUG WFT: buffered events - not transient, not preserving", + tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), + tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(m.ms.GetExecutionState().RunId), + tag.NewInt32("current-attempt", m.ms.executionInfo.WorkflowTaskAttempt), + tag.NewInt32("current-attempts-since-last-success", m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess)) } m.ms.executionInfo.WorkflowTaskAttempt = 1 workflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL @@ -929,6 +947,17 @@ func (m *workflowTaskStateMachine) failWorkflowTask( m.ms.ClearStickyTaskQueue() } + m.ms.logger.Info("DEBUG WFT: failWorkflowTask called", + tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), + tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(m.ms.GetExecutionState().RunId), + tag.NewInt32("current-attempt", m.ms.executionInfo.WorkflowTaskAttempt), + tag.NewInt32("current-attempts-since-last-success", m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess), + tag.NewBoolTag("increment-attempt", incrementAttempt), + tag.NewBoolTag("is-transient", m.ms.IsTransientWorkflowTask())) + + // AttemptsSinceLastSuccess is the carried-over attempt count from the previous workflow task. + // It stays the same across failures unless buffered events reset the Attempt counter. failWorkflowTaskInfo := &historyi.WorkflowTaskInfo{ Version: common.EmptyVersion, ScheduledEventID: common.EmptyEventID, @@ -946,22 +975,72 @@ func (m *workflowTaskStateMachine) failWorkflowTask( // need to retain Build ID of failed WF task to compare it with the build ID of next attempt BuildId: m.ms.executionInfo.WorkflowTaskBuildId, } + if incrementAttempt { failWorkflowTaskInfo.Attempt = m.ms.executionInfo.WorkflowTaskAttempt + 1 - failWorkflowTaskInfo.AttemptsSinceLastSuccess = m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess + 1 + m.ms.logger.Info("DEBUG WFT: incrementing attempt", + tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), + tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(m.ms.GetExecutionState().RunId), + tag.NewInt32("previous-attempt", m.ms.executionInfo.WorkflowTaskAttempt), + tag.NewInt32("new-attempt", failWorkflowTaskInfo.Attempt), + tag.NewInt32("attempts-since-last-success", failWorkflowTaskInfo.AttemptsSinceLastSuccess)) failWorkflowTaskInfo.ScheduledTime = m.ms.timeSource.Now().UTC() if m.ms.config.EnableWorkflowTaskStampIncrementOnFailure() { m.ms.executionInfo.WorkflowTaskStamp += 1 } } m.retainWorkflowTaskBuildIdInfo(failWorkflowTaskInfo) + + m.ms.logger.Info("DEBUG WFT: before UpdateWorkflowTask", + tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), + tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(m.ms.GetExecutionState().RunId), + tag.NewInt32("failWorkflowTaskInfo.Attempt", failWorkflowTaskInfo.Attempt), + tag.NewInt32("failWorkflowTaskInfo.AttemptsSinceLastSuccess", failWorkflowTaskInfo.AttemptsSinceLastSuccess)) + m.UpdateWorkflowTask(failWorkflowTaskInfo) + m.ms.logger.Info("DEBUG WFT: after UpdateWorkflowTask", + tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), + tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(m.ms.GetExecutionState().RunId), + tag.NewInt32("executionInfo.Attempt", m.ms.executionInfo.WorkflowTaskAttempt), + tag.NewInt32("executionInfo.AttemptsSinceLastSuccess", m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess)) + + // Total continuous failures = AttemptsSinceLastSuccess + Attempt + // AttemptsSinceLastSuccess contains the sum of all previous workflow task failures. + // Attempt is the current workflow task's attempt number (>= 1). + // When this function is called, we're recording a failure, so Attempt represents at least one attempt. + totalContinuousFailures := failWorkflowTaskInfo.AttemptsSinceLastSuccess + failWorkflowTaskInfo.Attempt consecutiveFailuresRequired := m.ms.config.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute(m.ms.GetNamespaceEntry().Name().String()) - if consecutiveFailuresRequired > 0 && failWorkflowTaskInfo.AttemptsSinceLastSuccess >= int32(consecutiveFailuresRequired) { + m.ms.logger.Info("DEBUG WFT: checking search attribute", + tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), + tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(m.ms.GetExecutionState().RunId), + tag.NewInt32("attempt", failWorkflowTaskInfo.Attempt), + tag.NewInt32("attempts-since-last-success", failWorkflowTaskInfo.AttemptsSinceLastSuccess), + tag.NewInt32("total-continuous-failures", totalContinuousFailures), + tag.NewInt32("consecutive-failures-required", int32(consecutiveFailuresRequired)), + tag.NewBoolTag("will-add-search-attribute", consecutiveFailuresRequired > 0 && totalContinuousFailures >= int32(consecutiveFailuresRequired))) + if consecutiveFailuresRequired > 0 && totalContinuousFailures >= int32(consecutiveFailuresRequired) { + m.ms.logger.Info("DEBUG WFT: adding TemporalReportedProblems search attribute", + tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), + tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(m.ms.GetExecutionState().RunId), + tag.NewInt32("total-continuous-failures", totalContinuousFailures)) if err := m.ms.UpdateReportedProblemsSearchAttribute(); err != nil { + m.ms.logger.Error("DEBUG WFT: failed to add search attribute", + tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), + tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(m.ms.GetExecutionState().RunId), + tag.Error(err)) return err } + m.ms.logger.Info("DEBUG WFT: successfully added search attribute", + tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), + tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(m.ms.GetExecutionState().RunId)) } return nil } @@ -972,6 +1051,14 @@ func (m *workflowTaskStateMachine) deleteWorkflowTask() { currentWorkflowTask := m.getWorkflowTaskInfo() m.recordTimeoutTasksForDeletion(currentWorkflowTask) + m.ms.logger.Info("DEBUG WFT: deleteWorkflowTask called (success case)", + tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), + tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(m.ms.GetExecutionState().RunId), + tag.NewInt32("previous-attempt", currentWorkflowTask.Attempt), + tag.NewInt32("previous-attempts-since-last-success", currentWorkflowTask.AttemptsSinceLastSuccess), + tag.NewBoolTag("resetting-to-zero", true)) + // Clear in-memory timeout tasks m.ms.SetWorkflowTaskScheduleToStartTimeoutTask(nil) m.ms.SetWorkflowTaskStartToCloseTimeoutTask(nil) @@ -1001,6 +1088,15 @@ func (m *workflowTaskStateMachine) deleteWorkflowTask() { func (m *workflowTaskStateMachine) UpdateWorkflowTask( workflowTask *historyi.WorkflowTaskInfo, ) { + m.ms.logger.Info("DEBUG WFT: UpdateWorkflowTask called", + tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), + tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(m.ms.GetExecutionState().RunId), + tag.NewInt32("input.Attempt", workflowTask.Attempt), + tag.NewInt32("input.AttemptsSinceLastSuccess", workflowTask.AttemptsSinceLastSuccess), + tag.NewInt32("before.Attempt", m.ms.executionInfo.WorkflowTaskAttempt), + tag.NewInt32("before.AttemptsSinceLastSuccess", m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess)) + if m.HasStartedWorkflowTask() && workflowTask.StartedEventID == common.EmptyEventID { // reset the flag whenever started workflow task closes, there could be three cases: // 1. workflow task completed: @@ -1025,6 +1121,13 @@ func (m *workflowTaskStateMachine) UpdateWorkflowTask( m.ms.executionInfo.WorkflowTaskTimeout = durationpb.New(workflowTask.WorkflowTaskTimeout) m.ms.executionInfo.WorkflowTaskAttempt = workflowTask.Attempt m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess = workflowTask.AttemptsSinceLastSuccess + + m.ms.logger.Info("DEBUG WFT: UpdateWorkflowTask finished", + tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), + tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(m.ms.GetExecutionState().RunId), + tag.NewInt32("after.Attempt", m.ms.executionInfo.WorkflowTaskAttempt), + tag.NewInt32("after.AttemptsSinceLastSuccess", m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess)) if !workflowTask.StartedTime.IsZero() { m.ms.executionInfo.WorkflowTaskStartedTime = timestamppb.New(workflowTask.StartedTime) } diff --git a/tests/workflow_task_reported_problems_test.go b/tests/workflow_task_reported_problems_test.go index 6f20f13bd64..9f0570f5205 100644 --- a/tests/workflow_task_reported_problems_test.go +++ b/tests/workflow_task_reported_problems_test.go @@ -121,7 +121,7 @@ func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_Set execution, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) require.NoError(t, err) require.GreaterOrEqual(t, execution.PendingWorkflowTask.Attempt, int32(2)) - }, 5*time.Second, 500*time.Millisecond) + }, 20*time.Second, 500*time.Millisecond) // Unblock the workflow s.shouldFail.Store(false) @@ -244,7 +244,7 @@ func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_Set require.Len(t, saValues, 2) require.Contains(t, saValues, "category=WorkflowTaskFailed") require.Contains(t, saValues, "cause=WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure") - }, 5*time.Second, 500*time.Millisecond) + }, 20*time.Second, 500*time.Millisecond) // Unblock the workflow s.shouldFail.Store(false) From 84f5e8cc9cb9208c4c7135d8e58bb40dc28e9dd4 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Tue, 9 Dec 2025 10:01:55 -0700 Subject: [PATCH 05/15] fixing linters and adding xdc test --- .../history/interfaces/workflow_task_info.go | 6 +- tests/workflow_task_reported_problems_test.go | 19 +- .../workflow_task_reported_problems_test.go | 260 +++++++++++++++++- 3 files changed, 267 insertions(+), 18 deletions(-) diff --git a/service/history/interfaces/workflow_task_info.go b/service/history/interfaces/workflow_task_info.go index a77a157ce9b..27a5e361899 100644 --- a/service/history/interfaces/workflow_task_info.go +++ b/service/history/interfaces/workflow_task_info.go @@ -19,11 +19,11 @@ type WorkflowTaskInfo struct { TaskQueue *taskqueuepb.TaskQueue // Attempt is the number of attempts for this workflow task. - Attempt int32 + Attempt int32 // AttemptsSinceLastSuccess is the number of attempts since the last successful workflow task. // This is used by the `TemporalReportedProblems` search attribute to check latest WFT failure count, - // this will only differ from attempts above when the previous workflow tasks failed and there was a - // buffered event (like a signal or activity finishing) applied to the workflow which causes the + // this will only differ from attempts above when the previous workflow tasks failed and there was a + // buffered event (like a signal or activity finishing) applied to the workflow which causes the // new workflow task to have an attempt of 1 again. AttemptsSinceLastSuccess int32 diff --git a/tests/workflow_task_reported_problems_test.go b/tests/workflow_task_reported_problems_test.go index 9f0570f5205..62c922088a5 100644 --- a/tests/workflow_task_reported_problems_test.go +++ b/tests/workflow_task_reported_problems_test.go @@ -52,22 +52,15 @@ func (s *WFTFailureReportedProblemsTestSuite) simpleActivity() (string, error) { func (s *WFTFailureReportedProblemsTestSuite) workflowWithSignalsThatFails(ctx workflow.Context) (string, error) { signalChan := workflow.GetSignalChannel(ctx, "test-signal") - for { - var signalValue string - more := signalChan.Receive(ctx, &signalValue) - if !more { - break - } - - // Always fail after receiving a signal - if s.shouldFail.Load() { - panic("forced-panic-after-signal") - } + var signalValue string + signalChan.Receive(ctx, &signalValue) - // If we reach here, shouldFail is false, so we can complete - return "done!", nil + // Always fail after receiving a signal, unless shouldFail is false + if s.shouldFail.Load() { + panic("forced-panic-after-signal") } + // If we reach here, shouldFail is false, so we can complete return "done!", nil } diff --git a/tests/xdc/workflow_task_reported_problems_test.go b/tests/xdc/workflow_task_reported_problems_test.go index fbedbf3c20f..21507fe8df5 100644 --- a/tests/xdc/workflow_task_reported_problems_test.go +++ b/tests/xdc/workflow_task_reported_problems_test.go @@ -27,8 +27,8 @@ import ( type ( WorkflowTaskReportedProblemsReplicationSuite struct { xdcBaseSuite - shouldFail atomic.Bool - failureCount atomic.Int32 + shouldFail atomic.Bool + stopSignals atomic.Bool } ) @@ -301,3 +301,259 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro false, ) } + +func (s *WorkflowTaskReportedProblemsReplicationSuite) simpleActivity() (string, error) { + return "done!", nil +} + +// workflowWithSignalsThatFails creates a workflow that listens for signals and fails on each workflow task. +func (s *WorkflowTaskReportedProblemsReplicationSuite) workflowWithSignalsThatFails(ctx workflow.Context) (string, error) { + signalChan := workflow.GetSignalChannel(ctx, "test-signal") + + var signalValue string + signalChan.Receive(ctx, &signalValue) + + // Always fail after receiving a signal, unless shouldFail is false + if s.shouldFail.Load() { + panic("forced-panic-after-signal") + } + + // If we reach here, shouldFail is false, so we can complete + return "done!", nil +} + +// workflowWithActivity creates a workflow that executes an activity before potentially failing. +func (s *WorkflowTaskReportedProblemsReplicationSuite) workflowWithActivity(ctx workflow.Context) (string, error) { + var ret string + err := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Second, + }), s.simpleActivity).Get(ctx, &ret) + if err != nil { + return "", err + } + + if s.shouldFail.Load() { + panic("forced-panic-to-fail-wft") + } + + return "done!", nil +} + +func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedProblems_NotClearedBySignals() { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + ns := s.createGlobalNamespace() + activeSDKClient, err := sdkclient.Dial(sdkclient.Options{ + HostPort: s.clusters[0].Host().FrontendGRPCAddress(), + Namespace: ns, + Logger: log.NewSdkLogger(s.logger), + }) + s.NoError(err) + + s.shouldFail.Store(true) + s.stopSignals.Store(false) + + taskQueue := testcore.RandomizeStr("tq") + worker1 := sdkworker.New(activeSDKClient, taskQueue, sdkworker.Options{}) + worker1.RegisterWorkflow(s.workflowWithSignalsThatFails) + s.NoError(worker1.Start()) + defer worker1.Stop() + + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wfid-" + s.T().Name()), + TaskQueue: taskQueue, + } + + workflowRun, err := activeSDKClient.ExecuteWorkflow(ctx, workflowOptions, s.workflowWithSignalsThatFails) + s.NoError(err) + + // Start sending signals every second in a goroutine + signalDone := make(chan struct{}) + go func() { + defer close(signalDone) + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if s.stopSignals.Load() { + return + } + _ = activeSDKClient.SignalWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test-signal", "ping") + case <-ctx.Done(): + return + } + } + }() + + // Wait for the search attribute to be set due to consecutive failures + s.checkReportedProblemsSearchAttribute( + s.clusters[0].Host().AdminClient(), + activeSDKClient, + workflowRun.GetID(), + workflowRun.GetRunID(), + ns, + "WorkflowTaskFailed", + "WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure", + true, + ) + + // Continue sending signals for a few more seconds and verify the search attribute is NOT removed + s.EventuallyWithT(func(t *assert.CollectT) { + description, err := activeSDKClient.DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + saVal, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) + s.True(ok, "Search attribute should still be present after receiving signals") + s.NotEmpty(saVal, "Search attribute should not be empty after receiving signals") + }, 5*time.Second, 500*time.Millisecond) + + // get standby client + standbyClient, err := sdkclient.Dial(sdkclient.Options{ + HostPort: s.clusters[1].Host().FrontendGRPCAddress(), + Namespace: ns, + }) + s.NoError(err) + s.NotNil(standbyClient) + + // verify search attributes are replicated to cluster1 + s.checkReportedProblemsSearchAttribute( + s.clusters[1].Host().AdminClient(), + standbyClient, + workflowRun.GetID(), + workflowRun.GetRunID(), + ns, + "WorkflowTaskFailed", + "WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure", + true, + ) + + // Stop signals and unblock the workflow for cleanup + s.stopSignals.Store(true) + s.shouldFail.Store(false) + + // Send one final signal to trigger workflow completion + err = activeSDKClient.SignalWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test-signal", "final") + s.NoError(err) + + var out string + s.NoError(workflowRun.Get(ctx, &out)) + + // Wait for signal goroutine to finish + <-signalDone + + // Verify search attribute is cleared after successful completion in both clusters + s.checkReportedProblemsSearchAttribute( + s.clusters[0].Host().AdminClient(), + activeSDKClient, + workflowRun.GetID(), + workflowRun.GetRunID(), + ns, + "", + "", + false, + ) + + s.checkReportedProblemsSearchAttribute( + s.clusters[1].Host().AdminClient(), + standbyClient, + workflowRun.GetID(), + workflowRun.GetRunID(), + ns, + "", + "", + false, + ) +} + +func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedProblems_SetAndClear_FailAfterActivity() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + ns := s.createGlobalNamespace() + activeSDKClient, err := sdkclient.Dial(sdkclient.Options{ + HostPort: s.clusters[0].Host().FrontendGRPCAddress(), + Namespace: ns, + Logger: log.NewSdkLogger(s.logger), + }) + s.NoError(err) + + s.shouldFail.Store(true) + + taskQueue := testcore.RandomizeStr("tq") + worker1 := sdkworker.New(activeSDKClient, taskQueue, sdkworker.Options{}) + worker1.RegisterWorkflow(s.workflowWithActivity) + worker1.RegisterActivity(s.simpleActivity) + s.NoError(worker1.Start()) + defer worker1.Stop() + + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wfid-" + s.T().Name()), + TaskQueue: taskQueue, + } + + workflowRun, err := activeSDKClient.ExecuteWorkflow(ctx, workflowOptions, s.workflowWithActivity) + s.NoError(err) + + // Validate the search attributes are set after activity completes + s.checkReportedProblemsSearchAttribute( + s.clusters[0].Host().AdminClient(), + activeSDKClient, + workflowRun.GetID(), + workflowRun.GetRunID(), + ns, + "WorkflowTaskFailed", + "WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure", + true, + ) + + // get standby client + standbyClient, err := sdkclient.Dial(sdkclient.Options{ + HostPort: s.clusters[1].Host().FrontendGRPCAddress(), + Namespace: ns, + }) + s.NoError(err) + s.NotNil(standbyClient) + + // verify search attributes are replicated to cluster1 + s.checkReportedProblemsSearchAttribute( + s.clusters[1].Host().AdminClient(), + standbyClient, + workflowRun.GetID(), + workflowRun.GetRunID(), + ns, + "WorkflowTaskFailed", + "WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure", + true, + ) + + // Unblock the workflow + s.shouldFail.Store(false) + + var out string + s.NoError(workflowRun.Get(ctx, &out)) + + // Verify search attribute is cleared after successful completion in both clusters + s.checkReportedProblemsSearchAttribute( + s.clusters[0].Host().AdminClient(), + activeSDKClient, + workflowRun.GetID(), + workflowRun.GetRunID(), + ns, + "", + "", + false, + ) + + s.checkReportedProblemsSearchAttribute( + s.clusters[1].Host().AdminClient(), + standbyClient, + workflowRun.GetID(), + workflowRun.GetRunID(), + ns, + "", + "", + false, + ) +} From 2cd6eba5979af261b082e5c75a43801f17ef9a2d Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Tue, 9 Dec 2025 10:20:50 -0700 Subject: [PATCH 06/15] linting api --- api/persistence/v1/executions.pb.go | 16 ++++++++++------ .../server/api/persistence/v1/executions.proto | 2 ++ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index e5da9068a41..b959e5ed0fd 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -181,10 +181,13 @@ type WorkflowExecutionInfo struct { // AttemptsSinceLastSuccess tracks the number of workflow task attempts since the last successful workflow task. // This is carried over when buffered events are applied after workflow task failures. // Used by the TemporalReportedProblems search attribute to track continuous failure count. - WorkflowTaskAttemptsSinceLastSuccess int32 `protobuf:"varint,110,opt,name=workflow_task_attempts_since_last_success,json=workflowTaskAttemptsSinceLastSuccess,proto3" json:"workflow_task_attempts_since_last_success,omitempty"` - CancelRequested bool `protobuf:"varint,29,opt,name=cancel_requested,json=cancelRequested,proto3" json:"cancel_requested,omitempty"` - CancelRequestId string `protobuf:"bytes,32,opt,name=cancel_request_id,json=cancelRequestId,proto3" json:"cancel_request_id,omitempty"` - StickyTaskQueue string `protobuf:"bytes,33,opt,name=sticky_task_queue,json=stickyTaskQueue,proto3" json:"sticky_task_queue,omitempty"` + // (-- api-linter: core::0140::prepositions=disabled + // + // aip.dev/not-precedent: "since" is needed here. --) + WorkflowTaskAttemptsSinceLastSuccess int32 `protobuf:"varint,110,opt,name=workflow_task_attempts_since_last_success,json=workflowTaskAttemptsSinceLastSuccess,proto3" json:"workflow_task_attempts_since_last_success,omitempty"` + CancelRequested bool `protobuf:"varint,29,opt,name=cancel_requested,json=cancelRequested,proto3" json:"cancel_requested,omitempty"` + CancelRequestId string `protobuf:"bytes,32,opt,name=cancel_request_id,json=cancelRequestId,proto3" json:"cancel_request_id,omitempty"` + StickyTaskQueue string `protobuf:"bytes,33,opt,name=sticky_task_queue,json=stickyTaskQueue,proto3" json:"sticky_task_queue,omitempty"` // (-- api-linter: core::0140::prepositions=disabled // // aip.dev/not-precedent: "to" is used to indicate interval. --) @@ -4508,7 +4511,7 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x03key\x18\x01 \x01(\x05R\x03key\x12D\n" + "\x05value\x18\x02 \x01(\v2..temporal.server.api.persistence.v1.QueueStateR\x05value:\x028\x01J\x04\b\x04\x10\x05J\x04\b\x05\x10\x06J\x04\b\b\x10\tJ\x04\b\t\x10\n" + "J\x04\b\n" + - "\x10\vJ\x04\b\v\x10\fJ\x04\b\f\x10\rJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10J\x04\b\x10\x10\x11\"\xfe=\n" + + "\x10\vJ\x04\b\v\x10\fJ\x04\b\f\x10\rJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10J\x04\b\x10\x10\x11\"\xd7>\n" + "\x15WorkflowExecutionInfo\x12!\n" + "\fnamespace_id\x18\x01 \x01(\tR\vnamespaceId\x12\x1f\n" + "\vworkflow_id\x18\x02 \x01(\tR\n" + @@ -4545,7 +4548,8 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + " workflow_task_history_size_bytes\x18F \x01(\x03R\x1cworkflowTaskHistorySizeBytes\x123\n" + "\x16workflow_task_build_id\x18X \x01(\tR\x13workflowTaskBuildId\x12S\n" + "'workflow_task_build_id_redirect_counter\x18Y \x01(\x03R\"workflowTaskBuildIdRedirectCounter\x12.\n" + - "\x13workflow_task_stamp\x18m \x01(\x05R\x11workflowTaskStamp\x12)\n" + + "\x13workflow_task_stamp\x18m \x01(\x05R\x11workflowTaskStamp\x12W\n" + + ")workflow_task_attempts_since_last_success\x18n \x01(\x05R$workflowTaskAttemptsSinceLastSuccess\x12)\n" + "\x10cancel_requested\x18\x1d \x01(\bR\x0fcancelRequested\x12*\n" + "\x11cancel_request_id\x18 \x01(\tR\x0fcancelRequestId\x12*\n" + "\x11sticky_task_queue\x18! \x01(\tR\x0fstickyTaskQueue\x12a\n" + diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto b/proto/internal/temporal/server/api/persistence/v1/executions.proto index 6e671630dbb..611558a16e1 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -107,6 +107,8 @@ message WorkflowExecutionInfo { // AttemptsSinceLastSuccess tracks the number of workflow task attempts since the last successful workflow task. // This is carried over when buffered events are applied after workflow task failures. // Used by the TemporalReportedProblems search attribute to track continuous failure count. + // (-- api-linter: core::0140::prepositions=disabled + // aip.dev/not-precedent: "since" is needed here. --) int32 workflow_task_attempts_since_last_success = 110; bool cancel_requested = 29; From ddf398d1e40858d5ce337f81eef7623ea10353fd Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Tue, 9 Dec 2025 10:43:08 -0700 Subject: [PATCH 07/15] removing all the debug statements --- .../workflow/workflow_task_state_machine.go | 93 +------------------ 1 file changed, 1 insertion(+), 92 deletions(-) diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 8cad372078b..364d26a1e7a 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -325,22 +325,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( // Calculate total failures from previous WFT: AttemptsSinceLastSuccess + Attempt // This becomes the new AttemptsSinceLastSuccess for the next WFT previousTotalFailures := m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess + m.ms.executionInfo.WorkflowTaskAttempt - m.ms.logger.Info("DEBUG WFT: buffered events - preserving total failures in AttemptsSinceLastSuccess", - tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), - tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(m.ms.GetExecutionState().RunId), - tag.NewInt32("current-attempt", m.ms.executionInfo.WorkflowTaskAttempt), - tag.NewInt32("previous-attempts-since-last-success", m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess), - tag.NewInt32("previous-total-failures", previousTotalFailures), - tag.NewInt32("new-attempts-since-last-success", previousTotalFailures)) m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess = previousTotalFailures - } else { - m.ms.logger.Info("DEBUG WFT: buffered events - not transient, not preserving", - tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), - tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(m.ms.GetExecutionState().RunId), - tag.NewInt32("current-attempt", m.ms.executionInfo.WorkflowTaskAttempt), - tag.NewInt32("current-attempts-since-last-success", m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess)) } m.ms.executionInfo.WorkflowTaskAttempt = 1 workflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL @@ -947,15 +932,6 @@ func (m *workflowTaskStateMachine) failWorkflowTask( m.ms.ClearStickyTaskQueue() } - m.ms.logger.Info("DEBUG WFT: failWorkflowTask called", - tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), - tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(m.ms.GetExecutionState().RunId), - tag.NewInt32("current-attempt", m.ms.executionInfo.WorkflowTaskAttempt), - tag.NewInt32("current-attempts-since-last-success", m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess), - tag.NewBoolTag("increment-attempt", incrementAttempt), - tag.NewBoolTag("is-transient", m.ms.IsTransientWorkflowTask())) - // AttemptsSinceLastSuccess is the carried-over attempt count from the previous workflow task. // It stays the same across failures unless buffered events reset the Attempt counter. failWorkflowTaskInfo := &historyi.WorkflowTaskInfo{ @@ -978,13 +954,6 @@ func (m *workflowTaskStateMachine) failWorkflowTask( if incrementAttempt { failWorkflowTaskInfo.Attempt = m.ms.executionInfo.WorkflowTaskAttempt + 1 - m.ms.logger.Info("DEBUG WFT: incrementing attempt", - tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), - tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(m.ms.GetExecutionState().RunId), - tag.NewInt32("previous-attempt", m.ms.executionInfo.WorkflowTaskAttempt), - tag.NewInt32("new-attempt", failWorkflowTaskInfo.Attempt), - tag.NewInt32("attempts-since-last-success", failWorkflowTaskInfo.AttemptsSinceLastSuccess)) failWorkflowTaskInfo.ScheduledTime = m.ms.timeSource.Now().UTC() if m.ms.config.EnableWorkflowTaskStampIncrementOnFailure() { m.ms.executionInfo.WorkflowTaskStamp += 1 @@ -992,55 +961,19 @@ func (m *workflowTaskStateMachine) failWorkflowTask( } m.retainWorkflowTaskBuildIdInfo(failWorkflowTaskInfo) - m.ms.logger.Info("DEBUG WFT: before UpdateWorkflowTask", - tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), - tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(m.ms.GetExecutionState().RunId), - tag.NewInt32("failWorkflowTaskInfo.Attempt", failWorkflowTaskInfo.Attempt), - tag.NewInt32("failWorkflowTaskInfo.AttemptsSinceLastSuccess", failWorkflowTaskInfo.AttemptsSinceLastSuccess)) - m.UpdateWorkflowTask(failWorkflowTaskInfo) - m.ms.logger.Info("DEBUG WFT: after UpdateWorkflowTask", - tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), - tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(m.ms.GetExecutionState().RunId), - tag.NewInt32("executionInfo.Attempt", m.ms.executionInfo.WorkflowTaskAttempt), - tag.NewInt32("executionInfo.AttemptsSinceLastSuccess", m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess)) - // Total continuous failures = AttemptsSinceLastSuccess + Attempt // AttemptsSinceLastSuccess contains the sum of all previous workflow task failures. // Attempt is the current workflow task's attempt number (>= 1). // When this function is called, we're recording a failure, so Attempt represents at least one attempt. totalContinuousFailures := failWorkflowTaskInfo.AttemptsSinceLastSuccess + failWorkflowTaskInfo.Attempt consecutiveFailuresRequired := m.ms.config.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute(m.ms.GetNamespaceEntry().Name().String()) - m.ms.logger.Info("DEBUG WFT: checking search attribute", - tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), - tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(m.ms.GetExecutionState().RunId), - tag.NewInt32("attempt", failWorkflowTaskInfo.Attempt), - tag.NewInt32("attempts-since-last-success", failWorkflowTaskInfo.AttemptsSinceLastSuccess), - tag.NewInt32("total-continuous-failures", totalContinuousFailures), - tag.NewInt32("consecutive-failures-required", int32(consecutiveFailuresRequired)), - tag.NewBoolTag("will-add-search-attribute", consecutiveFailuresRequired > 0 && totalContinuousFailures >= int32(consecutiveFailuresRequired))) + if consecutiveFailuresRequired > 0 && totalContinuousFailures >= int32(consecutiveFailuresRequired) { - m.ms.logger.Info("DEBUG WFT: adding TemporalReportedProblems search attribute", - tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), - tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(m.ms.GetExecutionState().RunId), - tag.NewInt32("total-continuous-failures", totalContinuousFailures)) if err := m.ms.UpdateReportedProblemsSearchAttribute(); err != nil { - m.ms.logger.Error("DEBUG WFT: failed to add search attribute", - tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), - tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(m.ms.GetExecutionState().RunId), - tag.Error(err)) return err } - m.ms.logger.Info("DEBUG WFT: successfully added search attribute", - tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), - tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(m.ms.GetExecutionState().RunId)) } return nil } @@ -1051,14 +984,6 @@ func (m *workflowTaskStateMachine) deleteWorkflowTask() { currentWorkflowTask := m.getWorkflowTaskInfo() m.recordTimeoutTasksForDeletion(currentWorkflowTask) - m.ms.logger.Info("DEBUG WFT: deleteWorkflowTask called (success case)", - tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), - tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(m.ms.GetExecutionState().RunId), - tag.NewInt32("previous-attempt", currentWorkflowTask.Attempt), - tag.NewInt32("previous-attempts-since-last-success", currentWorkflowTask.AttemptsSinceLastSuccess), - tag.NewBoolTag("resetting-to-zero", true)) - // Clear in-memory timeout tasks m.ms.SetWorkflowTaskScheduleToStartTimeoutTask(nil) m.ms.SetWorkflowTaskStartToCloseTimeoutTask(nil) @@ -1088,15 +1013,6 @@ func (m *workflowTaskStateMachine) deleteWorkflowTask() { func (m *workflowTaskStateMachine) UpdateWorkflowTask( workflowTask *historyi.WorkflowTaskInfo, ) { - m.ms.logger.Info("DEBUG WFT: UpdateWorkflowTask called", - tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), - tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(m.ms.GetExecutionState().RunId), - tag.NewInt32("input.Attempt", workflowTask.Attempt), - tag.NewInt32("input.AttemptsSinceLastSuccess", workflowTask.AttemptsSinceLastSuccess), - tag.NewInt32("before.Attempt", m.ms.executionInfo.WorkflowTaskAttempt), - tag.NewInt32("before.AttemptsSinceLastSuccess", m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess)) - if m.HasStartedWorkflowTask() && workflowTask.StartedEventID == common.EmptyEventID { // reset the flag whenever started workflow task closes, there could be three cases: // 1. workflow task completed: @@ -1121,13 +1037,6 @@ func (m *workflowTaskStateMachine) UpdateWorkflowTask( m.ms.executionInfo.WorkflowTaskTimeout = durationpb.New(workflowTask.WorkflowTaskTimeout) m.ms.executionInfo.WorkflowTaskAttempt = workflowTask.Attempt m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess = workflowTask.AttemptsSinceLastSuccess - - m.ms.logger.Info("DEBUG WFT: UpdateWorkflowTask finished", - tag.WorkflowNamespace(m.ms.GetNamespaceEntry().Name().String()), - tag.WorkflowID(m.ms.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(m.ms.GetExecutionState().RunId), - tag.NewInt32("after.Attempt", m.ms.executionInfo.WorkflowTaskAttempt), - tag.NewInt32("after.AttemptsSinceLastSuccess", m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess)) if !workflowTask.StartedTime.IsZero() { m.ms.executionInfo.WorkflowTaskStartedTime = timestamppb.New(workflowTask.StartedTime) } From ee13ae04b4979ee22d4c08bde7a9259d995b08e4 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Tue, 9 Dec 2025 12:34:12 -0700 Subject: [PATCH 08/15] removing some extra lines for parity --- service/history/workflow/workflow_task_state_machine.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 364d26a1e7a..a3f6ab210ea 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -951,7 +951,6 @@ func (m *workflowTaskStateMachine) failWorkflowTask( // need to retain Build ID of failed WF task to compare it with the build ID of next attempt BuildId: m.ms.executionInfo.WorkflowTaskBuildId, } - if incrementAttempt { failWorkflowTaskInfo.Attempt = m.ms.executionInfo.WorkflowTaskAttempt + 1 failWorkflowTaskInfo.ScheduledTime = m.ms.timeSource.Now().UTC() @@ -960,7 +959,6 @@ func (m *workflowTaskStateMachine) failWorkflowTask( } } m.retainWorkflowTaskBuildIdInfo(failWorkflowTaskInfo) - m.UpdateWorkflowTask(failWorkflowTaskInfo) // Total continuous failures = AttemptsSinceLastSuccess + Attempt @@ -995,7 +993,6 @@ func (m *workflowTaskStateMachine) deleteWorkflowTask() { RequestID: emptyUUID, WorkflowTaskTimeout: time.Duration(0), Attempt: 1, - AttemptsSinceLastSuccess: 0, StartedTime: time.Unix(0, 0).UTC(), ScheduledTime: time.Unix(0, 0).UTC(), From da02cc29ad3d16a91052e1ebecf3fef3a524f5ed Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Tue, 9 Dec 2025 12:50:19 -0700 Subject: [PATCH 09/15] formatting --- .../workflow/workflow_task_state_machine.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index a3f6ab210ea..3a79d0eed8a 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -987,14 +987,14 @@ func (m *workflowTaskStateMachine) deleteWorkflowTask() { m.ms.SetWorkflowTaskStartToCloseTimeoutTask(nil) resetWorkflowTaskInfo := &historyi.WorkflowTaskInfo{ - Version: common.EmptyVersion, - ScheduledEventID: common.EmptyEventID, - StartedEventID: common.EmptyEventID, - RequestID: emptyUUID, - WorkflowTaskTimeout: time.Duration(0), - Attempt: 1, - StartedTime: time.Unix(0, 0).UTC(), - ScheduledTime: time.Unix(0, 0).UTC(), + Version: common.EmptyVersion, + ScheduledEventID: common.EmptyEventID, + StartedEventID: common.EmptyEventID, + RequestID: emptyUUID, + WorkflowTaskTimeout: time.Duration(0), + Attempt: 1, + StartedTime: time.Unix(0, 0).UTC(), + ScheduledTime: time.Unix(0, 0).UTC(), TaskQueue: nil, // Keep the last original scheduled Timestamp, so that AddWorkflowTaskScheduledEventAsHeartbeat can continue with it. From 0f82782bb4372257a9d38d9dfd578e7343f7309a Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Wed, 10 Dec 2025 10:56:06 -0700 Subject: [PATCH 10/15] comments --- .../workflow/workflow_task_state_machine.go | 124 +++++++++--------- tests/workflow_task_reported_problems_test.go | 75 +++-------- .../workflow_task_reported_problems_test.go | 80 ++--------- 3 files changed, 91 insertions(+), 188 deletions(-) diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 3a79d0eed8a..cdd507e704e 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -80,21 +80,23 @@ func (m *workflowTaskStateMachine) ApplyWorkflowTaskScheduledEvent( } } + attemptsSinceLastSuccess := m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess workflowTask := &historyi.WorkflowTaskInfo{ - Version: version, - ScheduledEventID: scheduledEventID, - StartedEventID: common.EmptyEventID, - RequestID: emptyUUID, - WorkflowTaskTimeout: startToCloseTimeout.AsDuration(), - TaskQueue: taskQueue, - Attempt: attempt, - ScheduledTime: scheduledTime.AsTime(), - StartedTime: time.Time{}, - OriginalScheduledTime: originalScheduledTimestamp.AsTime(), - Type: workflowTaskType, - SuggestContinueAsNew: false, // reset, will be recomputed on workflow task started - HistorySizeBytes: 0, // reset, will be recomputed on workflow task started - Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), + Version: version, + ScheduledEventID: scheduledEventID, + StartedEventID: common.EmptyEventID, + RequestID: emptyUUID, + WorkflowTaskTimeout: startToCloseTimeout.AsDuration(), + TaskQueue: taskQueue, + Attempt: attempt, + AttemptsSinceLastSuccess: attemptsSinceLastSuccess, // Preserve failure counter + ScheduledTime: scheduledTime.AsTime(), + StartedTime: time.Time{}, + OriginalScheduledTime: originalScheduledTimestamp.AsTime(), + Type: workflowTaskType, + SuggestContinueAsNew: false, // reset, will be recomputed on workflow task started + HistorySizeBytes: 0, // reset, will be recomputed on workflow task started + Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), } m.retainWorkflowTaskBuildIdInfo(workflowTask) @@ -145,14 +147,15 @@ func (m *workflowTaskStateMachine) ApplyTransientWorkflowTaskScheduled() (*histo WorkflowTaskTimeout: m.ms.GetExecutionInfo().DefaultWorkflowTaskTimeout.AsDuration(), // Task queue is always normal (not sticky) because transient workflow task is created only for // failed/timed out workflow task and fail/timeout clears sticky task queue. - TaskQueue: m.ms.CurrentTaskQueue(), - Attempt: m.ms.GetExecutionInfo().WorkflowTaskAttempt, - ScheduledTime: timestamppb.New(m.ms.timeSource.Now()).AsTime(), - StartedTime: time.Unix(0, 0).UTC(), - Type: enumsspb.WORKFLOW_TASK_TYPE_NORMAL, - SuggestContinueAsNew: false, // reset, will be recomputed on workflow task started - HistorySizeBytes: 0, // reset, will be recomputed on workflow task started - Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), + TaskQueue: m.ms.CurrentTaskQueue(), + Attempt: m.ms.GetExecutionInfo().WorkflowTaskAttempt, + AttemptsSinceLastSuccess: m.ms.GetExecutionInfo().WorkflowTaskAttemptsSinceLastSuccess, + ScheduledTime: timestamppb.New(m.ms.timeSource.Now()).AsTime(), + StartedTime: time.Unix(0, 0).UTC(), + Type: enumsspb.WORKFLOW_TASK_TYPE_NORMAL, + SuggestContinueAsNew: false, // reset, will be recomputed on workflow task started + HistorySizeBytes: 0, // reset, will be recomputed on workflow task started + Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), } m.retainWorkflowTaskBuildIdInfo(workflowTask) @@ -195,21 +198,22 @@ func (m *workflowTaskStateMachine) ApplyWorkflowTaskStartedEvent( } workflowTask = &historyi.WorkflowTaskInfo{ - Version: version, - ScheduledEventID: scheduledEventID, - StartedEventID: startedEventID, - RequestID: requestID, - WorkflowTaskTimeout: workflowTask.WorkflowTaskTimeout, - Attempt: workflowTask.Attempt, - StartedTime: startedTime, - ScheduledTime: workflowTask.ScheduledTime, - TaskQueue: workflowTask.TaskQueue, - OriginalScheduledTime: workflowTask.OriginalScheduledTime, - Type: workflowTask.Type, - SuggestContinueAsNew: suggestContinueAsNew, - HistorySizeBytes: historySizeBytes, - BuildIdRedirectCounter: redirectCounter, - Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), + Version: version, + ScheduledEventID: scheduledEventID, + StartedEventID: startedEventID, + RequestID: requestID, + WorkflowTaskTimeout: workflowTask.WorkflowTaskTimeout, + Attempt: workflowTask.Attempt, + AttemptsSinceLastSuccess: workflowTask.AttemptsSinceLastSuccess, + StartedTime: startedTime, + ScheduledTime: workflowTask.ScheduledTime, + TaskQueue: workflowTask.TaskQueue, + OriginalScheduledTime: workflowTask.OriginalScheduledTime, + Type: workflowTask.Type, + SuggestContinueAsNew: suggestContinueAsNew, + HistorySizeBytes: historySizeBytes, + BuildIdRedirectCounter: redirectCounter, + Stamp: m.ms.GetExecutionInfo().GetWorkflowTaskStamp(), } if buildId := worker_versioning.BuildIdIfUsingVersioning(versioningStamp); buildId != "" { @@ -319,14 +323,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( // Flush any buffered events before creating the workflow task, otherwise it will result in invalid IDs for // transient/speculative workflow task and will cause in timeout processing to not work for transient workflow tasks. if m.ms.HasBufferedEvents() { - // When buffered events are applied, preserve the total failure count from the previous workflow task - // before resetting Attempt to 1. This ensures we track continuous failures across workflow task resets. - if m.ms.IsTransientWorkflowTask() { - // Calculate total failures from previous WFT: AttemptsSinceLastSuccess + Attempt - // This becomes the new AttemptsSinceLastSuccess for the next WFT - previousTotalFailures := m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess + m.ms.executionInfo.WorkflowTaskAttempt - m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess = previousTotalFailures - } + // Reset Attempt to 1 when flushing buffered events to create a new non-transient workflow task. + // WorkflowTaskAttemptsSinceLastSuccess is already up-to-date from previous failures. m.ms.executionInfo.WorkflowTaskAttempt = 1 workflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL createWorkflowTaskScheduledEvent = true @@ -932,8 +930,21 @@ func (m *workflowTaskStateMachine) failWorkflowTask( m.ms.ClearStickyTaskQueue() } - // AttemptsSinceLastSuccess is the carried-over attempt count from the previous workflow task. - // It stays the same across failures unless buffered events reset the Attempt counter. + // Update AttemptsSinceLastSuccess on every failure to keep the counter up-to-date. + // Increment it by 1 for each failure instead of only when flushing buffered events. + newAttemptsSinceLastSuccess := m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess + newAttempt := int32(1) + + if incrementAttempt { + // Increment the failure counter for this WFT failure + newAttemptsSinceLastSuccess = m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess + 1 + // Also increment Attempt for transient workflow task tracking + newAttempt = m.ms.executionInfo.WorkflowTaskAttempt + 1 + if m.ms.config.EnableWorkflowTaskStampIncrementOnFailure() { + m.ms.executionInfo.WorkflowTaskStamp += 1 + } + } + failWorkflowTaskInfo := &historyi.WorkflowTaskInfo{ Version: common.EmptyVersion, ScheduledEventID: common.EmptyEventID, @@ -943,29 +954,20 @@ func (m *workflowTaskStateMachine) failWorkflowTask( StartedTime: time.Unix(0, 0).UTC(), TaskQueue: nil, OriginalScheduledTime: time.Unix(0, 0).UTC(), - Attempt: 1, - AttemptsSinceLastSuccess: m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess, + Attempt: newAttempt, + AttemptsSinceLastSuccess: newAttemptsSinceLastSuccess, Type: enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED, SuggestContinueAsNew: false, HistorySizeBytes: 0, // need to retain Build ID of failed WF task to compare it with the build ID of next attempt BuildId: m.ms.executionInfo.WorkflowTaskBuildId, } - if incrementAttempt { - failWorkflowTaskInfo.Attempt = m.ms.executionInfo.WorkflowTaskAttempt + 1 - failWorkflowTaskInfo.ScheduledTime = m.ms.timeSource.Now().UTC() - if m.ms.config.EnableWorkflowTaskStampIncrementOnFailure() { - m.ms.executionInfo.WorkflowTaskStamp += 1 - } - } m.retainWorkflowTaskBuildIdInfo(failWorkflowTaskInfo) m.UpdateWorkflowTask(failWorkflowTaskInfo) - // Total continuous failures = AttemptsSinceLastSuccess + Attempt - // AttemptsSinceLastSuccess contains the sum of all previous workflow task failures. - // Attempt is the current workflow task's attempt number (>= 1). - // When this function is called, we're recording a failure, so Attempt represents at least one attempt. - totalContinuousFailures := failWorkflowTaskInfo.AttemptsSinceLastSuccess + failWorkflowTaskInfo.Attempt + // AttemptsSinceLastSuccess now directly tracks the total number of workflow task failures. + // It's incremented on every failure, so we can use it directly. + totalContinuousFailures := failWorkflowTaskInfo.AttemptsSinceLastSuccess consecutiveFailuresRequired := m.ms.config.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute(m.ms.GetNamespaceEntry().Name().String()) if consecutiveFailuresRequired > 0 && totalContinuousFailures >= int32(consecutiveFailuresRequired) { diff --git a/tests/workflow_task_reported_problems_test.go b/tests/workflow_task_reported_problems_test.go index 62c922088a5..0fb3a98ed85 100644 --- a/tests/workflow_task_reported_problems_test.go +++ b/tests/workflow_task_reported_problems_test.go @@ -19,10 +19,7 @@ import ( type WFTFailureReportedProblemsTestSuite struct { testcore.FunctionalTestBase - shouldFail atomic.Bool - failureCount atomic.Int32 - failureType atomic.Int32 // 0 = panic, 1 = non-deterministic error - stopSignals atomic.Bool + shouldFail atomic.Bool } func TestWFTFailureReportedProblemsTestSuite(t *testing.T) { @@ -32,7 +29,7 @@ func TestWFTFailureReportedProblemsTestSuite(t *testing.T) { func (s *WFTFailureReportedProblemsTestSuite) SetupTest() { s.FunctionalTestBase.SetupTest() - s.OverrideDynamicConfig(dynamicconfig.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute, 3) + s.OverrideDynamicConfig(dynamicconfig.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute, 2) } func (s *WFTFailureReportedProblemsTestSuite) simpleWorkflowWithShouldFail(ctx workflow.Context) (string, error) { @@ -50,14 +47,12 @@ func (s *WFTFailureReportedProblemsTestSuite) simpleActivity() (string, error) { // This is used to test that the TemporalReportedProblems search attribute is not incorrectly removed // when signals keep coming in despite continuous workflow task failures. func (s *WFTFailureReportedProblemsTestSuite) workflowWithSignalsThatFails(ctx workflow.Context) (string, error) { - signalChan := workflow.GetSignalChannel(ctx, "test-signal") - - var signalValue string - signalChan.Receive(ctx, &signalValue) - - // Always fail after receiving a signal, unless shouldFail is false + // If we should fail, signal ourselves (creating a side effect) and immediately panic. + // This will create buffered. if s.shouldFail.Load() { - panic("forced-panic-after-signal") + // Signal ourselves to create buffered events + _ = workflow.SignalExternalWorkflow(ctx, workflow.GetInfo(ctx).WorkflowExecution.ID, "", "test-signal", "self-signal") + panic("forced-panic-after-self-signal") } // If we reach here, shouldFail is false, so we can complete @@ -130,11 +125,10 @@ func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_Set } func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_NotClearedBySignals() { - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() s.shouldFail.Store(true) - s.stopSignals.Store(false) s.Worker().RegisterWorkflow(s.workflowWithSignalsThatFails) @@ -146,26 +140,7 @@ func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_Not workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, workflowOptions, s.workflowWithSignalsThatFails) s.NoError(err) - // Start sending signals every second in a goroutine - signalDone := make(chan struct{}) - go func() { - defer close(signalDone) - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if s.stopSignals.Load() { - return - } - _ = s.SdkClient().SignalWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test-signal", "ping") - case <-ctx.Done(): - return - } - } - }() - + // The workflow will signal itself and panic on each WFT, creating buffered events naturally. // Wait for the search attribute to be set due to consecutive failures s.EventuallyWithT(func(t *assert.CollectT) { description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) @@ -175,39 +150,21 @@ func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_Not require.NotEmpty(t, saVal) require.Contains(t, saVal, "category=WorkflowTaskFailed") require.Contains(t, saVal, "cause=WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure") - }, 30*time.Second, 500*time.Millisecond) - - // Continue sending signals for a few more seconds and verify the search attribute is NOT removed - // This is the key part of the test - signals should not cause the search attribute to be cleared + }, 20*time.Second, 500*time.Millisecond) + // Verify the search attribute persists even as the workflow continues to fail and create buffered events + // This is the key part of the test - buffered events should not cause the search attribute to be cleared s.EventuallyWithT(func(t *assert.CollectT) { description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) s.NoError(err) saVal, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) - s.True(ok, "Search attribute should still be present after receiving signals") - s.NotEmpty(saVal, "Search attribute should not be empty after receiving signals") + s.True(ok, "Search attribute should still be present during continued failures") + s.NotEmpty(saVal, "Search attribute should not be empty during continued failures") }, 5*time.Second, 500*time.Millisecond) - // Stop signals and unblock the workflow for cleanup - s.stopSignals.Store(true) - s.shouldFail.Store(false) - - // Send one final signal to trigger workflow completion - err = s.SdkClient().SignalWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test-signal", "final") - s.NoError(err) - - var out string - s.NoError(workflowRun.Get(ctx, &out)) - - // Wait for signal goroutine to finish - <-signalDone - - // Verify search attribute is cleared after successful completion - description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + // Terminate the workflow for cleanup + err = s.SdkClient().TerminateWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test cleanup") s.NoError(err) - s.NotNil(description.TypedSearchAttributes) - _, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) - s.False(ok) } func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_SetAndClear_FailAfterActivity() { diff --git a/tests/xdc/workflow_task_reported_problems_test.go b/tests/xdc/workflow_task_reported_problems_test.go index 21507fe8df5..ff49bf67e6e 100644 --- a/tests/xdc/workflow_task_reported_problems_test.go +++ b/tests/xdc/workflow_task_reported_problems_test.go @@ -27,8 +27,7 @@ import ( type ( WorkflowTaskReportedProblemsReplicationSuite struct { xdcBaseSuite - shouldFail atomic.Bool - stopSignals atomic.Bool + shouldFail atomic.Bool } ) @@ -308,14 +307,12 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) simpleActivity() (string, // workflowWithSignalsThatFails creates a workflow that listens for signals and fails on each workflow task. func (s *WorkflowTaskReportedProblemsReplicationSuite) workflowWithSignalsThatFails(ctx workflow.Context) (string, error) { - signalChan := workflow.GetSignalChannel(ctx, "test-signal") - - var signalValue string - signalChan.Receive(ctx, &signalValue) - - // Always fail after receiving a signal, unless shouldFail is false + // If we should fail, signal ourselves (creating a side effect) and immediately panic. + // This creates buffered events without needing external goroutines. if s.shouldFail.Load() { - panic("forced-panic-after-signal") + // Signal ourselves to create buffered events + _ = workflow.SignalExternalWorkflow(ctx, workflow.GetInfo(ctx).WorkflowExecution.ID, "", "test-signal", "self-signal") + panic("forced-panic-after-self-signal") } // If we reach here, shouldFail is false, so we can complete @@ -352,7 +349,6 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro s.NoError(err) s.shouldFail.Store(true) - s.stopSignals.Store(false) taskQueue := testcore.RandomizeStr("tq") worker1 := sdkworker.New(activeSDKClient, taskQueue, sdkworker.Options{}) @@ -368,26 +364,7 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro workflowRun, err := activeSDKClient.ExecuteWorkflow(ctx, workflowOptions, s.workflowWithSignalsThatFails) s.NoError(err) - // Start sending signals every second in a goroutine - signalDone := make(chan struct{}) - go func() { - defer close(signalDone) - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if s.stopSignals.Load() { - return - } - _ = activeSDKClient.SignalWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test-signal", "ping") - case <-ctx.Done(): - return - } - } - }() - + // The workflow will signal itself and panic on each WFT, creating buffered events naturally. // Wait for the search attribute to be set due to consecutive failures s.checkReportedProblemsSearchAttribute( s.clusters[0].Host().AdminClient(), @@ -400,13 +377,13 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro true, ) - // Continue sending signals for a few more seconds and verify the search attribute is NOT removed + // Verify the search attribute persists even as the workflow continues to fail and create buffered events s.EventuallyWithT(func(t *assert.CollectT) { description, err := activeSDKClient.DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) s.NoError(err) saVal, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) - s.True(ok, "Search attribute should still be present after receiving signals") - s.NotEmpty(saVal, "Search attribute should not be empty after receiving signals") + s.True(ok, "Search attribute should still be present during continued failures") + s.NotEmpty(saVal, "Search attribute should not be empty during continued failures") }, 5*time.Second, 500*time.Millisecond) // get standby client @@ -429,42 +406,9 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro true, ) - // Stop signals and unblock the workflow for cleanup - s.stopSignals.Store(true) - s.shouldFail.Store(false) - - // Send one final signal to trigger workflow completion - err = activeSDKClient.SignalWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test-signal", "final") + // Terminate the workflow for cleanup + err = activeSDKClient.TerminateWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test cleanup") s.NoError(err) - - var out string - s.NoError(workflowRun.Get(ctx, &out)) - - // Wait for signal goroutine to finish - <-signalDone - - // Verify search attribute is cleared after successful completion in both clusters - s.checkReportedProblemsSearchAttribute( - s.clusters[0].Host().AdminClient(), - activeSDKClient, - workflowRun.GetID(), - workflowRun.GetRunID(), - ns, - "", - "", - false, - ) - - s.checkReportedProblemsSearchAttribute( - s.clusters[1].Host().AdminClient(), - standbyClient, - workflowRun.GetID(), - workflowRun.GetRunID(), - ns, - "", - "", - false, - ) } func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedProblems_SetAndClear_FailAfterActivity() { From 3f7a1656d52a2f19c1cdce9643874152569bd623 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Wed, 10 Dec 2025 11:05:19 -0700 Subject: [PATCH 11/15] xdc test --- .../workflow_task_reported_problems_test.go | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/tests/xdc/workflow_task_reported_problems_test.go b/tests/xdc/workflow_task_reported_problems_test.go index ff49bf67e6e..97e9923fa24 100644 --- a/tests/xdc/workflow_task_reported_problems_test.go +++ b/tests/xdc/workflow_task_reported_problems_test.go @@ -197,6 +197,95 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro ) } +func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedProblems_SetAndClear() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + ns := s.createGlobalNamespace() + activeSDKClient, err := sdkclient.Dial(sdkclient.Options{ + HostPort: s.clusters[0].Host().FrontendGRPCAddress(), + Namespace: ns, + Logger: log.NewSdkLogger(s.logger), + }) + s.NoError(err) + + standbyClient, err := sdkclient.Dial(sdkclient.Options{ + HostPort: s.clusters[1].Host().FrontendGRPCAddress(), + Namespace: ns, + Logger: log.NewSdkLogger(s.logger), + }) + s.NoError(err) + + s.shouldFail.Store(true) + + taskQueue := testcore.RandomizeStr("tq") + worker1 := sdkworker.New(activeSDKClient, taskQueue, sdkworker.Options{}) + worker1.RegisterWorkflow(s.simpleWorkflow) + s.NoError(worker1.Start()) + defer worker1.Stop() + + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wfid-" + s.T().Name()), + TaskQueue: taskQueue, + } + + workflowRun, err := activeSDKClient.ExecuteWorkflow(ctx, workflowOptions, s.simpleWorkflow) + s.NoError(err) + + // Check if the search attributes are not empty and has TemporalReportedProblems + s.checkReportedProblemsSearchAttribute( + s.clusters[0].Host().AdminClient(), + activeSDKClient, + workflowRun.GetID(), + workflowRun.GetRunID(), + ns, + "WorkflowTaskFailed", + "WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure", + true, + ) + + // Verify the search attribute is replicated to the standby cluster + s.checkReportedProblemsSearchAttribute( + s.clusters[1].Host().AdminClient(), + standbyClient, + workflowRun.GetID(), + workflowRun.GetRunID(), + ns, + "WorkflowTaskFailed", + "WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure", + true, + ) + + // Unblock the workflow + s.shouldFail.Store(false) + + var out string + s.NoError(workflowRun.Get(ctx, &out)) + + // Verify search attribute is cleared after successful completion in both clusters + s.checkReportedProblemsSearchAttribute( + s.clusters[0].Host().AdminClient(), + activeSDKClient, + workflowRun.GetID(), + workflowRun.GetRunID(), + ns, + "", + "", + false, + ) + + s.checkReportedProblemsSearchAttribute( + s.clusters[1].Host().AdminClient(), + standbyClient, + workflowRun.GetID(), + workflowRun.GetRunID(), + ns, + "", + "", + false, + ) +} + func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedProblems_DynamicConfigChanges() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() From 1aa46227a8ce337632c3c41627554450fdf696b8 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Wed, 10 Dec 2025 15:08:33 -0700 Subject: [PATCH 12/15] self review --- .../workflow/workflow_task_state_machine.go | 11 +- .../workflow_task_reported_problems_test.go | 203 +----------------- 2 files changed, 2 insertions(+), 212 deletions(-) diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index cdd507e704e..1cd53bbb005 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -80,7 +80,6 @@ func (m *workflowTaskStateMachine) ApplyWorkflowTaskScheduledEvent( } } - attemptsSinceLastSuccess := m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess workflowTask := &historyi.WorkflowTaskInfo{ Version: version, ScheduledEventID: scheduledEventID, @@ -89,7 +88,7 @@ func (m *workflowTaskStateMachine) ApplyWorkflowTaskScheduledEvent( WorkflowTaskTimeout: startToCloseTimeout.AsDuration(), TaskQueue: taskQueue, Attempt: attempt, - AttemptsSinceLastSuccess: attemptsSinceLastSuccess, // Preserve failure counter + AttemptsSinceLastSuccess: m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess, ScheduledTime: scheduledTime.AsTime(), StartedTime: time.Time{}, OriginalScheduledTime: originalScheduledTimestamp.AsTime(), @@ -323,8 +322,6 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( // Flush any buffered events before creating the workflow task, otherwise it will result in invalid IDs for // transient/speculative workflow task and will cause in timeout processing to not work for transient workflow tasks. if m.ms.HasBufferedEvents() { - // Reset Attempt to 1 when flushing buffered events to create a new non-transient workflow task. - // WorkflowTaskAttemptsSinceLastSuccess is already up-to-date from previous failures. m.ms.executionInfo.WorkflowTaskAttempt = 1 workflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL createWorkflowTaskScheduledEvent = true @@ -930,11 +927,8 @@ func (m *workflowTaskStateMachine) failWorkflowTask( m.ms.ClearStickyTaskQueue() } - // Update AttemptsSinceLastSuccess on every failure to keep the counter up-to-date. - // Increment it by 1 for each failure instead of only when flushing buffered events. newAttemptsSinceLastSuccess := m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess newAttempt := int32(1) - if incrementAttempt { // Increment the failure counter for this WFT failure newAttemptsSinceLastSuccess = m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess + 1 @@ -965,11 +959,8 @@ func (m *workflowTaskStateMachine) failWorkflowTask( m.retainWorkflowTaskBuildIdInfo(failWorkflowTaskInfo) m.UpdateWorkflowTask(failWorkflowTaskInfo) - // AttemptsSinceLastSuccess now directly tracks the total number of workflow task failures. - // It's incremented on every failure, so we can use it directly. totalContinuousFailures := failWorkflowTaskInfo.AttemptsSinceLastSuccess consecutiveFailuresRequired := m.ms.config.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute(m.ms.GetNamespaceEntry().Name().String()) - if consecutiveFailuresRequired > 0 && totalContinuousFailures >= int32(consecutiveFailuresRequired) { if err := m.ms.UpdateReportedProblemsSearchAttribute(); err != nil { return err diff --git a/tests/xdc/workflow_task_reported_problems_test.go b/tests/xdc/workflow_task_reported_problems_test.go index 97e9923fa24..a3bb6a03be7 100644 --- a/tests/xdc/workflow_task_reported_problems_test.go +++ b/tests/xdc/workflow_task_reported_problems_test.go @@ -197,95 +197,6 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro ) } -func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedProblems_SetAndClear() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - ns := s.createGlobalNamespace() - activeSDKClient, err := sdkclient.Dial(sdkclient.Options{ - HostPort: s.clusters[0].Host().FrontendGRPCAddress(), - Namespace: ns, - Logger: log.NewSdkLogger(s.logger), - }) - s.NoError(err) - - standbyClient, err := sdkclient.Dial(sdkclient.Options{ - HostPort: s.clusters[1].Host().FrontendGRPCAddress(), - Namespace: ns, - Logger: log.NewSdkLogger(s.logger), - }) - s.NoError(err) - - s.shouldFail.Store(true) - - taskQueue := testcore.RandomizeStr("tq") - worker1 := sdkworker.New(activeSDKClient, taskQueue, sdkworker.Options{}) - worker1.RegisterWorkflow(s.simpleWorkflow) - s.NoError(worker1.Start()) - defer worker1.Stop() - - workflowOptions := sdkclient.StartWorkflowOptions{ - ID: testcore.RandomizeStr("wfid-" + s.T().Name()), - TaskQueue: taskQueue, - } - - workflowRun, err := activeSDKClient.ExecuteWorkflow(ctx, workflowOptions, s.simpleWorkflow) - s.NoError(err) - - // Check if the search attributes are not empty and has TemporalReportedProblems - s.checkReportedProblemsSearchAttribute( - s.clusters[0].Host().AdminClient(), - activeSDKClient, - workflowRun.GetID(), - workflowRun.GetRunID(), - ns, - "WorkflowTaskFailed", - "WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure", - true, - ) - - // Verify the search attribute is replicated to the standby cluster - s.checkReportedProblemsSearchAttribute( - s.clusters[1].Host().AdminClient(), - standbyClient, - workflowRun.GetID(), - workflowRun.GetRunID(), - ns, - "WorkflowTaskFailed", - "WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure", - true, - ) - - // Unblock the workflow - s.shouldFail.Store(false) - - var out string - s.NoError(workflowRun.Get(ctx, &out)) - - // Verify search attribute is cleared after successful completion in both clusters - s.checkReportedProblemsSearchAttribute( - s.clusters[0].Host().AdminClient(), - activeSDKClient, - workflowRun.GetID(), - workflowRun.GetRunID(), - ns, - "", - "", - false, - ) - - s.checkReportedProblemsSearchAttribute( - s.clusters[1].Host().AdminClient(), - standbyClient, - workflowRun.GetID(), - workflowRun.GetRunID(), - ns, - "", - "", - false, - ) -} - func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedProblems_DynamicConfigChanges() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -390,14 +301,10 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro ) } -func (s *WorkflowTaskReportedProblemsReplicationSuite) simpleActivity() (string, error) { - return "done!", nil -} - // workflowWithSignalsThatFails creates a workflow that listens for signals and fails on each workflow task. func (s *WorkflowTaskReportedProblemsReplicationSuite) workflowWithSignalsThatFails(ctx workflow.Context) (string, error) { // If we should fail, signal ourselves (creating a side effect) and immediately panic. - // This creates buffered events without needing external goroutines. + // This creates buffered events. if s.shouldFail.Load() { // Signal ourselves to create buffered events _ = workflow.SignalExternalWorkflow(ctx, workflow.GetInfo(ctx).WorkflowExecution.ID, "", "test-signal", "self-signal") @@ -408,23 +315,6 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) workflowWithSignalsThatFa return "done!", nil } -// workflowWithActivity creates a workflow that executes an activity before potentially failing. -func (s *WorkflowTaskReportedProblemsReplicationSuite) workflowWithActivity(ctx workflow.Context) (string, error) { - var ret string - err := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 1 * time.Second, - }), s.simpleActivity).Get(ctx, &ret) - if err != nil { - return "", err - } - - if s.shouldFail.Load() { - panic("forced-panic-to-fail-wft") - } - - return "done!", nil -} - func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedProblems_NotClearedBySignals() { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() @@ -499,94 +389,3 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro err = activeSDKClient.TerminateWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test cleanup") s.NoError(err) } - -func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedProblems_SetAndClear_FailAfterActivity() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - ns := s.createGlobalNamespace() - activeSDKClient, err := sdkclient.Dial(sdkclient.Options{ - HostPort: s.clusters[0].Host().FrontendGRPCAddress(), - Namespace: ns, - Logger: log.NewSdkLogger(s.logger), - }) - s.NoError(err) - - s.shouldFail.Store(true) - - taskQueue := testcore.RandomizeStr("tq") - worker1 := sdkworker.New(activeSDKClient, taskQueue, sdkworker.Options{}) - worker1.RegisterWorkflow(s.workflowWithActivity) - worker1.RegisterActivity(s.simpleActivity) - s.NoError(worker1.Start()) - defer worker1.Stop() - - workflowOptions := sdkclient.StartWorkflowOptions{ - ID: testcore.RandomizeStr("wfid-" + s.T().Name()), - TaskQueue: taskQueue, - } - - workflowRun, err := activeSDKClient.ExecuteWorkflow(ctx, workflowOptions, s.workflowWithActivity) - s.NoError(err) - - // Validate the search attributes are set after activity completes - s.checkReportedProblemsSearchAttribute( - s.clusters[0].Host().AdminClient(), - activeSDKClient, - workflowRun.GetID(), - workflowRun.GetRunID(), - ns, - "WorkflowTaskFailed", - "WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure", - true, - ) - - // get standby client - standbyClient, err := sdkclient.Dial(sdkclient.Options{ - HostPort: s.clusters[1].Host().FrontendGRPCAddress(), - Namespace: ns, - }) - s.NoError(err) - s.NotNil(standbyClient) - - // verify search attributes are replicated to cluster1 - s.checkReportedProblemsSearchAttribute( - s.clusters[1].Host().AdminClient(), - standbyClient, - workflowRun.GetID(), - workflowRun.GetRunID(), - ns, - "WorkflowTaskFailed", - "WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure", - true, - ) - - // Unblock the workflow - s.shouldFail.Store(false) - - var out string - s.NoError(workflowRun.Get(ctx, &out)) - - // Verify search attribute is cleared after successful completion in both clusters - s.checkReportedProblemsSearchAttribute( - s.clusters[0].Host().AdminClient(), - activeSDKClient, - workflowRun.GetID(), - workflowRun.GetRunID(), - ns, - "", - "", - false, - ) - - s.checkReportedProblemsSearchAttribute( - s.clusters[1].Host().AdminClient(), - standbyClient, - workflowRun.GetID(), - workflowRun.GetRunID(), - ns, - "", - "", - false, - ) -} From fc43e1cb9b94d6d6bb4a6ffafa35b36c8f7ad184 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Wed, 10 Dec 2025 15:37:34 -0700 Subject: [PATCH 13/15] simplification --- service/history/workflow/workflow_task_state_machine.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 1cd53bbb005..023cb5b9ebe 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -959,9 +959,8 @@ func (m *workflowTaskStateMachine) failWorkflowTask( m.retainWorkflowTaskBuildIdInfo(failWorkflowTaskInfo) m.UpdateWorkflowTask(failWorkflowTaskInfo) - totalContinuousFailures := failWorkflowTaskInfo.AttemptsSinceLastSuccess consecutiveFailuresRequired := m.ms.config.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute(m.ms.GetNamespaceEntry().Name().String()) - if consecutiveFailuresRequired > 0 && totalContinuousFailures >= int32(consecutiveFailuresRequired) { + if consecutiveFailuresRequired > 0 && failWorkflowTaskInfo.AttemptsSinceLastSuccess >= int32(consecutiveFailuresRequired) { if err := m.ms.UpdateReportedProblemsSearchAttribute(); err != nil { return err } From 7a21d8718310bf9568aff868f403737eccf45c7d Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Thu, 11 Dec 2025 15:12:21 -0700 Subject: [PATCH 14/15] use SignalWorkflow --- tests/workflow_task_reported_problems_test.go | 5 +- .../workflow_task_reported_problems_test.go | 54 ++++++++++--------- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/tests/workflow_task_reported_problems_test.go b/tests/workflow_task_reported_problems_test.go index 0fb3a98ed85..d1c968f6ea1 100644 --- a/tests/workflow_task_reported_problems_test.go +++ b/tests/workflow_task_reported_problems_test.go @@ -51,7 +51,10 @@ func (s *WFTFailureReportedProblemsTestSuite) workflowWithSignalsThatFails(ctx w // This will create buffered. if s.shouldFail.Load() { // Signal ourselves to create buffered events - _ = workflow.SignalExternalWorkflow(ctx, workflow.GetInfo(ctx).WorkflowExecution.ID, "", "test-signal", "self-signal") + err := s.SdkClient().SignalWorkflow(context.Background(), workflow.GetInfo(ctx).WorkflowExecution.ID, "", "test-signal", "self-signal") + if err != nil { + return "", err + } panic("forced-panic-after-self-signal") } diff --git a/tests/xdc/workflow_task_reported_problems_test.go b/tests/xdc/workflow_task_reported_problems_test.go index a3bb6a03be7..821fd922f21 100644 --- a/tests/xdc/workflow_task_reported_problems_test.go +++ b/tests/xdc/workflow_task_reported_problems_test.go @@ -27,7 +27,9 @@ import ( type ( WorkflowTaskReportedProblemsReplicationSuite struct { xdcBaseSuite - shouldFail atomic.Bool + shouldFail atomic.Bool + activeSDKClient sdkclient.Client + standbySDKClient sdkclient.Client } ) @@ -202,7 +204,8 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro defer cancel() ns := s.createGlobalNamespace() - activeSDKClient, err := sdkclient.Dial(sdkclient.Options{ + var err error + s.activeSDKClient, err = sdkclient.Dial(sdkclient.Options{ HostPort: s.clusters[0].Host().FrontendGRPCAddress(), Namespace: ns, Logger: log.NewSdkLogger(s.logger), @@ -210,7 +213,7 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro s.NoError(err) taskQueue := testcore.RandomizeStr("tq") - worker1 := sdkworker.New(activeSDKClient, taskQueue, sdkworker.Options{}) + worker1 := sdkworker.New(s.activeSDKClient, taskQueue, sdkworker.Options{}) worker1.RegisterWorkflow(s.simpleWorkflow) @@ -227,12 +230,12 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro TaskQueue: taskQueue, } - workflowRun, err := activeSDKClient.ExecuteWorkflow(ctx, workflowOptions, s.simpleWorkflow) + workflowRun, err := s.activeSDKClient.ExecuteWorkflow(ctx, workflowOptions, s.simpleWorkflow) s.NoError(err) // Verify search attributes are NOT set in cluster0 when config is 0 s.EventuallyWithT(func(t *assert.CollectT) { - description, err := activeSDKClient.DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + description, err := s.activeSDKClient.DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) require.NoError(t, err) _, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) require.False(t, ok) @@ -240,7 +243,7 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro // Verify workflow task attempts are accumulating s.EventuallyWithT(func(t *assert.CollectT) { - exec, err := activeSDKClient.DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + exec, err := s.activeSDKClient.DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) require.NoError(t, err) require.GreaterOrEqual(t, exec.PendingWorkflowTask.Attempt, int32(2)) }, 5*time.Second, 500*time.Millisecond) @@ -251,7 +254,7 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro // Verify search attributes are now set in cluster0 s.EventuallyWithT(func(t *assert.CollectT) { - description, err := activeSDKClient.DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + description, err := s.activeSDKClient.DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) require.NoError(t, err) saValues, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) require.True(t, ok) @@ -262,17 +265,17 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro }, 15*time.Second, 500*time.Millisecond) // get standby client - standbyClient, err := sdkclient.Dial(sdkclient.Options{ + s.standbySDKClient, err = sdkclient.Dial(sdkclient.Options{ HostPort: s.clusters[1].Host().FrontendGRPCAddress(), Namespace: ns, }) s.NoError(err) - s.NotNil(standbyClient) + s.NotNil(s.standbySDKClient) // verify search attributes are replicated to cluster1 using the helper function s.checkReportedProblemsSearchAttribute( s.clusters[1].Host().AdminClient(), - standbyClient, + s.standbySDKClient, workflowRun.GetID(), workflowRun.GetRunID(), ns, @@ -291,7 +294,7 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro // verify search attributes are cleared in cluster1 using the helper function s.checkReportedProblemsSearchAttribute( s.clusters[1].Host().AdminClient(), - standbyClient, + s.standbySDKClient, workflowRun.GetID(), workflowRun.GetRunID(), ns, @@ -302,12 +305,14 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro } // workflowWithSignalsThatFails creates a workflow that listens for signals and fails on each workflow task. -func (s *WorkflowTaskReportedProblemsReplicationSuite) workflowWithSignalsThatFails(ctx workflow.Context) (string, error) { - // If we should fail, signal ourselves (creating a side effect) and immediately panic. - // This creates buffered events. +func (s *WorkflowTaskReportedProblemsReplicationSuite) workflowWithSignalsThatFails(ctx workflow.Context) (string, error) { // If we should fail, signal ourselves (creating a side effect) and immediately panic. + // This will create buffered. if s.shouldFail.Load() { // Signal ourselves to create buffered events - _ = workflow.SignalExternalWorkflow(ctx, workflow.GetInfo(ctx).WorkflowExecution.ID, "", "test-signal", "self-signal") + err := s.activeSDKClient.SignalWorkflow(context.Background(), workflow.GetInfo(ctx).WorkflowExecution.ID, "", "test-signal", "self-signal") + if err != nil { + return "", err + } panic("forced-panic-after-self-signal") } @@ -320,7 +325,8 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro defer cancel() ns := s.createGlobalNamespace() - activeSDKClient, err := sdkclient.Dial(sdkclient.Options{ + var err error + s.activeSDKClient, err = sdkclient.Dial(sdkclient.Options{ HostPort: s.clusters[0].Host().FrontendGRPCAddress(), Namespace: ns, Logger: log.NewSdkLogger(s.logger), @@ -330,7 +336,7 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro s.shouldFail.Store(true) taskQueue := testcore.RandomizeStr("tq") - worker1 := sdkworker.New(activeSDKClient, taskQueue, sdkworker.Options{}) + worker1 := sdkworker.New(s.activeSDKClient, taskQueue, sdkworker.Options{}) worker1.RegisterWorkflow(s.workflowWithSignalsThatFails) s.NoError(worker1.Start()) defer worker1.Stop() @@ -340,14 +346,14 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro TaskQueue: taskQueue, } - workflowRun, err := activeSDKClient.ExecuteWorkflow(ctx, workflowOptions, s.workflowWithSignalsThatFails) + workflowRun, err := s.activeSDKClient.ExecuteWorkflow(ctx, workflowOptions, s.workflowWithSignalsThatFails) s.NoError(err) // The workflow will signal itself and panic on each WFT, creating buffered events naturally. // Wait for the search attribute to be set due to consecutive failures s.checkReportedProblemsSearchAttribute( s.clusters[0].Host().AdminClient(), - activeSDKClient, + s.activeSDKClient, workflowRun.GetID(), workflowRun.GetRunID(), ns, @@ -358,7 +364,7 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro // Verify the search attribute persists even as the workflow continues to fail and create buffered events s.EventuallyWithT(func(t *assert.CollectT) { - description, err := activeSDKClient.DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + description, err := s.activeSDKClient.DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) s.NoError(err) saVal, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) s.True(ok, "Search attribute should still be present during continued failures") @@ -366,17 +372,17 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro }, 5*time.Second, 500*time.Millisecond) // get standby client - standbyClient, err := sdkclient.Dial(sdkclient.Options{ + s.standbySDKClient, err = sdkclient.Dial(sdkclient.Options{ HostPort: s.clusters[1].Host().FrontendGRPCAddress(), Namespace: ns, }) s.NoError(err) - s.NotNil(standbyClient) + s.NotNil(s.standbySDKClient) // verify search attributes are replicated to cluster1 s.checkReportedProblemsSearchAttribute( s.clusters[1].Host().AdminClient(), - standbyClient, + s.standbySDKClient, workflowRun.GetID(), workflowRun.GetRunID(), ns, @@ -386,6 +392,6 @@ func (s *WorkflowTaskReportedProblemsReplicationSuite) TestWFTFailureReportedPro ) // Terminate the workflow for cleanup - err = activeSDKClient.TerminateWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test cleanup") + err = s.activeSDKClient.TerminateWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test cleanup") s.NoError(err) } From 65a0cfb9b0d629133e9ff235f2393b5024702873 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Thu, 11 Dec 2025 16:15:59 -0700 Subject: [PATCH 15/15] fixing broken unit test --- chasm/lib/callback/executors_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chasm/lib/callback/executors_test.go b/chasm/lib/callback/executors_test.go index 2c95d4a8160..9bb856b3df2 100644 --- a/chasm/lib/callback/executors_test.go +++ b/chasm/lib/callback/executors_test.go @@ -184,7 +184,7 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { } chasmRegistry := chasm.NewRegistry(logger) - err := chasmRegistry.Register(&Library{ + err = chasmRegistry.Register(&Library{ InvocationTaskExecutor: executor, }) require.NoError(t, err) @@ -587,7 +587,7 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) { } chasmRegistry := chasm.NewRegistry(logger) - err := chasmRegistry.Register(&Library{ + err = chasmRegistry.Register(&Library{ InvocationTaskExecutor: executor, }) require.NoError(t, err)