Skip to content
27 changes: 21 additions & 6 deletions api/persistence/v1/executions.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ message WorkflowExecutionInfo {
// It increases monotonically when the workflow's options are modified.
// It is used to check if a workflow task is still relevant to the corresponding workflow state machine.
int32 workflow_task_stamp = 109;
// AttemptsSinceLastSuccess tracks the number of workflow task attempts since the last successful workflow task.
// This is carried over when buffered events are applied after workflow task failures.
// Used by the TemporalReportedProblems search attribute to track continuous failure count.
// (-- api-linter: core::0140::prepositions=disabled
// aip.dev/not-precedent: "since" is needed here. --)
int32 workflow_task_attempts_since_last_success = 110;

bool cancel_requested = 29;
string cancel_request_id = 32;
Expand Down
11 changes: 10 additions & 1 deletion service/history/interfaces/workflow_task_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@ type WorkflowTaskInfo struct {
WorkflowTaskTimeout time.Duration
// This is only needed to communicate task queue used after AddWorkflowTaskScheduledEvent.
TaskQueue *taskqueuepb.TaskQueue
Attempt int32

// Attempt is the number of attempts for this workflow task.
Attempt int32
// AttemptsSinceLastSuccess is the number of attempts since the last successful workflow task.
// This is used by the `TemporalReportedProblems` search attribute to check latest WFT failure count,
// this will only differ from attempts above when the previous workflow tasks failed and there was a
// buffered event (like a signal or activity finishing) applied to the workflow which causes the
// new workflow task to have an attempt of 1 again.
AttemptsSinceLastSuccess int32

// Scheduled and Started timestamps are useful for transient workflow task: when transient workflow task finally completes,
// use these Timestamp to create scheduled/started events.
// Also used for recording latency metrics
Expand Down
19 changes: 10 additions & 9 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -8444,15 +8444,16 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx
var workflowTaskVersionUpdated bool
if transitionhistory.Compare(current.WorkflowTaskLastUpdateVersionedTransition, incoming.WorkflowTaskLastUpdateVersionedTransition) != 0 {
ms.workflowTaskManager.UpdateWorkflowTask(&historyi.WorkflowTaskInfo{
Version: incoming.WorkflowTaskVersion,
ScheduledEventID: incoming.WorkflowTaskScheduledEventId,
StartedEventID: incoming.WorkflowTaskStartedEventId,
RequestID: incoming.WorkflowTaskRequestId,
WorkflowTaskTimeout: incoming.WorkflowTaskTimeout.AsDuration(),
Attempt: incoming.WorkflowTaskAttempt,
Stamp: incoming.WorkflowTaskStamp,
StartedTime: incoming.WorkflowTaskStartedTime.AsTime(),
ScheduledTime: incoming.WorkflowTaskScheduledTime.AsTime(),
Version: incoming.WorkflowTaskVersion,
ScheduledEventID: incoming.WorkflowTaskScheduledEventId,
StartedEventID: incoming.WorkflowTaskStartedEventId,
RequestID: incoming.WorkflowTaskRequestId,
WorkflowTaskTimeout: incoming.WorkflowTaskTimeout.AsDuration(),
Attempt: incoming.WorkflowTaskAttempt,
AttemptsSinceLastSuccess: incoming.WorkflowTaskAttemptsSinceLastSuccess,
Stamp: incoming.WorkflowTaskStamp,
StartedTime: incoming.WorkflowTaskStartedTime.AsTime(),
ScheduledTime: incoming.WorkflowTaskScheduledTime.AsTime(),

OriginalScheduledTime: incoming.WorkflowTaskOriginalScheduledTime.AsTime(),
Type: incoming.WorkflowTaskType,
Expand Down
45 changes: 32 additions & 13 deletions service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,14 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat(
// Flush any buffered events before creating the workflow task, otherwise it will result in invalid IDs for
// transient/speculative workflow task and will cause in timeout processing to not work for transient workflow tasks.
if m.ms.HasBufferedEvents() {
// When buffered events are applied, preserve the total failure count from the previous workflow task
// before resetting Attempt to 1. This ensures we track continuous failures across workflow task resets.
if m.ms.IsTransientWorkflowTask() {
// Calculate total failures from previous WFT: AttemptsSinceLastSuccess + Attempt
// This becomes the new AttemptsSinceLastSuccess for the next WFT
previousTotalFailures := m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess + m.ms.executionInfo.WorkflowTaskAttempt
m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess = previousTotalFailures
Comment on lines +327 to +328
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update WorkflowTaskAttemptsSinceLastSuccess every time there is a WFT failure instead of only doing it when flushing buffered events? That seems more intuitive behavior to keep this attribute up-to-date.

}
m.ms.executionInfo.WorkflowTaskAttempt = 1
workflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL
createWorkflowTaskScheduledEvent = true
Expand Down Expand Up @@ -924,19 +932,22 @@ func (m *workflowTaskStateMachine) failWorkflowTask(
m.ms.ClearStickyTaskQueue()
}

// AttemptsSinceLastSuccess is the carried-over attempt count from the previous workflow task.
// It stays the same across failures unless buffered events reset the Attempt counter.
failWorkflowTaskInfo := &historyi.WorkflowTaskInfo{
Version: common.EmptyVersion,
ScheduledEventID: common.EmptyEventID,
StartedEventID: common.EmptyEventID,
RequestID: emptyUUID,
WorkflowTaskTimeout: time.Duration(0),
StartedTime: time.Unix(0, 0).UTC(),
TaskQueue: nil,
OriginalScheduledTime: time.Unix(0, 0).UTC(),
Attempt: 1,
Type: enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED,
SuggestContinueAsNew: false,
HistorySizeBytes: 0,
Version: common.EmptyVersion,
ScheduledEventID: common.EmptyEventID,
StartedEventID: common.EmptyEventID,
RequestID: emptyUUID,
WorkflowTaskTimeout: time.Duration(0),
StartedTime: time.Unix(0, 0).UTC(),
TaskQueue: nil,
OriginalScheduledTime: time.Unix(0, 0).UTC(),
Attempt: 1,
AttemptsSinceLastSuccess: m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess,
Type: enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED,
SuggestContinueAsNew: false,
HistorySizeBytes: 0,
// need to retain Build ID of failed WF task to compare it with the build ID of next attempt
BuildId: m.ms.executionInfo.WorkflowTaskBuildId,
}
Expand All @@ -950,8 +961,14 @@ func (m *workflowTaskStateMachine) failWorkflowTask(
m.retainWorkflowTaskBuildIdInfo(failWorkflowTaskInfo)
m.UpdateWorkflowTask(failWorkflowTaskInfo)

// Total continuous failures = AttemptsSinceLastSuccess + Attempt
// AttemptsSinceLastSuccess contains the sum of all previous workflow task failures.
// Attempt is the current workflow task's attempt number (>= 1).
// When this function is called, we're recording a failure, so Attempt represents at least one attempt.
totalContinuousFailures := failWorkflowTaskInfo.AttemptsSinceLastSuccess + failWorkflowTaskInfo.Attempt
consecutiveFailuresRequired := m.ms.config.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute(m.ms.GetNamespaceEntry().Name().String())
if consecutiveFailuresRequired > 0 && failWorkflowTaskInfo.Attempt >= int32(consecutiveFailuresRequired) {

if consecutiveFailuresRequired > 0 && totalContinuousFailures >= int32(consecutiveFailuresRequired) {
if err := m.ms.UpdateReportedProblemsSearchAttribute(); err != nil {
return err
}
Expand Down Expand Up @@ -1016,6 +1033,7 @@ func (m *workflowTaskStateMachine) UpdateWorkflowTask(
m.ms.executionInfo.WorkflowTaskRequestId = workflowTask.RequestID
m.ms.executionInfo.WorkflowTaskTimeout = durationpb.New(workflowTask.WorkflowTaskTimeout)
m.ms.executionInfo.WorkflowTaskAttempt = workflowTask.Attempt
m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess = workflowTask.AttemptsSinceLastSuccess
if !workflowTask.StartedTime.IsZero() {
m.ms.executionInfo.WorkflowTaskStartedTime = timestamppb.New(workflowTask.StartedTime)
}
Expand Down Expand Up @@ -1141,6 +1159,7 @@ func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *historyi.WorkflowTaskI
RequestID: m.ms.executionInfo.WorkflowTaskRequestId,
WorkflowTaskTimeout: m.ms.executionInfo.WorkflowTaskTimeout.AsDuration(),
Attempt: m.ms.executionInfo.WorkflowTaskAttempt,
AttemptsSinceLastSuccess: m.ms.executionInfo.WorkflowTaskAttemptsSinceLastSuccess,
StartedTime: m.ms.executionInfo.WorkflowTaskStartedTime.AsTime(),
ScheduledTime: m.ms.executionInfo.WorkflowTaskScheduledTime.AsTime(),
TaskQueue: m.ms.CurrentTaskQueue(),
Expand Down
106 changes: 103 additions & 3 deletions tests/workflow_task_reported_problems_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type WFTFailureReportedProblemsTestSuite struct {
shouldFail atomic.Bool
failureCount atomic.Int32
failureType atomic.Int32 // 0 = panic, 1 = non-deterministic error
stopSignals atomic.Bool
}

func TestWFTFailureReportedProblemsTestSuite(t *testing.T) {
Expand All @@ -31,7 +32,7 @@ func TestWFTFailureReportedProblemsTestSuite(t *testing.T) {

func (s *WFTFailureReportedProblemsTestSuite) SetupTest() {
s.FunctionalTestBase.SetupTest()
s.OverrideDynamicConfig(dynamicconfig.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute, 2)
s.OverrideDynamicConfig(dynamicconfig.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute, 3)
}

func (s *WFTFailureReportedProblemsTestSuite) simpleWorkflowWithShouldFail(ctx workflow.Context) (string, error) {
Expand All @@ -45,6 +46,24 @@ func (s *WFTFailureReportedProblemsTestSuite) simpleActivity() (string, error) {
return "done!", nil
}

// workflowWithSignalsThatFails creates a workflow that listens for signals and fails on each workflow task.
// This is used to test that the TemporalReportedProblems search attribute is not incorrectly removed
// when signals keep coming in despite continuous workflow task failures.
func (s *WFTFailureReportedProblemsTestSuite) workflowWithSignalsThatFails(ctx workflow.Context) (string, error) {
signalChan := workflow.GetSignalChannel(ctx, "test-signal")

var signalValue string
signalChan.Receive(ctx, &signalValue)
Comment on lines +53 to +56
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't really need to handle the signals in the workflow, it doesn't add value in the test and prevents your workflow task from failing because you never reach the line where it panics.


// Always fail after receiving a signal, unless shouldFail is false
if s.shouldFail.Load() {
panic("forced-panic-after-signal")
}

// If we reach here, shouldFail is false, so we can complete
return "done!", nil
}

// workflowWithActivity creates a workflow that executes an activity before potentially failing.
// This is used to test workflow task failure scenarios in a more realistic context where the workflow
// has already executed some operations (activities) before encountering a workflow task failure.
Expand Down Expand Up @@ -95,7 +114,7 @@ func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_Set
execution, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID())
require.NoError(t, err)
require.GreaterOrEqual(t, execution.PendingWorkflowTask.Attempt, int32(2))
}, 5*time.Second, 500*time.Millisecond)
}, 20*time.Second, 500*time.Millisecond)

// Unblock the workflow
s.shouldFail.Store(false)
Expand All @@ -110,6 +129,87 @@ func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_Set
s.False(ok)
}

func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_NotClearedBySignals() {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

s.shouldFail.Store(true)
s.stopSignals.Store(false)

s.Worker().RegisterWorkflow(s.workflowWithSignalsThatFails)

workflowOptions := sdkclient.StartWorkflowOptions{
ID: testcore.RandomizeStr("wf_id-" + s.T().Name()),
TaskQueue: s.TaskQueue(),
}

workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, workflowOptions, s.workflowWithSignalsThatFails)
s.NoError(err)

// Start sending signals every second in a goroutine
signalDone := make(chan struct{})
go func() {
defer close(signalDone)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if s.stopSignals.Load() {
return
}
_ = s.SdkClient().SignalWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test-signal", "ping")
case <-ctx.Done():
return
}
}
}()

// Wait for the search attribute to be set due to consecutive failures
s.EventuallyWithT(func(t *assert.CollectT) {
description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID())
require.NoError(t, err)
saVal, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems))
require.True(t, ok)
require.NotEmpty(t, saVal)
require.Contains(t, saVal, "category=WorkflowTaskFailed")
require.Contains(t, saVal, "cause=WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure")
}, 30*time.Second, 500*time.Millisecond)

// Continue sending signals for a few more seconds and verify the search attribute is NOT removed
// This is the key part of the test - signals should not cause the search attribute to be cleared

s.EventuallyWithT(func(t *assert.CollectT) {
description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID())
s.NoError(err)
saVal, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems))
s.True(ok, "Search attribute should still be present after receiving signals")
s.NotEmpty(saVal, "Search attribute should not be empty after receiving signals")
}, 5*time.Second, 500*time.Millisecond)

// Stop signals and unblock the workflow for cleanup
s.stopSignals.Store(true)
s.shouldFail.Store(false)

// Send one final signal to trigger workflow completion
err = s.SdkClient().SignalWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test-signal", "final")
s.NoError(err)

var out string
s.NoError(workflowRun.Get(ctx, &out))

// Wait for signal goroutine to finish
<-signalDone

// Verify search attribute is cleared after successful completion
description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID())
s.NoError(err)
s.NotNil(description.TypedSearchAttributes)
_, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems))
s.False(ok)
}

func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_SetAndClear_FailAfterActivity() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand Down Expand Up @@ -137,7 +237,7 @@ func (s *WFTFailureReportedProblemsTestSuite) TestWFTFailureReportedProblems_Set
require.Len(t, saValues, 2)
require.Contains(t, saValues, "category=WorkflowTaskFailed")
require.Contains(t, saValues, "cause=WorkflowTaskFailedCauseWorkflowWorkerUnhandledFailure")
}, 5*time.Second, 500*time.Millisecond)
}, 20*time.Second, 500*time.Millisecond)

// Unblock the workflow
s.shouldFail.Store(false)
Expand Down
Loading
Loading