diff --git a/.github/workflows/docker/dynamic-config-custom.yaml b/.github/workflows/docker/dynamic-config-custom.yaml index 7b3f250e7..227934f50 100644 --- a/.github/workflows/docker/dynamic-config-custom.yaml +++ b/.github/workflows/docker/dynamic-config-custom.yaml @@ -1,5 +1,5 @@ system.forceSearchAttributesCacheRefreshOnRead: - value: true # Dev setup only. Please don't turn this on in production. constraints: {} -system.enableActivityLocalDispatch: +system.enableActivityEagerExecution: - value: true \ No newline at end of file diff --git a/internal/internal_eager_activity.go b/internal/internal_eager_activity.go index 97a8ca23f..79b0a5857 100644 --- a/internal/internal_eager_activity.go +++ b/internal/internal_eager_activity.go @@ -56,18 +56,30 @@ func newEagerActivityExecutor(options eagerActivityExecutorOptions) *eagerActivi func (e *eagerActivityExecutor) applyToRequest( req *workflowservice.RespondWorkflowTaskCompletedRequest, ) (amountActivitySlotsReserved int) { + // Don't allow more than this hardcoded amount per workflow task for now + const maxPerTask = 3 + // Go over every command checking for activities that can be eagerly executed + eagerRequestsThisTask := 0 for _, command := range req.Commands { if attrs := command.GetScheduleActivityTaskCommandAttributes(); attrs != nil { - // If not present, disabled, or on a different task queue, we must mark as + // If not present, disabled, not requested, no activity worker, on a + // different task queue, or reached max for task, we must mark as // explicitly disabled - if e == nil || e.disabled || e.activityWorker == nil || e.taskQueue != attrs.TaskQueue.GetName() { + eagerDisallowed := e == nil || + e.disabled || + !attrs.RequestEagerExecution || + e.activityWorker == nil || + e.taskQueue != attrs.TaskQueue.GetName() || + eagerRequestsThisTask >= maxPerTask + if eagerDisallowed { attrs.RequestEagerExecution = false - } else if attrs.RequestEagerExecution { + } else { // If it has been requested, attempt to reserve one pending attrs.RequestEagerExecution = e.reserveOnePendingSlot() if attrs.RequestEagerExecution { amountActivitySlotsReserved++ + eagerRequestsThisTask++ } } } diff --git a/internal/internal_eager_activity_test.go b/internal/internal_eager_activity_test.go index 80b293173..aad9720c9 100644 --- a/internal/internal_eager_activity_test.go +++ b/internal/internal_eager_activity_test.go @@ -75,6 +75,26 @@ func TestEagerActivityWrongTaskQueue(t *testing.T) { require.False(t, req.Commands[1].GetScheduleActivityTaskCommandAttributes().RequestEagerExecution) } +func TestEagerActivityMaxPerTask(t *testing.T) { + exec := newEagerActivityExecutor(eagerActivityExecutorOptions{taskQueue: "task-queue1"}) + exec.activityWorker = newActivityWorker(nil, + workerExecutionParameters{TaskQueue: "task-queue1", ConcurrentActivityExecutionSize: 10}, nil, newRegistry(), nil) + // Fill up the poller request channel + for i := 0; i < 10; i++ { + exec.activityWorker.worker.pollerRequestCh <- struct{}{} + } + + // Add 8, but it limits to only the first 3 + var req workflowservice.RespondWorkflowTaskCompletedRequest + for i := 0; i < 8; i++ { + addScheduleTaskCommand(&req, "task-queue1") + } + require.Equal(t, 3, exec.applyToRequest(&req)) + for i := 0; i < 8; i++ { + require.Equal(t, i < 3, req.Commands[i].GetScheduleActivityTaskCommandAttributes().RequestEagerExecution) + } +} + func TestEagerActivityCounts(t *testing.T) { // We'll create an eager activity executor with 3 max eager concurrent and 5 // max concurrent diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 3f0b9be91..a97c58fde 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -65,9 +65,8 @@ const ( // center. And the poll API latency is about 5ms. With 2 poller, we could achieve around 300~400 RPS. defaultConcurrentPollRoutineSize = 2 - defaultMaxConcurrentActivityExecutionSize = 1000 // Large concurrent activity execution size (1k) - defaultMaxConcurrentEagerActivityExecutionSize = 3 - defaultWorkerActivitiesPerSecond = 100000 // Large activity executions/sec (unlimited) + defaultMaxConcurrentActivityExecutionSize = 1000 // Large concurrent activity execution size (1k) + defaultWorkerActivitiesPerSecond = 100000 // Large activity executions/sec (unlimited) defaultMaxConcurrentLocalActivityExecutionSize = 1000 // Large concurrent activity execution size (1k) defaultWorkerLocalActivitiesPerSecond = 100000 // Large activity executions/sec (unlimited) @@ -1603,9 +1602,6 @@ func setWorkerOptionsDefaults(options *WorkerOptions) { if options.MaxHeartbeatThrottleInterval == 0 { options.MaxHeartbeatThrottleInterval = defaultMaxHeartbeatThrottleInterval } - if options.MaxConcurrentEagerActivityExecutionSize == 0 { - options.MaxConcurrentEagerActivityExecutionSize = defaultMaxConcurrentEagerActivityExecutionSize - } } // setClientDefaults should be needed only in unit tests. diff --git a/internal/worker.go b/internal/worker.go index 8f6e25219..dc11c390c 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -209,8 +209,7 @@ type ( // workflow task schedules 3 more, only the first 2 will request eager // execution. // - // If unset/0, this defaults to 3. Users wanting effectively unlimited can - // set this to a high value. This is still bounded by + // The default of 0 means unlimited and therefore only bound by // MaxConcurrentActivityExecutionSize. // // See DisableEagerActivities for a description of eager activity execution.