diff --git a/service/interpreter/stateExecutionCounter.go b/service/interpreter/stateExecutionCounter.go index 8a3bc5b1..3b728ff0 100644 --- a/service/interpreter/stateExecutionCounter.go +++ b/service/interpreter/stateExecutionCounter.go @@ -20,7 +20,10 @@ type StateExecutionCounter struct { totalCurrentlyExecutingCount int // For "dead ends": count the total pending states } -func NewStateExecutionCounter(ctx UnifiedContext, provider WorkflowProvider, configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter) *StateExecutionCounter { +func NewStateExecutionCounter( + ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner, + configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter, +) *StateExecutionCounter { return &StateExecutionCounter{ ctx: ctx, provider: provider, @@ -28,13 +31,15 @@ func NewStateExecutionCounter(ctx UnifiedContext, provider WorkflowProvider, con stateIdCurrentlyExecutingCounts: make(map[string]int), totalCurrentlyExecutingCount: 0, configer: configer, - globalVersioner: NewGlobalVersioner(provider, ctx), + globalVersioner: globalVersioner, continueAsNewCounter: continueAsNewCounter, } } -func RebuildStateExecutionCounter(ctx UnifiedContext, provider WorkflowProvider, - stateIdStartedCounts map[string]int, stateIdCurrentlyExecutingCounts map[string]int, totalCurrentlyExecutingCount int, +func RebuildStateExecutionCounter( + ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner, + stateIdStartedCounts map[string]int, stateIdCurrentlyExecutingCounts map[string]int, + totalCurrentlyExecutingCount int, configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter, ) *StateExecutionCounter { return &StateExecutionCounter{ @@ -44,7 +49,7 @@ func RebuildStateExecutionCounter(ctx UnifiedContext, provider WorkflowProvider, stateIdCurrentlyExecutingCounts: stateIdCurrentlyExecutingCounts, totalCurrentlyExecutingCount: totalCurrentlyExecutingCount, configer: configer, - globalVersioner: NewGlobalVersioner(provider, ctx), + globalVersioner: globalVersioner, continueAsNewCounter: continueAsNewCounter, } } diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index 910dd458..0b5589d5 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -70,7 +70,7 @@ func InterpreterImpl( continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) signalReceiver = NewSignalReceiver(ctx, provider, interStateChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, previous.SignalsReceived) counterInfo := previous.StateExecutionCounterInfo - stateExecutionCounter = RebuildStateExecutionCounter(ctx, provider, + stateExecutionCounter = RebuildStateExecutionCounter(ctx, provider, globalVersioner, counterInfo.StateIdStartedCount, counterInfo.StateIdCurrentlyExecutingCount, counterInfo.TotalCurrentlyExecutingCount, workflowConfiger, continueAsNewCounter) outputCollector = NewOutputCollector(previous.StateOutputs) @@ -82,7 +82,7 @@ func InterpreterImpl( timerProcessor = NewTimerProcessor(ctx, provider, nil) continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) signalReceiver = NewSignalReceiver(ctx, provider, interStateChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, nil) - stateExecutionCounter = NewStateExecutionCounter(ctx, provider, workflowConfiger, continueAsNewCounter) + stateExecutionCounter = NewStateExecutionCounter(ctx, provider, globalVersioner, workflowConfiger, continueAsNewCounter) outputCollector = NewOutputCollector(nil) continueAsNewer = NewContinueAsNewer(provider, interStateChannel, signalReceiver, stateExecutionCounter, persistenceManager, stateRequestQueue, outputCollector, timerProcessor) } @@ -177,7 +177,7 @@ func InterpreterImpl( shouldSendSignalOnCompletion := slices.Contains(input.WaitForCompletionStateExecutionIds, stateExeId) decision, stateExecStatus, err := executeState( - ctx, provider, basicInfo, stateReq, stateExeId, persistenceManager, interStateChannel, + ctx, provider, globalVersioner, basicInfo, stateReq, stateExeId, persistenceManager, interStateChannel, signalReceiver, timerProcessor, continueAsNewer, continueAsNewCounter, shouldSendSignalOnCompletion) if err != nil { // this is the case where stateExecStatus == FailureStateExecutionStatus @@ -424,6 +424,7 @@ func checkClosingWorkflow( func executeState( ctx UnifiedContext, provider WorkflowProvider, + globalVersioner *GlobalVersioner, basicInfo service.BasicInfo, stateReq StateRequest, stateExeId string, @@ -435,7 +436,6 @@ func executeState( continueAsNewCounter *ContinueAsNewCounter, shouldSendSignalOnCompletion bool, ) (*iwfidl.StateDecision, service.StateExecutionStatus, error) { - globalVersioner := NewGlobalVersioner(provider, ctx) waitUntilApi := StateStart executeApi := StateDecide if globalVersioner.IsAfterVersionOfRenamedStateApi() {