diff --git a/tests/max_buffered_event_test.go b/tests/max_buffered_event_test.go index 818b022baa6..c42ccdb46cd 100644 --- a/tests/max_buffered_event_test.go +++ b/tests/max_buffered_event_test.go @@ -7,12 +7,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" + "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/payloads" "go.temporal.io/server/tests/testcore" ) @@ -69,7 +71,7 @@ func (s *MaxBufferedEventSuite) TestMaxBufferedEventsLimit() { s.Worker().RegisterWorkflow(workflowFn) - testCtx, cancel := context.WithTimeout(context.Background(), time.Second*20) + testCtx, cancel := context.WithTimeout(s.T().Context(), time.Second*20) defer cancel() wid := "test-max-buffered-events-limit" @@ -132,16 +134,21 @@ func (s *MaxBufferedEventSuite) TestBufferedEventsMutableStateSizeLimit() { closeStartChanOnce.Do(func() { close(waitStartChan) }) - + s.Logger.Info("waiting for channel close", + // not the actual workflow ID + tag.WorkflowID("TestBufferedEventsMutableStateSizeLimit")) // block workflow task so all signals will be buffered. <-waitSignalChan + s.Logger.Info("channel closed, activity completed", + // not the actual workflow ID + tag.WorkflowID("TestBufferedEventsMutableStateSizeLimit")) return nil } workflowFn := func(ctx workflow.Context) (int, error) { ctx1 := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ StartToCloseTimeout: 20 * time.Second, - RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1}, + RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 5}, }) f1 := workflow.ExecuteLocalActivity(ctx1, localActivityFn) if err := f1.Get(ctx, nil); err != nil { @@ -151,15 +158,21 @@ func (s *MaxBufferedEventSuite) TestBufferedEventsMutableStateSizeLimit() { sigCh := workflow.GetSignalChannel(ctx, "test-signal") sigCount := 0 + s.Logger.Info("reading signals", + // not the actual workflow ID + tag.WorkflowID("TestBufferedEventsMutableStateSizeLimit")) for sigCh.ReceiveAsync(nil) { sigCount++ } + s.Logger.Info("workflow completed", + // not the actual workflow ID + tag.WorkflowID("TestBufferedEventsMutableStateSizeLimit")) return sigCount, nil } s.Worker().RegisterWorkflow(workflowFn) - testCtx, cancel := context.WithTimeout(context.Background(), time.Second*20) + testCtx, cancel := context.WithTimeout(s.T().Context(), time.Second*20) defer cancel() wid := "test-max-buffered-events-limit" @@ -169,7 +182,7 @@ func (s *MaxBufferedEventSuite) TestBufferedEventsMutableStateSizeLimit() { WorkflowTaskTimeout: time.Second * 20, }, workflowFn) - s.NoError(err1) + require.NoError(s.T(), err1) // block until workflow task started <-waitStartChan @@ -179,35 +192,51 @@ func (s *MaxBufferedEventSuite) TestBufferedEventsMutableStateSizeLimit() { // fill the slice with random data to make sure the // encoder does not zero out the data _, err := rand.Read(buf) - s.NoError(err) + require.NoError(s.T(), err) largePayload := payloads.EncodeBytes(buf) for i := 0; i < 3; i++ { err := s.SdkClient().SignalWorkflow(testCtx, wid, "", "test-signal", largePayload) - s.NoError(err) + require.NoError(s.T(), err) } // send 4th signal, this will fail the started workflow task and force terminate the workflow err = s.SdkClient().SignalWorkflow(testCtx, wid, "", "test-signal", largePayload) - s.NoError(err) + require.NoError(s.T(), err) // unblock goroutine that runs local activity close(waitSignalChan) - - var sigCount int - err = wf1.Get(testCtx, &sigCount) - s.NoError(err) - s.Equal(4, sigCount) - - historyEvents := s.GetHistory(s.Namespace().String(), &commonpb.WorkflowExecution{WorkflowId: wf1.GetID()}) - // Not using historyrequire here because history is not deterministic. - var failedCause enumspb.WorkflowTaskFailedCause - var failedCount int - for _, evt := range historyEvents { - if evt.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED { - failedCause = evt.GetWorkflowTaskFailedEventAttributes().Cause - failedCount++ + s.Logger.Info("waiting for channel close", + // not the actual workflow ID + tag.WorkflowID("TestBufferedEventsMutableStateSizeLimit")) + require.Eventually(s.T(), func() bool { + historyEvents := s.GetHistory(s.Namespace().String(), &commonpb.WorkflowExecution{WorkflowId: wf1.GetID()}) + // Not using historyrequire here because history is not deterministic. + var failedCause enumspb.WorkflowTaskFailedCause + var failedCount int + for _, evt := range historyEvents { + if evt.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED { + failedCause = evt.GetWorkflowTaskFailedEventAttributes().Cause + failedCount++ + } } - } - s.Equal(enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND, failedCause) - s.Equal(1, failedCount) + if failedCause != enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND { + return false + } + if failedCount != 1 { + return false + } + require.Equal(s.T(), enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND, failedCause) + require.Equal(s.T(), 1, failedCount) + return true + }, time.Second*10, time.Millisecond*500) + require.Eventually(s.T(), func() bool { + ctx, cancel := context.WithTimeout(s.T().Context(), time.Millisecond*500) + defer cancel() + var sigCount int + if err := wf1.Get(ctx, &sigCount); err != nil { + return false + } + require.Equal(s.T(), 4, sigCount) + return true + }, time.Second*10, time.Millisecond*500) }