Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions chasm/lib/scheduler/backfiller_tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,9 @@ func (s *backfillerTasksSuite) runTestCase(c *backfillTestCase) {
// Either type of request will spawn a Backfiller and schedule an immediate pure task.
_, err = s.node.CloseTransaction()
s.NoError(err)
s.GreaterOrEqual(1, len(s.addedTasks))
task, ok := s.addedTasks[0].(*tasks.ChasmTaskPure)
s.True(ok)
s.Equal(chasm.TaskScheduledTimeImmediate, task.GetVisibilityTime())
s.True(s.hasTask(&tasks.ChasmTaskPure{}, chasm.TaskScheduledTimeImmediate))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR, but I don't think transfer queue supports pure task. That combination doesn't really make sense I think and the pure task logic can be executed in the same transaction instead.


// Run a backfill task.
s.addedTasks = make([]tasks.Task, 0) // Clear old tasks.
err = s.executor.Execute(ctx, backfiller, chasm.TaskAttributes{}, &schedulerpb.BackfillerTask{})
s.NoError(err)
_, err = s.node.CloseTransaction() // TODO - remove this when CHASM has unit testing hooks for task generation
Expand All @@ -260,7 +256,11 @@ func (s *backfillerTasksSuite) runTestCase(c *backfillTestCase) {
s.NoError(err)
s.Nil(res)
} else {
s.GreaterOrEqual(1, len(s.addedTasks))
// TODO - check that a pure task to continue driving backfill exists here. Because
// a pure task in the tree already has the physically-created status, closing the
// transaction won't call our backend mock for AddTasks twice. Fix this when CHASM
// offers unit testing hooks for task generation.

s.Equal(int64(c.ExpectedAttempt), backfiller.GetAttempt())
s.Equal(c.ExpectedLastProcessedTime.UTC(), backfiller.GetLastProcessedTime().AsTime())
}
Expand Down
5 changes: 1 addition & 4 deletions chasm/lib/scheduler/generator_tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,5 @@ func (s *generatorTasksSuite) TestExecuteBufferTask_Basic() {
// Ensure we scheduled an immediate physical pure task on the tree.
_, err = s.node.CloseTransaction()
s.NoError(err)
s.Equal(1, len(s.addedTasks))
task, ok := s.addedTasks[0].(*tasks.ChasmTaskPure)
s.True(ok)
s.Equal(chasm.TaskScheduledTimeImmediate, task.GetVisibilityTime())
s.True(s.hasTask(&tasks.ChasmTaskPure{}, chasm.TaskScheduledTimeImmediate))
}
4 changes: 0 additions & 4 deletions chasm/lib/scheduler/invoker_execute_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/testing/mockapi/workflowservicemock/v1"
"go.temporal.io/server/service/history/tasks"
"go.uber.org/mock/gomock"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -340,9 +339,6 @@ func (s *invokerExecuteTaskSuite) runExecuteTestCase(c *executeTestCase) {
s.ExpectReadComponent(invoker)
s.ExpectUpdateComponent(invoker)

// Clear old tasks and run the execute task.
s.addedTasks = make([]tasks.Task, 0)

// Create engine context for side effect task execution
engineCtx := s.newEngineContext()
err = s.executor.Execute(engineCtx, chasm.ComponentRef{}, chasm.TaskAttributes{}, &schedulerpb.InvokerExecuteTask{})
Expand Down
4 changes: 0 additions & 4 deletions chasm/lib/scheduler/invoker_process_buffer_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/history/tasks"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -313,9 +312,6 @@ func (s *invokerProcessBufferTaskSuite) runProcessBufferTestCase(c *processBuffe
// Set LastProcessedTime to current time to ensure time checks pass
invoker.LastProcessedTime = timestamppb.New(s.timeSource.Now())

// Clear old tasks and run the process buffer task
s.addedTasks = make([]tasks.Task, 0)

err = s.executor.Execute(ctx, invoker, chasm.TaskAttributes{}, &schedulerpb.InvokerProcessBufferTask{})
s.NoError(err)
_, err = s.node.CloseTransaction()
Expand Down
15 changes: 15 additions & 0 deletions chasm/lib/scheduler/scheduler_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scheduler_test

import (
"context"
"reflect"
"time"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -86,6 +87,20 @@ func (s *schedulerSuite) SetupSuite() {
s.node.SetRootComponent(s.scheduler)
}

// hasTask returns true if the given task type was added at the end of the
// transaction with the given visibilityTime.
func (s *schedulerSuite) hasTask(task any, visibilityTime time.Time) bool {
taskType := reflect.TypeOf(task)
for _, task := range s.addedTasks {
if reflect.TypeOf(task) == taskType &&
task.GetVisibilityTime().Equal(visibilityTime) {
return true
}
}

return false
}

func (s *schedulerSuite) newMutableContext() chasm.MutableContext {
return chasm.NewMutableContext(context.Background(), s.node)
}
Expand Down
Loading