Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 54 additions & 25 deletions tests/max_buffered_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/tests/testcore"
)
Expand Down Expand Up @@ -69,7 +71,7 @@ func (s *MaxBufferedEventSuite) TestMaxBufferedEventsLimit() {

s.Worker().RegisterWorkflow(workflowFn)

testCtx, cancel := context.WithTimeout(context.Background(), time.Second*20)
testCtx, cancel := context.WithTimeout(s.T().Context(), time.Second*20)
defer cancel()

wid := "test-max-buffered-events-limit"
Expand Down Expand Up @@ -132,16 +134,21 @@ func (s *MaxBufferedEventSuite) TestBufferedEventsMutableStateSizeLimit() {
closeStartChanOnce.Do(func() {
close(waitStartChan)
})

s.Logger.Info("waiting for channel close",
// not the actual workflow ID
tag.WorkflowID("TestBufferedEventsMutableStateSizeLimit"))
// block workflow task so all signals will be buffered.
<-waitSignalChan
s.Logger.Info("channel closed, activity completed",
// not the actual workflow ID
tag.WorkflowID("TestBufferedEventsMutableStateSizeLimit"))
return nil
}

workflowFn := func(ctx workflow.Context) (int, error) {
ctx1 := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: 20 * time.Second,
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1},
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 5},
})
f1 := workflow.ExecuteLocalActivity(ctx1, localActivityFn)
if err := f1.Get(ctx, nil); err != nil {
Expand All @@ -151,15 +158,21 @@ func (s *MaxBufferedEventSuite) TestBufferedEventsMutableStateSizeLimit() {
sigCh := workflow.GetSignalChannel(ctx, "test-signal")

sigCount := 0
s.Logger.Info("reading signals",
// not the actual workflow ID
tag.WorkflowID("TestBufferedEventsMutableStateSizeLimit"))
for sigCh.ReceiveAsync(nil) {
sigCount++
}
s.Logger.Info("workflow completed",
// not the actual workflow ID
tag.WorkflowID("TestBufferedEventsMutableStateSizeLimit"))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stephanos is there a better way to tag this?

Copy link
Contributor

@stephanos stephanos Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tag is good 👍 Why not make it debug, since we know we have debug logs for tests enabled? This way production stays clean.

return sigCount, nil
}

s.Worker().RegisterWorkflow(workflowFn)

testCtx, cancel := context.WithTimeout(context.Background(), time.Second*20)
testCtx, cancel := context.WithTimeout(s.T().Context(), time.Second*20)
defer cancel()

wid := "test-max-buffered-events-limit"
Expand All @@ -169,7 +182,7 @@ func (s *MaxBufferedEventSuite) TestBufferedEventsMutableStateSizeLimit() {
WorkflowTaskTimeout: time.Second * 20,
}, workflowFn)

s.NoError(err1)
require.NoError(s.T(), err1)

// block until workflow task started
<-waitStartChan
Expand All @@ -179,35 +192,51 @@ func (s *MaxBufferedEventSuite) TestBufferedEventsMutableStateSizeLimit() {
// fill the slice with random data to make sure the
// encoder does not zero out the data
_, err := rand.Read(buf)
s.NoError(err)
require.NoError(s.T(), err)
largePayload := payloads.EncodeBytes(buf)
for i := 0; i < 3; i++ {
err := s.SdkClient().SignalWorkflow(testCtx, wid, "", "test-signal", largePayload)
s.NoError(err)
require.NoError(s.T(), err)
}

// send 4th signal, this will fail the started workflow task and force terminate the workflow
err = s.SdkClient().SignalWorkflow(testCtx, wid, "", "test-signal", largePayload)
s.NoError(err)
require.NoError(s.T(), err)

// unblock goroutine that runs local activity
close(waitSignalChan)

var sigCount int
err = wf1.Get(testCtx, &sigCount)
s.NoError(err)
s.Equal(4, sigCount)

historyEvents := s.GetHistory(s.Namespace().String(), &commonpb.WorkflowExecution{WorkflowId: wf1.GetID()})
// Not using historyrequire here because history is not deterministic.
var failedCause enumspb.WorkflowTaskFailedCause
var failedCount int
for _, evt := range historyEvents {
if evt.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED {
failedCause = evt.GetWorkflowTaskFailedEventAttributes().Cause
failedCount++
s.Logger.Info("waiting for channel close",
// not the actual workflow ID
tag.WorkflowID("TestBufferedEventsMutableStateSizeLimit"))
require.Eventually(s.T(), func() bool {
historyEvents := s.GetHistory(s.Namespace().String(), &commonpb.WorkflowExecution{WorkflowId: wf1.GetID()})
// Not using historyrequire here because history is not deterministic.
var failedCause enumspb.WorkflowTaskFailedCause
var failedCount int
for _, evt := range historyEvents {
if evt.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED {
failedCause = evt.GetWorkflowTaskFailedEventAttributes().Cause
failedCount++
}
}
}
s.Equal(enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND, failedCause)
s.Equal(1, failedCount)
if failedCause != enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND {
return false
}
if failedCount != 1 {
return false
}
require.Equal(s.T(), enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND, failedCause)
require.Equal(s.T(), 1, failedCount)
return true
}, time.Second*10, time.Millisecond*500)
require.Eventually(s.T(), func() bool {
ctx, cancel := context.WithTimeout(s.T().Context(), time.Millisecond*500)
defer cancel()
var sigCount int
if err := wf1.Get(ctx, &sigCount); err != nil {
return false
}
require.Equal(s.T(), 4, sigCount)
return true
}, time.Second*10, time.Millisecond*500)
}
Loading