Skip to content

Commit

Permalink
feat(orchestration): add limites on parallel activities
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Jul 17, 2024
1 parent edbf248 commit bb87c5c
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 8 deletions.
1 change: 1 addition & 0 deletions ee/orchestration/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
temporalSSLClientCertFlag = "temporal-ssl-client-cert"
temporalTaskQueueFlag = "temporal-task-queue"
temporalInitSearchAttributes = "temporal-init-search-attributes"
temporalMaxActivitiesPerSecond = "temporal-max-activities-per-second"
topicsFlag = "topics"
listenFlag = "listen"
workerFlag = "worker"
Expand Down
1 change: 1 addition & 0 deletions ee/orchestration/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func newServeCommand() *cobra.Command {

cmd.Flags().Bool(workerFlag, false, "Enable worker mode")
cmd.Flags().String(listenFlag, ":8080", "Listening address")
cmd.Flags().Float64(temporalMaxActivitiesPerSecond, 10, "Maximum number of parallel activities")
service.BindFlags(cmd)

return cmd
Expand Down
5 changes: 4 additions & 1 deletion ee/orchestration/cmd/worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"go.temporal.io/sdk/worker"
"net/http"

"github.com/formancehq/orchestration/internal/triggers"
Expand All @@ -27,7 +28,9 @@ func stackClientModule() fx.Option {
func workerOptions() fx.Option {
return fx.Options(
stackClientModule(),
temporalworker.NewWorkerModule(viper.GetString(temporalTaskQueueFlag)),
temporalworker.NewWorkerModule(viper.GetString(temporalTaskQueueFlag), worker.Options{
TaskQueueActivitiesPerSecond: viper.GetFloat64(temporalMaxActivitiesPerSecond),
}),
triggers.NewListenerModule(
viper.GetString(stackFlag),
viper.GetString(temporalTaskQueueFlag),
Expand Down
2 changes: 2 additions & 0 deletions ee/orchestration/internal/api/v1/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1

import (
"context"
"go.temporal.io/sdk/worker"
"log"
"net/http"
"os"
Expand Down Expand Up @@ -53,6 +54,7 @@ func test(t *testing.T, fn func(router *chi.Mux, backend api.Backend, db *bun.DB
[]temporalworker.DefinitionSet{
workflow.NewActivities(publish.NoOpPublisher, db).DefinitionSet(),
},
worker.Options{},
)

require.NoError(t, worker.Start())
Expand Down
4 changes: 3 additions & 1 deletion ee/orchestration/internal/api/v2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v2

import (
"context"
"go.temporal.io/sdk/worker"
"log"
"net/http"
"os"
Expand All @@ -11,7 +12,7 @@ import (
"github.com/formancehq/orchestration/internal/workflow/stages"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/formancehq/stack/libs/go-libs/publish"
chi "github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"go.temporal.io/sdk/testsuite"

Expand Down Expand Up @@ -53,6 +54,7 @@ func test(t *testing.T, fn func(router *chi.Mux, backend api.Backend, db *bun.DB
[]temporalworker.DefinitionSet{
workflow.NewActivities(publish.NoOpPublisher, db).DefinitionSet(),
},
worker.Options{},
)
require.NoError(t, worker.Start())
t.Cleanup(worker.Stop)
Expand Down
11 changes: 5 additions & 6 deletions ee/orchestration/internal/temporalworker/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ func (d DefinitionSet) Append(definition Definition) DefinitionSet {
return d
}

func New(logger logging.Logger, c client.Client, taskQueue string, workflows, activities []DefinitionSet) worker.Worker {
worker := worker.New(c, taskQueue, worker.Options{
BackgroundActivityContext: logging.ContextWithLogger(context.Background(), logger),
})
func New(logger logging.Logger, c client.Client, taskQueue string, workflows, activities []DefinitionSet, options worker.Options) worker.Worker {
options.BackgroundActivityContext = logging.ContextWithLogger(context.Background(), logger)
worker := worker.New(c, taskQueue, options)

for _, set := range workflows {
for _, workflow := range set {
Expand All @@ -54,11 +53,11 @@ func New(logger logging.Logger, c client.Client, taskQueue string, workflows, ac
return worker
}

func NewWorkerModule(taskQueue string) fx.Option {
func NewWorkerModule(taskQueue string, options worker.Options) fx.Option {
return fx.Options(
fx.Provide(
fx.Annotate(func(logger logging.Logger, c client.Client, workflows, activities []DefinitionSet) worker.Worker {
return New(logger, c, taskQueue, workflows, activities)
return New(logger, c, taskQueue, workflows, activities, options)
}, fx.ParamTags(``, ``, `group:"workflows"`, `group:"activities"`)),
),
fx.Invoke(func(lc fx.Lifecycle, w worker.Worker) {
Expand Down
2 changes: 2 additions & 0 deletions ee/orchestration/internal/triggers/workflow_trigger_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package triggers

import (
worker "go.temporal.io/sdk/worker"
"testing"
"time"

Expand Down Expand Up @@ -47,6 +48,7 @@ func TestWorkflow(t *testing.T) {
workflow.NewActivities(publish.NoOpPublisher, db).DefinitionSet(),
NewActivities(db, workflowManager, NewDefaultExpressionEvaluator(), publish.NoOpPublisher).DefinitionSet(),
},
worker.Options{},
)
require.NoError(t, worker.Start())
t.Cleanup(worker.Stop)
Expand Down
2 changes: 2 additions & 0 deletions ee/orchestration/internal/workflow/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package workflow

import (
worker "go.temporal.io/sdk/worker"
"testing"
"time"

Expand Down Expand Up @@ -42,6 +43,7 @@ func TestConfig(t *testing.T) {
[]temporalworker.DefinitionSet{
NewActivities(publish.NoOpPublisher, db).DefinitionSet(),
},
worker.Options{},
)
require.NoError(t, worker.Start())
t.Cleanup(worker.Stop)
Expand Down

0 comments on commit bb87c5c

Please sign in to comment.