diff --git a/gen/iwfidl/api/openapi.yaml b/gen/iwfidl/api/openapi.yaml index c18e8b48..27845c37 100644 --- a/gen/iwfidl/api/openapi.yaml +++ b/gen/iwfidl/api/openapi.yaml @@ -738,6 +738,7 @@ components: workflowConfigOverride: disableSystemSearchAttribute: true continueAsNewThreshold: 1 + optimizeTimer: true continueAsNewPageSizeInBytes: 4 executingStateIdMode: null optimizeActivity: true @@ -1011,6 +1012,7 @@ components: workflowConfigOverride: disableSystemSearchAttribute: true continueAsNewThreshold: 1 + optimizeTimer: true continueAsNewPageSizeInBytes: 4 executingStateIdMode: null optimizeActivity: true @@ -1601,6 +1603,7 @@ components: workflowConfig: disableSystemSearchAttribute: true continueAsNewThreshold: 1 + optimizeTimer: true continueAsNewPageSizeInBytes: 4 executingStateIdMode: null optimizeActivity: true @@ -3284,6 +3287,7 @@ components: example: disableSystemSearchAttribute: true continueAsNewThreshold: 1 + optimizeTimer: true continueAsNewPageSizeInBytes: 4 executingStateIdMode: null optimizeActivity: true @@ -3298,6 +3302,8 @@ components: type: integer optimizeActivity: type: boolean + optimizeTimer: + type: boolean type: object Context: example: diff --git a/gen/iwfidl/docs/WorkflowConfig.md b/gen/iwfidl/docs/WorkflowConfig.md index b7c2d6ff..ac490cc1 100644 --- a/gen/iwfidl/docs/WorkflowConfig.md +++ b/gen/iwfidl/docs/WorkflowConfig.md @@ -9,6 +9,7 @@ Name | Type | Description | Notes **ContinueAsNewThreshold** | Pointer to **int32** | | [optional] **ContinueAsNewPageSizeInBytes** | Pointer to **int32** | | [optional] **OptimizeActivity** | Pointer to **bool** | | [optional] +**OptimizeTimer** | Pointer to **bool** | | [optional] ## Methods @@ -154,6 +155,31 @@ SetOptimizeActivity sets OptimizeActivity field to given value. HasOptimizeActivity returns a boolean if a field has been set. +### GetOptimizeTimer + +`func (o *WorkflowConfig) GetOptimizeTimer() bool` + +GetOptimizeTimer returns the OptimizeTimer field if non-nil, zero value otherwise. + +### GetOptimizeTimerOk + +`func (o *WorkflowConfig) GetOptimizeTimerOk() (*bool, bool)` + +GetOptimizeTimerOk returns a tuple with the OptimizeTimer field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetOptimizeTimer + +`func (o *WorkflowConfig) SetOptimizeTimer(v bool)` + +SetOptimizeTimer sets OptimizeTimer field to given value. + +### HasOptimizeTimer + +`func (o *WorkflowConfig) HasOptimizeTimer() bool` + +HasOptimizeTimer returns a boolean if a field has been set. + [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/gen/iwfidl/model_workflow_config.go b/gen/iwfidl/model_workflow_config.go index 72374728..f55069cf 100644 --- a/gen/iwfidl/model_workflow_config.go +++ b/gen/iwfidl/model_workflow_config.go @@ -24,6 +24,7 @@ type WorkflowConfig struct { ContinueAsNewThreshold *int32 `json:"continueAsNewThreshold,omitempty"` ContinueAsNewPageSizeInBytes *int32 `json:"continueAsNewPageSizeInBytes,omitempty"` OptimizeActivity *bool `json:"optimizeActivity,omitempty"` + OptimizeTimer *bool `json:"optimizeTimer,omitempty"` } // NewWorkflowConfig instantiates a new WorkflowConfig object @@ -203,6 +204,38 @@ func (o *WorkflowConfig) SetOptimizeActivity(v bool) { o.OptimizeActivity = &v } +// GetOptimizeTimer returns the OptimizeTimer field value if set, zero value otherwise. +func (o *WorkflowConfig) GetOptimizeTimer() bool { + if o == nil || IsNil(o.OptimizeTimer) { + var ret bool + return ret + } + return *o.OptimizeTimer +} + +// GetOptimizeTimerOk returns a tuple with the OptimizeTimer field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *WorkflowConfig) GetOptimizeTimerOk() (*bool, bool) { + if o == nil || IsNil(o.OptimizeTimer) { + return nil, false + } + return o.OptimizeTimer, true +} + +// HasOptimizeTimer returns a boolean if a field has been set. +func (o *WorkflowConfig) HasOptimizeTimer() bool { + if o != nil && !IsNil(o.OptimizeTimer) { + return true + } + + return false +} + +// SetOptimizeTimer gets a reference to the given bool and assigns it to the OptimizeTimer field. +func (o *WorkflowConfig) SetOptimizeTimer(v bool) { + o.OptimizeTimer = &v +} + func (o WorkflowConfig) MarshalJSON() ([]byte, error) { toSerialize, err := o.ToMap() if err != nil { @@ -228,6 +261,9 @@ func (o WorkflowConfig) ToMap() (map[string]interface{}, error) { if !IsNil(o.OptimizeActivity) { toSerialize["optimizeActivity"] = o.OptimizeActivity } + if !IsNil(o.OptimizeTimer) { + toSerialize["optimizeTimer"] = o.OptimizeTimer + } return toSerialize, nil } diff --git a/integ/any_timer_signal_test.go b/integ/any_timer_signal_test.go index ee2c2810..24c2fce9 100644 --- a/integ/any_timer_signal_test.go +++ b/integ/any_timer_signal_test.go @@ -12,6 +12,7 @@ import ( "time" ) +// TODO: crate greedy tests for cancelling timer early func TestAnyTimerSignalWorkflowTemporal(t *testing.T) { if !*temporalIntegTest { t.Skip() @@ -52,6 +53,8 @@ func TestAnyTimerSignalWorkflowCadenceContinueAsNew(t *testing.T) { } } +// TODO: crate greedy tests for cancelling timer early + func doTestAnyTimerSignalWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { // start test workflow server wfHandler := anytimersignal.NewHandler() diff --git a/integ/timer_test.go b/integ/timer_test.go index 35681e1e..bb1521c4 100644 --- a/integ/timer_test.go +++ b/integ/timer_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/assert" ) +// TODO: create greedy tests by copying these 4 tests and pass in OptimizeTimer: true func TestTimerWorkflowTemporal(t *testing.T) { if !*temporalIntegTest { t.Skip() @@ -55,6 +56,8 @@ func TestTimerWorkflowCadenceContinueAsNew(t *testing.T) { } } +// TODO: create greedy tests by copying these 4 tests and pass in OptimizeTimer: true + func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { // start test workflow server wfHandler := timer.NewHandler() diff --git a/iwf-idl b/iwf-idl index a7fb5559..b29e553d 160000 --- a/iwf-idl +++ b/iwf-idl @@ -1 +1 @@ -Subproject commit a7fb55597de5591fffa406b6b5f07d85fda4dfee +Subproject commit b29e553da2efc5d01a1e496398c2894920644bad diff --git a/service/api/service.go b/service/api/service.go index fdf53e7a..2be54a4f 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -6,6 +6,7 @@ import ( "github.com/indeedeng/iwf/config" "github.com/indeedeng/iwf/service/common/event" "github.com/indeedeng/iwf/service/interpreter/env" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "net/http" "os" "strings" @@ -13,15 +14,13 @@ import ( uclient "github.com/indeedeng/iwf/service/client" "github.com/indeedeng/iwf/service/common/compatibility" - "github.com/indeedeng/iwf/service/common/rpc" - "github.com/indeedeng/iwf/service/common/utils" - "github.com/indeedeng/iwf/service/interpreter" - "github.com/indeedeng/iwf/service/common/errors" "github.com/indeedeng/iwf/service/common/log" "github.com/indeedeng/iwf/service/common/log/tag" "github.com/indeedeng/iwf/service/common/mapper" "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/common/rpc" + "github.com/indeedeng/iwf/service/common/utils" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" @@ -801,7 +800,7 @@ func (s *serviceImpl) handleRpcBySynchronousUpdate( ctx context.Context, req iwfidl.WorkflowRpcRequest, ) (resp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) { req.TimeoutSeconds = ptr.Any(utils.TrimRpcTimeoutSeconds(ctx, req)) - var output interpreter.HandlerOutput + var output interfaces.HandlerOutput err := s.client.SynchronousUpdateWorkflow(ctx, &output, req.GetWorkflowId(), req.GetWorkflowRunId(), service.ExecuteOptimisticLockingRpcUpdateType, req) if err != nil { errType := s.client.GetApplicationErrorTypeIfIsApplicationError(err) diff --git a/service/interpreter/activityImpl.go b/service/interpreter/activityImpl.go index da950002..3106b8bc 100644 --- a/service/interpreter/activityImpl.go +++ b/service/interpreter/activityImpl.go @@ -12,6 +12,7 @@ import ( "github.com/indeedeng/iwf/service/common/rpc" "github.com/indeedeng/iwf/service/common/urlautofix" "github.com/indeedeng/iwf/service/interpreter/env" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "io" "net/http" "os" @@ -30,7 +31,7 @@ func StateApiWaitUntil( ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput, ) (*iwfidl.WorkflowStateStartResponse, error) { stateApiWaitUntilStartTime := time.Now().UnixMilli() - provider := getActivityProviderByType(backendType) + provider := interfaces.GetActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("StateWaitUntilActivity", "input", log.ToJsonAndTruncateForLogging(input)) iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(input.IwfWorkerUrl) @@ -115,7 +116,7 @@ func StateApiExecute( input service.StateDecideActivityInput, ) (*iwfidl.WorkflowStateDecideResponse, error) { stateApiExecuteStartTime := time.Now().UnixMilli() - provider := getActivityProviderByType(backendType) + provider := interfaces.GetActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("StateExecuteActivity", "input", log.ToJsonAndTruncateForLogging(input)) @@ -197,20 +198,20 @@ func checkStateDecisionFromResponse(resp *iwfidl.WorkflowStateDecideResponse) er return nil } -func printDebugMsg(logger UnifiedLogger, err error, url string) { +func printDebugMsg(logger interfaces.UnifiedLogger, err error, url string) { debugMode := os.Getenv(service.EnvNameDebugMode) if debugMode != "" { logger.Info("check error at http request", err, url) } } -func composeStartApiRespError(provider ActivityProvider, err error, resp *iwfidl.WorkflowStateStartResponse) error { +func composeStartApiRespError(provider interfaces.ActivityProvider, err error, resp *iwfidl.WorkflowStateStartResponse) error { respStr, _ := resp.MarshalJSON() return provider.NewApplicationError(string(iwfidl.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE), fmt.Sprintf("err msg: %v, response: %v", err, string(respStr))) } -func composeExecuteApiRespError(provider ActivityProvider, err error, resp *iwfidl.WorkflowStateDecideResponse) error { +func composeExecuteApiRespError(provider interfaces.ActivityProvider, err error, resp *iwfidl.WorkflowStateDecideResponse) error { respStr, _ := resp.MarshalJSON() return provider.NewApplicationError(string(iwfidl.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE), fmt.Sprintf("err msg: %v, response: %v", err, string(respStr))) @@ -224,7 +225,7 @@ func checkHttpError(err error, httpResp *http.Response) bool { } func composeHttpError( - isLocalActivity bool, provider ActivityProvider, err error, httpResp *http.Response, errType string, + isLocalActivity bool, provider interfaces.ActivityProvider, err error, httpResp *http.Response, errType string, ) error { responseBody := "None" var statusCode int @@ -329,7 +330,7 @@ func listTimerSignalInternalChannelCommandIds(commandReq *iwfidl.CommandRequest) func DumpWorkflowInternal( ctx context.Context, backendType service.BackendType, req iwfidl.WorkflowDumpRequest, ) (*iwfidl.WorkflowDumpResponse, error) { - provider := getActivityProviderByType(backendType) + provider := interfaces.GetActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("DumpWorkflowInternalActivity", "input", log.ToJsonAndTruncateForLogging(req)) @@ -357,15 +358,15 @@ func DumpWorkflowInternal( func InvokeWorkerRpc( ctx context.Context, backendType service.BackendType, rpcPrep *service.PrepareRpcQueryResponse, req iwfidl.WorkflowRpcRequest, -) (*InvokeRpcActivityOutput, error) { - provider := getActivityProviderByType(backendType) +) (*interfaces.InvokeRpcActivityOutput, error) { + provider := interfaces.GetActivityProviderByType(backendType) logger := provider.GetLogger(ctx) logger.Info("InvokeWorkerRpcActivity", "input", log.ToJsonAndTruncateForLogging(req)) apiMaxSeconds := env.GetSharedConfig().Api.MaxWaitSeconds resp, statusErr := rpc.InvokeWorkerRpc(ctx, rpcPrep, req, apiMaxSeconds) - return &InvokeRpcActivityOutput{ + return &interfaces.InvokeRpcActivityOutput{ RpcOutput: resp, StatusError: statusErr, }, nil diff --git a/service/interpreter/activityImpl_test.go b/service/interpreter/activityImpl_test.go index 4a4e2885..428085ba 100644 --- a/service/interpreter/activityImpl_test.go +++ b/service/interpreter/activityImpl_test.go @@ -6,6 +6,7 @@ import ( "github.com/golang/mock/gomock" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "github.com/stretchr/testify/assert" "io" "net/http" @@ -195,9 +196,9 @@ func TestComposeHttpError_RegularActivity_NilResponse(t *testing.T) { assert.Equal(t, returnedError, err) } -func createTestComposeHttpErrorInitialState(t *testing.T, httpError string, initialError string) (*MockActivityProvider, *http.Response, error) { +func createTestComposeHttpErrorInitialState(t *testing.T, httpError string, initialError string) (*interfaces.MockActivityProvider, *http.Response, error) { ctrl := gomock.NewController(t) - mockActivityProvider := NewMockActivityProvider(ctrl) + mockActivityProvider := interfaces.NewMockActivityProvider(ctrl) var httpResp *http.Response = nil if httpError != "" { diff --git a/service/interpreter/cadence/activityProvider.go b/service/interpreter/cadence/activityProvider.go index b8a4496c..6fb7fb3b 100644 --- a/service/interpreter/cadence/activityProvider.go +++ b/service/interpreter/cadence/activityProvider.go @@ -3,7 +3,7 @@ package cadence import ( "context" "github.com/indeedeng/iwf/service" - "github.com/indeedeng/iwf/service/interpreter" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "go.uber.org/cadence" "go.uber.org/cadence/activity" ) @@ -11,27 +11,27 @@ import ( type activityProvider struct{} func init() { - interpreter.RegisterActivityProvider(service.BackendTypeCadence, &activityProvider{}) + interfaces.RegisterActivityProvider(service.BackendTypeCadence, &activityProvider{}) } func (a *activityProvider) NewApplicationError(errType string, details interface{}) error { return cadence.NewCustomError(errType, details) } -func (a *activityProvider) GetLogger(ctx context.Context) interpreter.UnifiedLogger { +func (a *activityProvider) GetLogger(ctx context.Context) interfaces.UnifiedLogger { zLogger := activity.GetLogger(ctx) return &loggerImpl{ zlogger: zLogger, } } -func (a *activityProvider) GetActivityInfo(ctx context.Context) interpreter.ActivityInfo { +func (a *activityProvider) GetActivityInfo(ctx context.Context) interfaces.ActivityInfo { info := activity.GetInfo(ctx) - return interpreter.ActivityInfo{ + return interfaces.ActivityInfo{ ScheduledTime: info.ScheduledTimestamp, Attempt: info.Attempt + 1, // NOTE increase by one to match Temporal IsLocalActivity: false, // TODO cadence doesn't support this yet - WorkflowExecution: interpreter.WorkflowExecution{ + WorkflowExecution: interfaces.WorkflowExecution{ ID: info.WorkflowExecution.ID, RunID: info.WorkflowExecution.RunID, }, diff --git a/service/interpreter/cadence/workflow.go b/service/interpreter/cadence/workflow.go index 107d76e0..9c9512e1 100644 --- a/service/interpreter/cadence/workflow.go +++ b/service/interpreter/cadence/workflow.go @@ -3,13 +3,14 @@ package cadence import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/interpreter" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "go.uber.org/cadence/workflow" ) func Interpreter(ctx workflow.Context, input service.InterpreterWorkflowInput) (*service.InterpreterWorkflowOutput, error) { - return interpreter.InterpreterImpl(interpreter.NewUnifiedContext(ctx), newCadenceWorkflowProvider(), input) + return interpreter.InterpreterImpl(interfaces.NewUnifiedContext(ctx), newCadenceWorkflowProvider(), input) } func WaitforStateCompletionWorkflow(ctx workflow.Context) (*service.WaitForStateCompletionWorkflowOutput, error) { - return interpreter.WaitForStateCompletionWorkflowImpl(interpreter.NewUnifiedContext(ctx), newCadenceWorkflowProvider()) + return interpreter.WaitForStateCompletionWorkflowImpl(interfaces.NewUnifiedContext(ctx), newCadenceWorkflowProvider()) } diff --git a/service/interpreter/cadence/workflowProvider.go b/service/interpreter/cadence/workflowProvider.go index 996f4e89..925b4f8d 100644 --- a/service/interpreter/cadence/workflowProvider.go +++ b/service/interpreter/cadence/workflowProvider.go @@ -3,12 +3,12 @@ package cadence import ( "fmt" "github.com/indeedeng/iwf/service/common/mapper" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "time" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/retry" - "github.com/indeedeng/iwf/service/interpreter" "go.uber.org/cadence" "go.uber.org/cadence/workflow" ) @@ -18,7 +18,7 @@ type workflowProvider struct { pendingThreadNames map[string]int } -func newCadenceWorkflowProvider() interpreter.WorkflowProvider { +func newCadenceWorkflowProvider() interfaces.WorkflowProvider { return &workflowProvider{ pendingThreadNames: map[string]int{}, } @@ -38,7 +38,7 @@ func (w *workflowProvider) IsApplicationError(err error) bool { } func (w *workflowProvider) NewInterpreterContinueAsNewError( - ctx interpreter.UnifiedContext, input service.InterpreterWorkflowInput, + ctx interfaces.UnifiedContext, input service.InterpreterWorkflowInput, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -48,7 +48,7 @@ func (w *workflowProvider) NewInterpreterContinueAsNewError( } func (w *workflowProvider) UpsertSearchAttributes( - ctx interpreter.UnifiedContext, attributes map[string]interface{}, + ctx interfaces.UnifiedContext, attributes map[string]interface{}, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -57,11 +57,11 @@ func (w *workflowProvider) UpsertSearchAttributes( return workflow.UpsertSearchAttributes(wfCtx, attributes) } -func (w *workflowProvider) UpsertMemo(ctx interpreter.UnifiedContext, memo map[string]iwfidl.EncodedObject) error { +func (w *workflowProvider) UpsertMemo(ctx interfaces.UnifiedContext, memo map[string]iwfidl.EncodedObject) error { return fmt.Errorf("upsert memo is not supported in Cadence") } -func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Duration) interpreter.Future { +func (w *workflowProvider) NewTimer(ctx interfaces.UnifiedContext, d time.Duration) interfaces.Future { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -72,14 +72,14 @@ func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Durat } } -func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) interpreter.WorkflowInfo { +func (w *workflowProvider) GetWorkflowInfo(ctx interfaces.UnifiedContext) interfaces.WorkflowInfo { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") } info := workflow.GetInfo(wfCtx) - return interpreter.WorkflowInfo{ - WorkflowExecution: interpreter.WorkflowExecution{ + return interfaces.WorkflowInfo{ + WorkflowExecution: interfaces.WorkflowExecution{ ID: info.WorkflowExecution.ID, RunID: info.WorkflowExecution.RunID, }, @@ -91,7 +91,7 @@ func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) inter } func (w *workflowProvider) GetSearchAttributes( - ctx interpreter.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, + ctx interfaces.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, ) (map[string]iwfidl.SearchAttribute, error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -103,7 +103,7 @@ func (w *workflowProvider) GetSearchAttributes( } func (w *workflowProvider) SetQueryHandler( - ctx interpreter.UnifiedContext, queryType string, handler interface{}, + ctx interfaces.UnifiedContext, queryType string, handler interface{}, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -113,32 +113,32 @@ func (w *workflowProvider) SetQueryHandler( } func (w *workflowProvider) SetRpcUpdateHandler( - ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, - handler interpreter.UnifiedRpcHandler, + ctx interfaces.UnifiedContext, updateType string, validator interfaces.UnifiedRpcValidator, + handler interfaces.UnifiedRpcHandler, ) error { // NOTE: this feature is not available in Cadence return nil } func (w *workflowProvider) ExtendContextWithValue( - parent interpreter.UnifiedContext, key string, val interface{}, -) interpreter.UnifiedContext { + parent interfaces.UnifiedContext, key string, val interface{}, +) interfaces.UnifiedContext { wfCtx, ok := parent.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") } - return interpreter.NewUnifiedContext(workflow.WithValue(wfCtx, key, val)) + return interfaces.NewUnifiedContext(workflow.WithValue(wfCtx, key, val)) } func (w *workflowProvider) GoNamed( - ctx interpreter.UnifiedContext, name string, f func(ctx interpreter.UnifiedContext), + ctx interfaces.UnifiedContext, name string, f func(ctx interfaces.UnifiedContext), ) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") } f2 := func(ctx workflow.Context) { - ctx2 := interpreter.NewUnifiedContext(ctx) + ctx2 := interfaces.NewUnifiedContext(ctx) w.pendingThreadNames[name]++ w.threadCount++ f(ctx2) @@ -159,7 +159,7 @@ func (w *workflowProvider) GetThreadCount() int { return w.threadCount } -func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func() bool) error { +func (w *workflowProvider) Await(ctx interfaces.UnifiedContext, condition func() bool) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -168,8 +168,8 @@ func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func( } func (w *workflowProvider) WithActivityOptions( - ctx interpreter.UnifiedContext, options interpreter.ActivityOptions, -) interpreter.UnifiedContext { + ctx interfaces.UnifiedContext, options interfaces.ActivityOptions, +) interfaces.UnifiedContext { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -190,7 +190,7 @@ func (w *workflowProvider) WithActivityOptions( ScheduleToCloseTimeout: time.Second * 7, RetryPolicy: retry.ConvertCadenceActivityRetryPolicy(options.RetryPolicy), }) - return interpreter.NewUnifiedContext(wfCtx3) + return interfaces.NewUnifiedContext(wfCtx3) } type futureImpl struct { @@ -201,7 +201,7 @@ func (t *futureImpl) IsReady() bool { return t.future.IsReady() } -func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) error { +func (t *futureImpl) Get(ctx interfaces.UnifiedContext, valuePtr interface{}) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -212,7 +212,7 @@ func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) e func (w *workflowProvider) ExecuteActivity( valuePtr interface{}, optimizeByLocalActivity bool, - ctx interpreter.UnifiedContext, activity interface{}, args ...interface{}, + ctx interfaces.UnifiedContext, activity interface{}, args ...interface{}, ) (err error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -232,7 +232,7 @@ func (w *workflowProvider) ExecuteActivity( return f.Get(wfCtx, valuePtr) } -func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time { +func (w *workflowProvider) Now(ctx interfaces.UnifiedContext) time.Time { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -240,7 +240,7 @@ func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time { return workflow.Now(wfCtx) } -func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool { +func (w *workflowProvider) IsReplaying(ctx interfaces.UnifiedContext) bool { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -248,7 +248,7 @@ func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool { return workflow.IsReplaying(wfCtx) } -func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration) (err error) { +func (w *workflowProvider) Sleep(ctx interfaces.UnifiedContext, d time.Duration) (err error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -257,7 +257,7 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration } func (w *workflowProvider) GetVersion( - ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int, + ctx interfaces.UnifiedContext, changeID string, minSupported, maxSupported int, ) int { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -276,7 +276,7 @@ func (t *cadenceReceiveChannel) ReceiveAsync(valuePtr interface{}) (ok bool) { return t.channel.ReceiveAsync(valuePtr) } -func (t *cadenceReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext, valuePtr interface{}) (ok bool) { +func (t *cadenceReceiveChannel) ReceiveBlocking(ctx interfaces.UnifiedContext, valuePtr interface{}) (ok bool) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -286,8 +286,8 @@ func (t *cadenceReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext, } func (w *workflowProvider) GetSignalChannel( - ctx interpreter.UnifiedContext, signalName string, -) interpreter.ReceiveChannel { + ctx interfaces.UnifiedContext, signalName string, +) interfaces.ReceiveChannel { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -298,7 +298,7 @@ func (w *workflowProvider) GetSignalChannel( } } -func (w *workflowProvider) GetContextValue(ctx interpreter.UnifiedContext, key string) interface{} { +func (w *workflowProvider) GetContextValue(ctx interfaces.UnifiedContext, key string) interface{} { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -306,7 +306,7 @@ func (w *workflowProvider) GetContextValue(ctx interpreter.UnifiedContext, key s return wfCtx.Value(key) } -func (w *workflowProvider) GetLogger(ctx interpreter.UnifiedContext) interpreter.UnifiedLogger { +func (w *workflowProvider) GetLogger(ctx interfaces.UnifiedContext) interfaces.UnifiedLogger { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") @@ -318,7 +318,7 @@ func (w *workflowProvider) GetLogger(ctx interpreter.UnifiedContext) interpreter } } -func (w *workflowProvider) GetUnhandledSignalNames(ctx interpreter.UnifiedContext) []string { +func (w *workflowProvider) GetUnhandledSignalNames(ctx interfaces.UnifiedContext) []string { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to cadence workflow context") diff --git a/service/interpreter/workflowConfiger.go b/service/interpreter/config/workflowConfiger.go similarity index 98% rename from service/interpreter/workflowConfiger.go rename to service/interpreter/config/workflowConfiger.go index 81fedc02..dfa53506 100644 --- a/service/interpreter/workflowConfiger.go +++ b/service/interpreter/config/workflowConfiger.go @@ -1,4 +1,4 @@ -package interpreter +package config import ( "github.com/indeedeng/iwf/gen/iwfidl" diff --git a/service/interpreter/continueAsNewCounter.go b/service/interpreter/cont/continueAsNewCounter.go similarity index 75% rename from service/interpreter/continueAsNewCounter.go rename to service/interpreter/cont/continueAsNewCounter.go index 279c7515..c98bb20d 100644 --- a/service/interpreter/continueAsNewCounter.go +++ b/service/interpreter/cont/continueAsNewCounter.go @@ -1,4 +1,9 @@ -package interpreter +package cont + +import ( + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/interfaces" +) type ContinueAsNewCounter struct { executedStateApis int32 @@ -6,13 +11,13 @@ type ContinueAsNewCounter struct { syncUpdateReceived int32 triggeredByAPI bool - configer *WorkflowConfiger - rootCtx UnifiedContext - provider WorkflowProvider + configer *config.WorkflowConfiger + rootCtx interfaces.UnifiedContext + provider interfaces.WorkflowProvider } func NewContinueAsCounter( - configer *WorkflowConfiger, rootCtx UnifiedContext, provider WorkflowProvider, + configer *config.WorkflowConfiger, rootCtx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, ) *ContinueAsNewCounter { return &ContinueAsNewCounter{ configer: configer, diff --git a/service/interpreter/continueAsNewer.go b/service/interpreter/continueAsNewer.go index 4ec787cb..3264af6e 100644 --- a/service/interpreter/continueAsNewer.go +++ b/service/interpreter/continueAsNewer.go @@ -7,13 +7,14 @@ import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/interpreter/env" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "math" "strings" "time" ) type ContinueAsNewer struct { - provider WorkflowProvider + provider interfaces.WorkflowProvider StateExecutionToResumeMap map[string]service.StateExecutionResumeInfo // stateExeId to StateExecutionResumeInfo inflightUpdateOperations int @@ -24,14 +25,14 @@ type ContinueAsNewer struct { persistenceManager *PersistenceManager signalReceiver *SignalReceiver outputCollector *OutputCollector - timerProcessor *TimerProcessor + timerProcessor interfaces.TimerProcessor } func NewContinueAsNewer( - provider WorkflowProvider, + provider interfaces.WorkflowProvider, interStateChannel *InternalChannel, signalReceiver *SignalReceiver, stateExecutionCounter *StateExecutionCounter, persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, collector *OutputCollector, - timerProcessor *TimerProcessor, + timerProcessor interfaces.TimerProcessor, ) *ContinueAsNewer { return &ContinueAsNewer{ provider: provider, @@ -49,9 +50,9 @@ func NewContinueAsNewer( } func LoadInternalsFromPreviousRun( - ctx UnifiedContext, provider WorkflowProvider, previousRunId string, continueAsNewPageSizeInBytes int32, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, previousRunId string, continueAsNewPageSizeInBytes int32, ) (*service.ContinueAsNewDumpResponse, error) { - activityOptions := ActivityOptions{ + activityOptions := interfaces.ActivityOptions{ StartToCloseTimeout: 5 * time.Second, RetryPolicy: &iwfidl.RetryPolicy{ MaximumIntervalSeconds: iwfidl.PtrInt32(5), @@ -134,7 +135,7 @@ func (c *ContinueAsNewer) GetSnapshot() service.ContinueAsNewDumpResponse { } } -func (c *ContinueAsNewer) SetQueryHandlersForContinueAsNew(ctx UnifiedContext) error { +func (c *ContinueAsNewer) SetQueryHandlersForContinueAsNew(ctx interfaces.UnifiedContext) error { return c.provider.SetQueryHandler(ctx, service.ContinueAsNewDumpByPageQueryType, // return the current page of the whole snapshot func(request iwfidl.WorkflowDumpRequest) (*iwfidl.WorkflowDumpResponse, error) { @@ -192,7 +193,7 @@ func (c *ContinueAsNewer) RemoveStateExecutionToResume(stateExecutionId string) delete(c.StateExecutionToResumeMap, stateExecutionId) } -func (c *ContinueAsNewer) DrainThreads(ctx UnifiedContext) error { +func (c *ContinueAsNewer) DrainThreads(ctx interfaces.UnifiedContext) error { // TODO: add metric for before and after Await to monitor stuck // NOTE: consider using AwaitWithTimeout to get an alert when workflow stuck due to a bug in the draining logic for continueAsNew @@ -222,7 +223,7 @@ var inMemoryContinueAsNewMonitor = make(map[string]time.Time) const warnThreshold = time.Second * 5 const errThreshold = time.Second * 15 -func (c *ContinueAsNewer) allThreadsDrained(ctx UnifiedContext) bool { +func (c *ContinueAsNewer) allThreadsDrained(ctx interfaces.UnifiedContext) bool { runId := c.provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID remainingThreadCount := c.provider.GetThreadCount() diff --git a/service/interpreter/globalVersioner.go b/service/interpreter/globalVersioner.go index 68d623e2..765b1da9 100644 --- a/service/interpreter/globalVersioner.go +++ b/service/interpreter/globalVersioner.go @@ -2,6 +2,7 @@ package interpreter import ( "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/interpreter/interfaces" ) const globalChangeId = "global" @@ -18,13 +19,13 @@ const MaxOfAllVersions = StartingVersionYieldOnConditionalComplete // GlobalVersioner see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api type GlobalVersioner struct { - workflowProvider WorkflowProvider - ctx UnifiedContext + workflowProvider interfaces.WorkflowProvider + ctx interfaces.UnifiedContext version int } func NewGlobalVersioner( - workflowProvider WorkflowProvider, ctx UnifiedContext, + workflowProvider interfaces.WorkflowProvider, ctx interfaces.UnifiedContext, ) (*GlobalVersioner, error) { version := workflowProvider.GetVersion(ctx, globalChangeId, 0, MaxOfAllVersions) diff --git a/service/interpreter/interfaces.go b/service/interpreter/interfaces/interfaces.go similarity index 88% rename from service/interpreter/interfaces.go rename to service/interpreter/interfaces/interfaces.go index 7a6f733f..b74975ce 100644 --- a/service/interpreter/interfaces.go +++ b/service/interpreter/interfaces/interfaces.go @@ -1,4 +1,4 @@ -package interpreter +package interfaces import ( "context" @@ -31,7 +31,7 @@ func RegisterActivityProvider(backendType service.BackendType, provider Activity activityProviderRegistry[backendType] = provider } -func getActivityProviderByType(backendType service.BackendType) ActivityProvider { +func GetActivityProviderByType(backendType service.BackendType) ActivityProvider { provider := activityProviderRegistry[backendType] if provider == nil { panic("not supported yet: " + backendType) @@ -84,6 +84,16 @@ func NewUnifiedContext(ctx interface{}) UnifiedContext { } } +type TimerProcessor interface { + Dump() []service.StaleSkipTimerSignal + GetCurrentTimerInfos() map[string][]*service.TimerInfo + SkipTimer(stateExeId string, timerId string, timerIdx int) bool + RetryStaleSkipTimer() bool + WaitForTimerFiredOrSkipped(ctx UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool) service.InternalTimerStatus + RemovePendingTimersOfState(stateExeId string) + AddTimers(stateExeId string, commands []iwfidl.TimerCommand, completedTimerCmds map[int]service.InternalTimerStatus) +} + type WorkflowProvider interface { NewApplicationError(errType string, details interface{}) error IsApplicationError(err error) bool diff --git a/service/interpreter/interfaces_mock.go b/service/interpreter/interfaces/interfaces_mock.go similarity index 99% rename from service/interpreter/interfaces_mock.go rename to service/interpreter/interfaces/interfaces_mock.go index 3b2799d0..55058d08 100644 --- a/service/interpreter/interfaces_mock.go +++ b/service/interpreter/interfaces/interfaces_mock.go @@ -2,7 +2,7 @@ // Source: /Users/lwolczynski/indeedeng/iwf-server/service/interpreter/interfaces.go // Package interpreter is a generated GoMock package. -package interpreter +package interfaces import ( context "context" diff --git a/service/interpreter/persistence.go b/service/interpreter/persistence.go index c9c34a30..45e1d284 100644 --- a/service/interpreter/persistence.go +++ b/service/interpreter/persistence.go @@ -5,12 +5,13 @@ import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/mapper" "github.com/indeedeng/iwf/service/common/utils" + "github.com/indeedeng/iwf/service/interpreter/interfaces" ) type PersistenceManager struct { dataObjects map[string]iwfidl.KeyValue searchAttributes map[string]iwfidl.SearchAttribute - provider WorkflowProvider + provider interfaces.WorkflowProvider lockedDataObjectKeys map[string]bool lockedSearchAttributeKeys map[string]bool @@ -19,7 +20,7 @@ type PersistenceManager struct { } func NewPersistenceManager( - provider WorkflowProvider, initDataAttributes []iwfidl.KeyValue, initSearchAttributes []iwfidl.SearchAttribute, + provider interfaces.WorkflowProvider, initDataAttributes []iwfidl.KeyValue, initSearchAttributes []iwfidl.SearchAttribute, useMemo bool, ) *PersistenceManager { searchAttributes := make(map[string]iwfidl.SearchAttribute) @@ -43,7 +44,7 @@ func NewPersistenceManager( } func RebuildPersistenceManager( - provider WorkflowProvider, + provider interfaces.WorkflowProvider, dolist []iwfidl.KeyValue, salist []iwfidl.SearchAttribute, useMemo bool, ) *PersistenceManager { @@ -89,7 +90,7 @@ func (am *PersistenceManager) GetDataObjectsByKey(request service.GetDataAttribu } func (am *PersistenceManager) LoadSearchAttributes( - ctx UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy, + ctx interfaces.UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy, ) []iwfidl.SearchAttribute { var loadingType iwfidl.PersistenceLoadingType var partialLoadingKeys []string @@ -127,7 +128,7 @@ func (am *PersistenceManager) LoadSearchAttributes( } func (am *PersistenceManager) LoadDataObjects( - ctx UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy, + ctx interfaces.UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy, ) []iwfidl.KeyValue { var loadingType iwfidl.PersistenceLoadingType var partialLoadingKeys []string @@ -181,7 +182,7 @@ func (am *PersistenceManager) GetAllDataObjects() []iwfidl.KeyValue { } func (am *PersistenceManager) ProcessUpsertSearchAttribute( - ctx UnifiedContext, attributes []iwfidl.SearchAttribute, + ctx interfaces.UnifiedContext, attributes []iwfidl.SearchAttribute, ) error { if len(attributes) == 0 { return nil @@ -197,7 +198,7 @@ func (am *PersistenceManager) ProcessUpsertSearchAttribute( return am.provider.UpsertSearchAttributes(ctx, attrsToUpsert) } -func (am *PersistenceManager) ProcessUpsertDataObject(ctx UnifiedContext, attributes []iwfidl.KeyValue) error { +func (am *PersistenceManager) ProcessUpsertDataObject(ctx interfaces.UnifiedContext, attributes []iwfidl.KeyValue) error { if len(attributes) == 0 { return nil } @@ -228,7 +229,7 @@ func (am *PersistenceManager) checkKeysAreUnlocked(lockedKeys map[string]bool, k return true } -func (am *PersistenceManager) awaitAndLockForKeys(ctx UnifiedContext, lockedKeys map[string]bool, keysToLock []string) { +func (am *PersistenceManager) awaitAndLockForKeys(ctx interfaces.UnifiedContext, lockedKeys map[string]bool, keysToLock []string) { // wait until all keys are not locked err := am.provider.Await(ctx, func() bool { for _, k := range keysToLock { diff --git a/service/interpreter/queryHandler.go b/service/interpreter/queryHandler.go index 7b2d7660..b67ae942 100644 --- a/service/interpreter/queryHandler.go +++ b/service/interpreter/queryHandler.go @@ -3,13 +3,15 @@ package interpreter import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/interfaces" ) func SetQueryHandlers( - ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, persistenceManager *PersistenceManager, internalChannel *InternalChannel, signalReceiver *SignalReceiver, continueAsNewer *ContinueAsNewer, - workflowConfiger *WorkflowConfiger, basicInfo service.BasicInfo, + workflowConfiger *config.WorkflowConfiger, basicInfo service.BasicInfo, ) error { err := provider.SetQueryHandler(ctx, service.GetDataAttributesWorkflowQueryType, func(req service.GetDataAttributesQueryRequest) (service.GetDataAttributesQueryResponse, error) { dos := persistenceManager.GetDataObjectsByKey(req) diff --git a/service/interpreter/signalReceiver.go b/service/interpreter/signalReceiver.go index 06aa17de..f73356a5 100644 --- a/service/interpreter/signalReceiver.go +++ b/service/interpreter/signalReceiver.go @@ -2,6 +2,9 @@ package interpreter import ( "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/cont" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "strings" "github.com/indeedeng/iwf/gen/iwfidl" @@ -13,19 +16,19 @@ type SignalReceiver struct { receivedSignals map[string][]*iwfidl.EncodedObject failWorkflowByClient bool reasonFailWorkflowByClient *string - provider WorkflowProvider - timerProcessor *TimerProcessor - workflowConfiger *WorkflowConfiger + provider interfaces.WorkflowProvider + timerProcessor interfaces.TimerProcessor + workflowConfiger *config.WorkflowConfiger interStateChannel *InternalChannel stateRequestQueue *StateRequestQueue persistenceManager *PersistenceManager } func NewSignalReceiver( - ctx UnifiedContext, provider WorkflowProvider, interStateChannel *InternalChannel, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, interStateChannel *InternalChannel, stateRequestQueue *StateRequestQueue, - persistenceManager *PersistenceManager, tp *TimerProcessor, continueAsNewCounter *ContinueAsNewCounter, - workflowConfiger *WorkflowConfiger, + persistenceManager *PersistenceManager, tp interfaces.TimerProcessor, continueAsNewCounter *cont.ContinueAsNewCounter, + workflowConfiger *config.WorkflowConfiger, initReceivedSignals map[string][]*iwfidl.EncodedObject, ) *SignalReceiver { if initReceivedSignals == nil { @@ -42,7 +45,7 @@ func NewSignalReceiver( persistenceManager: persistenceManager, } - provider.GoNamed(ctx, "fail-workflow-system-signal-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "fail-workflow-system-signal-handler", func(ctx interfaces.UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.FailWorkflowSignalChannelName) @@ -67,7 +70,7 @@ func NewSignalReceiver( } }) - provider.GoNamed(ctx, "skip-timer-system-signal-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "skip-timer-system-signal-handler", func(ctx interfaces.UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.SkipTimerSignalChannelName) val := service.SkipTimerSignalRequest{} @@ -91,7 +94,7 @@ func NewSignalReceiver( } }) - provider.GoNamed(ctx, "update-config-system-signal-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "update-config-system-signal-handler", func(ctx interfaces.UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.UpdateConfigSignalChannelName) val := iwfidl.WorkflowConfigUpdateRequest{} @@ -115,7 +118,7 @@ func NewSignalReceiver( } }) - provider.GoNamed(ctx, "trigger-continue-as-new-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "trigger-continue-as-new-handler", func(ctx interfaces.UnifiedContext) { // NOTE: unlike other signal channels, this one doesn't need to drain during continueAsNew // because if there is a continueAsNew, this signal is not needed anymore ch := provider.GetSignalChannel(ctx, service.TriggerContinueAsNewSignalChannelName) @@ -135,7 +138,7 @@ func NewSignalReceiver( return }) - provider.GoNamed(ctx, "execute-rpc-signal-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "execute-rpc-signal-handler", func(ctx interfaces.UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.ExecuteRpcSignalChannelName) var val service.ExecuteRpcSignalRequest @@ -164,7 +167,7 @@ func NewSignalReceiver( } }) - provider.GoNamed(ctx, "user-signal-receiver-handler", func(ctx UnifiedContext) { + provider.GoNamed(ctx, "user-signal-receiver-handler", func(ctx interfaces.UnifiedContext) { for { var toProcess []string err := provider.Await(ctx, func() bool { @@ -200,7 +203,7 @@ func NewSignalReceiver( return sr } -func (sr *SignalReceiver) receiveSignal(ctx UnifiedContext, sigName string) { +func (sr *SignalReceiver) receiveSignal(ctx interfaces.UnifiedContext, sigName string) { ch := sr.provider.GetSignalChannel(ctx, sigName) for { var sigVal iwfidl.EncodedObject @@ -257,7 +260,7 @@ func (sr *SignalReceiver) GetInfos() map[string]iwfidl.ChannelInfo { // This includes both regular user signals and system signals // 2. Conditional close/complete workflow on signal/internal channel: // retrieve all signal/internal channel messages before checking the signal/internal channels -func (sr *SignalReceiver) DrainAllReceivedButUnprocessedSignals(ctx UnifiedContext) { +func (sr *SignalReceiver) DrainAllReceivedButUnprocessedSignals(ctx interfaces.UnifiedContext) { unhandledSigs := sr.provider.GetUnhandledSignalNames(ctx) if len(unhandledSigs) == 0 { return diff --git a/service/interpreter/stateExecutionCounter.go b/service/interpreter/stateExecutionCounter.go index 1b21ea11..58d3d888 100644 --- a/service/interpreter/stateExecutionCounter.go +++ b/service/interpreter/stateExecutionCounter.go @@ -6,16 +6,19 @@ import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/compatibility" "github.com/indeedeng/iwf/service/common/ptr" + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/cont" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "reflect" "slices" ) type StateExecutionCounter struct { - ctx UnifiedContext - provider WorkflowProvider - configer *WorkflowConfiger + ctx interfaces.UnifiedContext + provider interfaces.WorkflowProvider + configer *config.WorkflowConfiger globalVersioner *GlobalVersioner - continueAsNewCounter *ContinueAsNewCounter + continueAsNewCounter *cont.ContinueAsNewCounter stateIdCompletedCounts map[string]int stateIdStartedCounts map[string]int // For creating stateExecutionId: count the stateId for how many times that have been executed @@ -24,8 +27,8 @@ type StateExecutionCounter struct { } func NewStateExecutionCounter( - ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner, - configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner, + configer *config.WorkflowConfiger, continueAsNewCounter *cont.ContinueAsNewCounter, ) *StateExecutionCounter { return &StateExecutionCounter{ ctx: ctx, @@ -40,10 +43,10 @@ func NewStateExecutionCounter( } func RebuildStateExecutionCounter( - ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner, stateIdStartedCounts map[string]int, stateIdCurrentlyExecutingCounts map[string]int, totalCurrentlyExecutingCount int, - configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter, + configer *config.WorkflowConfiger, continueAsNewCounter *cont.ContinueAsNewCounter, ) *StateExecutionCounter { return &StateExecutionCounter{ ctx: ctx, diff --git a/service/interpreter/temporal/activityProvider.go b/service/interpreter/temporal/activityProvider.go index d8169791..5941427f 100644 --- a/service/interpreter/temporal/activityProvider.go +++ b/service/interpreter/temporal/activityProvider.go @@ -3,7 +3,7 @@ package temporal import ( "context" "github.com/indeedeng/iwf/service" - "github.com/indeedeng/iwf/service/interpreter" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/temporal" ) @@ -11,10 +11,10 @@ import ( type activityProvider struct{} func init() { - interpreter.RegisterActivityProvider(service.BackendTypeTemporal, &activityProvider{}) + interfaces.RegisterActivityProvider(service.BackendTypeTemporal, &activityProvider{}) } -func (a *activityProvider) GetLogger(ctx context.Context) interpreter.UnifiedLogger { +func (a *activityProvider) GetLogger(ctx context.Context) interfaces.UnifiedLogger { return activity.GetLogger(ctx) } @@ -22,13 +22,13 @@ func (a *activityProvider) NewApplicationError(errType string, details interface return temporal.NewApplicationError("", errType, details) } -func (a *activityProvider) GetActivityInfo(ctx context.Context) interpreter.ActivityInfo { +func (a *activityProvider) GetActivityInfo(ctx context.Context) interfaces.ActivityInfo { info := activity.GetInfo(ctx) - return interpreter.ActivityInfo{ + return interfaces.ActivityInfo{ ScheduledTime: info.ScheduledTime, Attempt: info.Attempt, IsLocalActivity: info.IsLocalActivity, - WorkflowExecution: interpreter.WorkflowExecution{ + WorkflowExecution: interfaces.WorkflowExecution{ ID: info.WorkflowExecution.ID, RunID: info.WorkflowExecution.RunID, }, diff --git a/service/interpreter/temporal/workflow.go b/service/interpreter/temporal/workflow.go index aa1e792c..59bec2ec 100644 --- a/service/interpreter/temporal/workflow.go +++ b/service/interpreter/temporal/workflow.go @@ -3,6 +3,7 @@ package temporal import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/interpreter" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "go.temporal.io/sdk/workflow" // TODO(cretz): Remove when tagged @@ -10,9 +11,9 @@ import ( ) func Interpreter(ctx workflow.Context, input service.InterpreterWorkflowInput) (*service.InterpreterWorkflowOutput, error) { - return interpreter.InterpreterImpl(interpreter.NewUnifiedContext(ctx), newTemporalWorkflowProvider(), input) + return interpreter.InterpreterImpl(interfaces.NewUnifiedContext(ctx), newTemporalWorkflowProvider(), input) } func WaitforStateCompletionWorkflow(ctx workflow.Context) (*service.WaitForStateCompletionWorkflowOutput, error) { - return interpreter.WaitForStateCompletionWorkflowImpl(interpreter.NewUnifiedContext(ctx), newTemporalWorkflowProvider()) + return interpreter.WaitForStateCompletionWorkflowImpl(interfaces.NewUnifiedContext(ctx), newTemporalWorkflowProvider()) } diff --git a/service/interpreter/temporal/workflowProvider.go b/service/interpreter/temporal/workflowProvider.go index a4aeb57a..0a730606 100644 --- a/service/interpreter/temporal/workflowProvider.go +++ b/service/interpreter/temporal/workflowProvider.go @@ -3,12 +3,12 @@ package temporal import ( "errors" "github.com/indeedeng/iwf/service/common/mapper" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "time" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/retry" - "github.com/indeedeng/iwf/service/interpreter" "github.com/indeedeng/iwf/service/interpreter/env" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" @@ -19,7 +19,7 @@ type workflowProvider struct { pendingThreadNames map[string]int } -func newTemporalWorkflowProvider() interpreter.WorkflowProvider { +func newTemporalWorkflowProvider() interfaces.WorkflowProvider { return &workflowProvider{ pendingThreadNames: map[string]int{}, } @@ -39,7 +39,7 @@ func (w *workflowProvider) IsApplicationError(err error) bool { } func (w *workflowProvider) NewInterpreterContinueAsNewError( - ctx interpreter.UnifiedContext, input service.InterpreterWorkflowInput, + ctx interfaces.UnifiedContext, input service.InterpreterWorkflowInput, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -49,7 +49,7 @@ func (w *workflowProvider) NewInterpreterContinueAsNewError( } func (w *workflowProvider) UpsertSearchAttributes( - ctx interpreter.UnifiedContext, attributes map[string]interface{}, + ctx interfaces.UnifiedContext, attributes map[string]interface{}, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -58,7 +58,7 @@ func (w *workflowProvider) UpsertSearchAttributes( return workflow.UpsertSearchAttributes(wfCtx, attributes) } -func (w *workflowProvider) UpsertMemo(ctx interpreter.UnifiedContext, rawMemo map[string]iwfidl.EncodedObject) error { +func (w *workflowProvider) UpsertMemo(ctx interfaces.UnifiedContext, rawMemo map[string]iwfidl.EncodedObject) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -83,7 +83,7 @@ func (w *workflowProvider) UpsertMemo(ctx interpreter.UnifiedContext, rawMemo ma return workflow.UpsertMemo(wfCtx, memo) } -func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Duration) interpreter.Future { +func (w *workflowProvider) NewTimer(ctx interfaces.UnifiedContext, d time.Duration) interfaces.Future { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -94,14 +94,14 @@ func (w *workflowProvider) NewTimer(ctx interpreter.UnifiedContext, d time.Durat } } -func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) interpreter.WorkflowInfo { +func (w *workflowProvider) GetWorkflowInfo(ctx interfaces.UnifiedContext) interfaces.WorkflowInfo { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") } info := workflow.GetInfo(wfCtx) - return interpreter.WorkflowInfo{ - WorkflowExecution: interpreter.WorkflowExecution{ + return interfaces.WorkflowInfo{ + WorkflowExecution: interfaces.WorkflowExecution{ ID: info.WorkflowExecution.ID, RunID: info.WorkflowExecution.RunID, }, @@ -113,7 +113,7 @@ func (w *workflowProvider) GetWorkflowInfo(ctx interpreter.UnifiedContext) inter } func (w *workflowProvider) GetSearchAttributes( - ctx interpreter.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, + ctx interfaces.UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType, ) (map[string]iwfidl.SearchAttribute, error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -125,7 +125,7 @@ func (w *workflowProvider) GetSearchAttributes( } func (w *workflowProvider) SetQueryHandler( - ctx interpreter.UnifiedContext, queryType string, handler interface{}, + ctx interfaces.UnifiedContext, queryType string, handler interface{}, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -135,19 +135,19 @@ func (w *workflowProvider) SetQueryHandler( } func (w *workflowProvider) SetRpcUpdateHandler( - ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, - handler interpreter.UnifiedRpcHandler, + ctx interfaces.UnifiedContext, updateType string, validator interfaces.UnifiedRpcValidator, + handler interfaces.UnifiedRpcHandler, ) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") } v2 := func(ctx workflow.Context, input iwfidl.WorkflowRpcRequest) error { - ctx2 := interpreter.NewUnifiedContext(ctx) + ctx2 := interfaces.NewUnifiedContext(ctx) return validator(ctx2, input) } - h2 := func(ctx workflow.Context, input iwfidl.WorkflowRpcRequest) (*interpreter.HandlerOutput, error) { - ctx2 := interpreter.NewUnifiedContext(ctx) + h2 := func(ctx workflow.Context, input iwfidl.WorkflowRpcRequest) (*interfaces.HandlerOutput, error) { + ctx2 := interfaces.NewUnifiedContext(ctx) return handler(ctx2, input) } return workflow.SetUpdateHandlerWithOptions( @@ -159,24 +159,24 @@ func (w *workflowProvider) SetRpcUpdateHandler( } func (w *workflowProvider) ExtendContextWithValue( - parent interpreter.UnifiedContext, key string, val interface{}, -) interpreter.UnifiedContext { + parent interfaces.UnifiedContext, key string, val interface{}, +) interfaces.UnifiedContext { wfCtx, ok := parent.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") } - return interpreter.NewUnifiedContext(workflow.WithValue(wfCtx, key, val)) + return interfaces.NewUnifiedContext(workflow.WithValue(wfCtx, key, val)) } func (w *workflowProvider) GoNamed( - ctx interpreter.UnifiedContext, name string, f func(ctx interpreter.UnifiedContext), + ctx interfaces.UnifiedContext, name string, f func(ctx interfaces.UnifiedContext), ) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") } f2 := func(ctx workflow.Context) { - ctx2 := interpreter.NewUnifiedContext(ctx) + ctx2 := interfaces.NewUnifiedContext(ctx) w.pendingThreadNames[name]++ w.threadCount++ f(ctx2) @@ -197,7 +197,7 @@ func (w *workflowProvider) GetThreadCount() int { return w.threadCount } -func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func() bool) error { +func (w *workflowProvider) Await(ctx interfaces.UnifiedContext, condition func() bool) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -206,8 +206,8 @@ func (w *workflowProvider) Await(ctx interpreter.UnifiedContext, condition func( } func (w *workflowProvider) WithActivityOptions( - ctx interpreter.UnifiedContext, options interpreter.ActivityOptions, -) interpreter.UnifiedContext { + ctx interfaces.UnifiedContext, options interfaces.ActivityOptions, +) interfaces.UnifiedContext { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -231,7 +231,7 @@ func (w *workflowProvider) WithActivityOptions( ScheduleToCloseTimeout: time.Second * 7, RetryPolicy: retry.ConvertTemporalActivityRetryPolicy(options.RetryPolicy), }) - return interpreter.NewUnifiedContext(wfCtx3) + return interfaces.NewUnifiedContext(wfCtx3) } type futureImpl struct { @@ -242,7 +242,7 @@ func (t *futureImpl) IsReady() bool { return t.future.IsReady() } -func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) error { +func (t *futureImpl) Get(ctx interfaces.UnifiedContext, valuePtr interface{}) error { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -253,7 +253,7 @@ func (t *futureImpl) Get(ctx interpreter.UnifiedContext, valuePtr interface{}) e func (w *workflowProvider) ExecuteActivity( valuePtr interface{}, optimizeByLocalActivity bool, - ctx interpreter.UnifiedContext, activity interface{}, args ...interface{}, + ctx interfaces.UnifiedContext, activity interface{}, args ...interface{}, ) (err error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -272,7 +272,7 @@ func (w *workflowProvider) ExecuteActivity( return f.Get(wfCtx, valuePtr) } -func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time { +func (w *workflowProvider) Now(ctx interfaces.UnifiedContext) time.Time { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -280,7 +280,7 @@ func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time { return workflow.Now(wfCtx) } -func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration) (err error) { +func (w *workflowProvider) Sleep(ctx interfaces.UnifiedContext, d time.Duration) (err error) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -288,7 +288,7 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration return workflow.Sleep(wfCtx, d) } -func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool { +func (w *workflowProvider) IsReplaying(ctx interfaces.UnifiedContext) bool { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -297,7 +297,7 @@ func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool { } func (w *workflowProvider) GetVersion( - ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int, + ctx interfaces.UnifiedContext, changeID string, minSupported, maxSupported int, ) int { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { @@ -316,7 +316,7 @@ func (t *temporalReceiveChannel) ReceiveAsync(valuePtr interface{}) (ok bool) { return t.channel.ReceiveAsync(valuePtr) } -func (t *temporalReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext, valuePtr interface{}) (ok bool) { +func (t *temporalReceiveChannel) ReceiveBlocking(ctx interfaces.UnifiedContext, valuePtr interface{}) (ok bool) { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -326,8 +326,8 @@ func (t *temporalReceiveChannel) ReceiveBlocking(ctx interpreter.UnifiedContext, } func (w *workflowProvider) GetSignalChannel( - ctx interpreter.UnifiedContext, signalName string, -) interpreter.ReceiveChannel { + ctx interfaces.UnifiedContext, signalName string, +) interfaces.ReceiveChannel { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -338,7 +338,7 @@ func (w *workflowProvider) GetSignalChannel( } } -func (w *workflowProvider) GetContextValue(ctx interpreter.UnifiedContext, key string) interface{} { +func (w *workflowProvider) GetContextValue(ctx interfaces.UnifiedContext, key string) interface{} { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -346,7 +346,7 @@ func (w *workflowProvider) GetContextValue(ctx interpreter.UnifiedContext, key s return wfCtx.Value(key) } -func (w *workflowProvider) GetLogger(ctx interpreter.UnifiedContext) interpreter.UnifiedLogger { +func (w *workflowProvider) GetLogger(ctx interfaces.UnifiedContext) interfaces.UnifiedLogger { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") @@ -354,7 +354,7 @@ func (w *workflowProvider) GetLogger(ctx interpreter.UnifiedContext) interpreter return workflow.GetLogger(wfCtx) } -func (w *workflowProvider) GetUnhandledSignalNames(ctx interpreter.UnifiedContext) []string { +func (w *workflowProvider) GetUnhandledSignalNames(ctx interfaces.UnifiedContext) []string { wfCtx, ok := ctx.GetContext().(workflow.Context) if !ok { panic("cannot convert to temporal workflow context") diff --git a/service/interpreter/timers/greedyTimerProcessor.go b/service/interpreter/timers/greedyTimerProcessor.go new file mode 100644 index 00000000..6883dd51 --- /dev/null +++ b/service/interpreter/timers/greedyTimerProcessor.go @@ -0,0 +1,264 @@ +package timers + +import ( + "github.com/indeedeng/iwf/service/interpreter/cont" + "github.com/indeedeng/iwf/service/interpreter/interfaces" + "time" + + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service" +) + +type sortedTimers struct { + status service.InternalTimerStatus + // Ordered slice of all timers being awaited on + timers []*service.TimerInfo +} + +type GreedyTimerProcessor struct { + pendingTimers sortedTimers + stateExecutionCurrentTimerInfos map[string][]*service.TimerInfo + staleSkipTimerSignals []service.StaleSkipTimerSignal + provider interfaces.WorkflowProvider + logger interfaces.UnifiedLogger +} + +func NewGreedyTimerProcessor( + ctx interfaces.UnifiedContext, + provider interfaces.WorkflowProvider, + continueAsNewCounter *cont.ContinueAsNewCounter, + staleSkipTimerSignals []service.StaleSkipTimerSignal, +) *GreedyTimerProcessor { + + tp := &GreedyTimerProcessor{ + provider: provider, + pendingTimers: sortedTimers{status: service.TimerPending}, + stateExecutionCurrentTimerInfos: map[string][]*service.TimerInfo{}, + logger: provider.GetLogger(ctx), + staleSkipTimerSignals: staleSkipTimerSignals, + } + + // start some single thread that manages timers + tp.createGreedyTimerScheduler(ctx, continueAsNewCounter) + + err := provider.SetQueryHandler(ctx, service.GetCurrentTimerInfosQueryType, func() (service.GetCurrentTimerInfosQueryResponse, error) { + return service.GetCurrentTimerInfosQueryResponse{ + StateExecutionCurrentTimerInfos: tp.stateExecutionCurrentTimerInfos, + }, nil + }) + if err != nil { + panic("cannot set query handler") + } + return tp +} + +func (t *sortedTimers) addTimer(toAdd *service.TimerInfo) { + + if toAdd == nil || toAdd.Status != t.status { + panic("invalid timer added") + } + + insertIndex := 0 + for i, timer := range t.timers { + if toAdd.FiringUnixTimestampSeconds >= timer.FiringUnixTimestampSeconds { + // don't want dupes. Makes remove simpler + if toAdd == timer { + return + } + insertIndex = i + break + } + insertIndex = i + 1 + } + t.timers = append( + t.timers[:insertIndex], + append([]*service.TimerInfo{toAdd}, t.timers[insertIndex:]...)...) +} + +func (t *sortedTimers) removeTimer(toRemove *service.TimerInfo) { + for i, timer := range t.timers { + if toRemove == timer { + t.timers = append(t.timers[:i], t.timers[i+1:]...) + return + } + } +} + +func (t *sortedTimers) pruneToNextTimer(pruneTo int64) *service.TimerInfo { + + if len(t.timers) == 0 { + return nil + } + + index := len(t.timers) + + for i := len(t.timers) - 1; i >= 0; i-- { + timer := t.timers[i] + if timer.FiringUnixTimestampSeconds > pruneTo && timer.Status == t.status { + break + } + index = i + } + t.timers = t.timers[:index] + return t.timers[index-1] +} + +func (t *GreedyTimerProcessor) createGreedyTimerScheduler( + ctx interfaces.UnifiedContext, + continueAsNewCounter *cont.ContinueAsNewCounter) { + + t.provider.GoNamed(ctx, "greedy-timer-scheduler", func(ctx interfaces.UnifiedContext) { + // NOTE: next timer to fire is at the end of the slice + var createdTimers []int64 + for { + t.provider.Await(ctx, func() bool { + // remove fired timers + now := t.provider.Now(ctx).Unix() + for i := len(createdTimers) - 1; i >= 0; i-- { + if createdTimers[i] > now { + createdTimers = createdTimers[:i+1] + break + } + } + next := t.pendingTimers.pruneToNextTimer(now) + return (next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1])) || continueAsNewCounter.IsThresholdMet() + }) + + if continueAsNewCounter.IsThresholdMet() { + break + } + + now := t.provider.Now(ctx).Unix() + next := t.pendingTimers.pruneToNextTimer(now) + //next := t.pendingTimers.getEarliestTimer() + // only create a new timer when a pending timer exists before the next existing timer fires + if next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1]) { + fireAt := next.FiringUnixTimestampSeconds + duration := time.Duration(fireAt-now) * time.Second + t.provider.NewTimer(ctx, duration) + createdTimers = append(createdTimers, fireAt) + } + } + }) +} + +func (t *GreedyTimerProcessor) Dump() []service.StaleSkipTimerSignal { + return t.staleSkipTimerSignals +} + +func (t *GreedyTimerProcessor) GetCurrentTimerInfos() map[string][]*service.TimerInfo { + return t.stateExecutionCurrentTimerInfos +} + +// SkipTimer will attempt to skip a timer, return false if no valid timer found +func (t *GreedyTimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) bool { + timer, valid := service.ValidateTimerSkipRequest(t.stateExecutionCurrentTimerInfos, stateExeId, timerId, timerIdx) + if !valid { + // since we have checked it before sending signals, this should only happen in some vary rare cases for racing condition + t.logger.Warn("cannot process timer skip request, maybe state is already closed...putting into a stale skip timer queue", stateExeId, timerId, timerIdx) + + t.staleSkipTimerSignals = append(t.staleSkipTimerSignals, service.StaleSkipTimerSignal{ + StateExecutionId: stateExeId, + TimerCommandId: timerId, + TimerCommandIndex: timerIdx, + }) + return false + } + timer.Status = service.TimerSkipped + return true +} + +func (t *GreedyTimerProcessor) RetryStaleSkipTimer() bool { + for i, staleSkip := range t.staleSkipTimerSignals { + found := t.SkipTimer(staleSkip.StateExecutionId, staleSkip.TimerCommandId, staleSkip.TimerCommandIndex) + if found { + newList := removeElement(t.staleSkipTimerSignals, i) + t.staleSkipTimerSignals = newList + return true + } + } + return false +} + +// WaitForTimerFiredOrSkipped waits for timer completed(fired or skipped), +// return true when the timer is fired or skipped +// return false if the waitingCommands is canceled by cancelWaiting bool pointer(when the trigger type is completed, or continueAsNew) +func (t *GreedyTimerProcessor) WaitForTimerFiredOrSkipped( + ctx interfaces.UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool, +) service.InternalTimerStatus { + timerInfos := t.stateExecutionCurrentTimerInfos[stateExeId] + if len(timerInfos) == 0 { + if *cancelWaiting { + // The waiting thread is later than the timer execState thread + // The execState thread got completed early and call RemovePendingTimersOfState to remove the timerInfos + // returning pending here + return service.TimerPending + } else { + panic("bug: this shouldn't happen") + } + } + timer := timerInfos[timerIdx] + if timer.Status == service.TimerFired || timer.Status == service.TimerSkipped { + return timer.Status + } + skippedByStaleSkip := t.RetryStaleSkipTimer() + if skippedByStaleSkip { + t.logger.Warn("timer skipped by stale skip signal", stateExeId, timerIdx) + timer.Status = service.TimerSkipped + return service.TimerSkipped + } + + _ = t.provider.Await(ctx, func() bool { + return timer.Status == service.TimerFired || timer.Status == service.TimerSkipped || timer.FiringUnixTimestampSeconds <= t.provider.Now(ctx).Unix() || *cancelWaiting + }) + + if timer.Status == service.TimerSkipped { + return service.TimerSkipped + } + + if timer.FiringUnixTimestampSeconds >= t.provider.Now(ctx).Unix() { + timer.Status = service.TimerFired + return service.TimerFired + } + + // otherwise *cancelWaiting should return false to indicate that this timer isn't completed(fired or skipped) + t.pendingTimers.removeTimer(timer) + return service.TimerPending +} + +// RemovePendingTimersOfState is for when a state is completed, remove all its pending timers +func (t *GreedyTimerProcessor) RemovePendingTimersOfState(stateExeId string) { + + timers := t.stateExecutionCurrentTimerInfos[stateExeId] + + for _, timer := range timers { + t.pendingTimers.removeTimer(timer) + } + + delete(t.stateExecutionCurrentTimerInfos, stateExeId) +} + +func (t *GreedyTimerProcessor) AddTimers( + stateExeId string, commands []iwfidl.TimerCommand, completedTimerCmds map[int]service.InternalTimerStatus, +) { + timers := make([]*service.TimerInfo, len(commands)) + for idx, cmd := range commands { + var timer service.TimerInfo + if status, ok := completedTimerCmds[idx]; ok { + timer = service.TimerInfo{ + CommandId: cmd.CommandId, + FiringUnixTimestampSeconds: cmd.GetFiringUnixTimestampSeconds(), + Status: status, + } + } else { + timer = service.TimerInfo{ + CommandId: cmd.CommandId, + FiringUnixTimestampSeconds: cmd.GetFiringUnixTimestampSeconds(), + Status: service.TimerPending, + } + t.pendingTimers.addTimer(&timer) + } + timers[idx] = &timer + } + t.stateExecutionCurrentTimerInfos[stateExeId] = timers +} diff --git a/service/interpreter/timerProcessor.go b/service/interpreter/timers/simpleTimerProcessor.go similarity index 71% rename from service/interpreter/timerProcessor.go rename to service/interpreter/timers/simpleTimerProcessor.go index ebd7fcf3..91c6d9c9 100644 --- a/service/interpreter/timerProcessor.go +++ b/service/interpreter/timers/simpleTimerProcessor.go @@ -1,23 +1,24 @@ -package interpreter +package timers import ( + "github.com/indeedeng/iwf/service/interpreter/interfaces" "time" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" ) -type TimerProcessor struct { +type SimpleTimerProcessor struct { stateExecutionCurrentTimerInfos map[string][]*service.TimerInfo staleSkipTimerSignals []service.StaleSkipTimerSignal - provider WorkflowProvider - logger UnifiedLogger + provider interfaces.WorkflowProvider + logger interfaces.UnifiedLogger } -func NewTimerProcessor( - ctx UnifiedContext, provider WorkflowProvider, staleSkipTimerSignals []service.StaleSkipTimerSignal, -) *TimerProcessor { - tp := &TimerProcessor{ +func NewSimpleTimerProcessor( + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, staleSkipTimerSignals []service.StaleSkipTimerSignal, +) *SimpleTimerProcessor { + tp := &SimpleTimerProcessor{ provider: provider, stateExecutionCurrentTimerInfos: map[string][]*service.TimerInfo{}, logger: provider.GetLogger(ctx), @@ -35,16 +36,16 @@ func NewTimerProcessor( return tp } -func (t *TimerProcessor) Dump() []service.StaleSkipTimerSignal { +func (t *SimpleTimerProcessor) Dump() []service.StaleSkipTimerSignal { return t.staleSkipTimerSignals } -func (t *TimerProcessor) GetCurrentTimerInfos() map[string][]*service.TimerInfo { +func (t *SimpleTimerProcessor) GetCurrentTimerInfos() map[string][]*service.TimerInfo { return t.stateExecutionCurrentTimerInfos } // SkipTimer will attempt to skip a timer, return false if no valid timer found -func (t *TimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) bool { +func (t *SimpleTimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) bool { timer, valid := service.ValidateTimerSkipRequest(t.stateExecutionCurrentTimerInfos, stateExeId, timerId, timerIdx) if !valid { // since we have checked it before sending signals, this should only happen in some vary rare cases for racing condition @@ -61,7 +62,7 @@ func (t *TimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) boo return true } -func (t *TimerProcessor) RetryStaleSkipTimer() bool { +func (t *SimpleTimerProcessor) RetryStaleSkipTimer() bool { for i, staleSkip := range t.staleSkipTimerSignals { found := t.SkipTimer(staleSkip.StateExecutionId, staleSkip.TimerCommandId, staleSkip.TimerCommandIndex) if found { @@ -73,16 +74,11 @@ func (t *TimerProcessor) RetryStaleSkipTimer() bool { return false } -func removeElement(s []service.StaleSkipTimerSignal, i int) []service.StaleSkipTimerSignal { - s[i] = s[len(s)-1] - return s[:len(s)-1] -} - // WaitForTimerFiredOrSkipped waits for timer completed(fired or skipped), // return true when the timer is fired or skipped // return false if the waitingCommands is canceled by cancelWaiting bool pointer(when the trigger type is completed, or continueAsNew) -func (t *TimerProcessor) WaitForTimerFiredOrSkipped( - ctx UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool, +func (t *SimpleTimerProcessor) WaitForTimerFiredOrSkipped( + ctx interfaces.UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool, ) service.InternalTimerStatus { timerInfos := t.stateExecutionCurrentTimerInfos[stateExeId] if len(timerInfos) == 0 { @@ -122,11 +118,11 @@ func (t *TimerProcessor) WaitForTimerFiredOrSkipped( } // RemovePendingTimersOfState is for when a state is completed, remove all its pending timers -func (t *TimerProcessor) RemovePendingTimersOfState(stateExeId string) { +func (t *SimpleTimerProcessor) RemovePendingTimersOfState(stateExeId string) { delete(t.stateExecutionCurrentTimerInfos, stateExeId) } -func (t *TimerProcessor) AddTimers( +func (t *SimpleTimerProcessor) AddTimers( stateExeId string, commands []iwfidl.TimerCommand, completedTimerCmds map[int]service.InternalTimerStatus, ) { timers := make([]*service.TimerInfo, len(commands)) @@ -150,21 +146,3 @@ func (t *TimerProcessor) AddTimers( } t.stateExecutionCurrentTimerInfos[stateExeId] = timers } - -// FixTimerCommandFromActivityOutput converts the durationSeconds to firingUnixTimestampSeconds -// doing it right after the activity output so that we don't need to worry about the time drift after continueAsNew -func FixTimerCommandFromActivityOutput(now time.Time, request iwfidl.CommandRequest) iwfidl.CommandRequest { - var timerCommands []iwfidl.TimerCommand - for _, cmd := range request.GetTimerCommands() { - if cmd.HasDurationSeconds() { - timerCommands = append(timerCommands, iwfidl.TimerCommand{ - CommandId: cmd.CommandId, - FiringUnixTimestampSeconds: iwfidl.PtrInt64(now.Unix() + int64(cmd.GetDurationSeconds())), - }) - } else { - timerCommands = append(timerCommands, cmd) - } - } - request.TimerCommands = timerCommands - return request -} diff --git a/service/interpreter/timers/utils.go b/service/interpreter/timers/utils.go new file mode 100644 index 00000000..9e39a9b9 --- /dev/null +++ b/service/interpreter/timers/utils.go @@ -0,0 +1,31 @@ +package timers + +import ( + "time" + + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service" +) + +func removeElement(s []service.StaleSkipTimerSignal, i int) []service.StaleSkipTimerSignal { + s[i] = s[len(s)-1] + return s[:len(s)-1] +} + +// FixTimerCommandFromActivityOutput converts the durationSeconds to firingUnixTimestampSeconds +// doing it right after the activity output so that we don't need to worry about the time drift after continueAsNew +func FixTimerCommandFromActivityOutput(now time.Time, request iwfidl.CommandRequest) iwfidl.CommandRequest { + var timerCommands []iwfidl.TimerCommand + for _, cmd := range request.GetTimerCommands() { + if cmd.HasDurationSeconds() { + timerCommands = append(timerCommands, iwfidl.TimerCommand{ + CommandId: cmd.CommandId, + FiringUnixTimestampSeconds: iwfidl.PtrInt64(now.Unix() + int64(cmd.GetDurationSeconds())), + }) + } else { + timerCommands = append(timerCommands, cmd) + } + } + request.TimerCommands = timerCommands + return request +} diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index b747f862..439dff13 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -7,7 +7,11 @@ import ( "github.com/indeedeng/iwf/service/common/event" "github.com/indeedeng/iwf/service/common/ptr" "github.com/indeedeng/iwf/service/common/utils" + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/cont" "github.com/indeedeng/iwf/service/interpreter/env" + "github.com/indeedeng/iwf/service/interpreter/interfaces" + "github.com/indeedeng/iwf/service/interpreter/timers" "time" "github.com/indeedeng/iwf/service/common/compatibility" @@ -18,7 +22,7 @@ import ( ) func InterpreterImpl( - ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, input service.InterpreterWorkflowInput, ) (output *service.InterpreterWorkflowOutput, retErr error) { defer func() { if !provider.IsReplaying(ctx) { @@ -70,7 +74,7 @@ func InterpreterImpl( } } - workflowConfiger := NewWorkflowConfiger(input.Config) + workflowConfiger := config.NewWorkflowConfiger(input.Config) basicInfo := service.BasicInfo{ IwfWorkflowType: input.IwfWorkflowType, IwfWorkerUrl: input.IwfWorkerUrl, @@ -79,8 +83,8 @@ func InterpreterImpl( var internalChannel *InternalChannel var stateRequestQueue *StateRequestQueue var persistenceManager *PersistenceManager - var timerProcessor *TimerProcessor - var continueAsNewCounter *ContinueAsNewCounter + var timerProcessor interfaces.TimerProcessor + var continueAsNewCounter *cont.ContinueAsNewCounter var signalReceiver *SignalReceiver var stateExecutionCounter *StateExecutionCounter var outputCollector *OutputCollector @@ -99,8 +103,12 @@ func InterpreterImpl( internalChannel = RebuildInternalChannel(previous.InterStateChannelReceived) stateRequestQueue = NewStateRequestQueueWithResumeRequests(previous.StatesToStartFromBeginning, previous.StateExecutionsToResume) persistenceManager = RebuildPersistenceManager(provider, previous.DataObjects, previous.SearchAttributes, input.UseMemoForDataAttributes) - timerProcessor = NewTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals) - continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) + continueAsNewCounter = cont.NewContinueAsCounter(workflowConfiger, ctx, provider) + if input.Config.GetOptimizeTimer() { + timerProcessor = timers.NewGreedyTimerProcessor(ctx, provider, continueAsNewCounter, previous.StaleSkipTimerSignals) + } else { + timerProcessor = timers.NewSimpleTimerProcessor(ctx, provider, previous.StaleSkipTimerSignals) + } signalReceiver = NewSignalReceiver(ctx, provider, internalChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, previous.SignalsReceived) counterInfo := previous.StateExecutionCounterInfo stateExecutionCounter = RebuildStateExecutionCounter(ctx, provider, globalVersioner, @@ -112,8 +120,12 @@ func InterpreterImpl( internalChannel = NewInternalChannel() stateRequestQueue = NewStateRequestQueue() persistenceManager = NewPersistenceManager(provider, input.InitDataAttributes, input.InitSearchAttributes, input.UseMemoForDataAttributes) - timerProcessor = NewTimerProcessor(ctx, provider, nil) - continueAsNewCounter = NewContinueAsCounter(workflowConfiger, ctx, provider) + continueAsNewCounter = cont.NewContinueAsCounter(workflowConfiger, ctx, provider) + if input.Config.GetOptimizeTimer() { + timerProcessor = timers.NewGreedyTimerProcessor(ctx, provider, continueAsNewCounter, nil) + } else { + timerProcessor = timers.NewSimpleTimerProcessor(ctx, provider, nil) + } signalReceiver = NewSignalReceiver(ctx, provider, internalChannel, stateRequestQueue, persistenceManager, timerProcessor, continueAsNewCounter, workflowConfiger, nil) stateExecutionCounter = NewStateExecutionCounter(ctx, provider, globalVersioner, workflowConfiger, continueAsNewCounter) outputCollector = NewOutputCollector(nil) @@ -203,7 +215,7 @@ func InterpreterImpl( // execute in another thread for parallelism // state must be passed via parameter https://stackoverflow.com/questions/67263092 stateCtx := provider.ExtendContextWithValue(ctx, "stateReq", stateReqForLoopingOnly) - provider.GoNamed(stateCtx, "state-execution-thread:"+stateReqForLoopingOnly.GetStateId(), func(ctx UnifiedContext) { + provider.GoNamed(stateCtx, "state-execution-thread:"+stateReqForLoopingOnly.GetStateId(), func(ctx interfaces.UnifiedContext) { stateReq, ok := provider.GetContextValue(ctx, "stateReq").(StateRequest) if !ok { errToFailWf = provider.NewApplicationError( @@ -380,7 +392,7 @@ func InterpreterImpl( } func checkClosingWorkflow( - ctx UnifiedContext, provider WorkflowProvider, versioner *GlobalVersioner, decision *iwfidl.StateDecision, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, versioner *GlobalVersioner, decision *iwfidl.StateDecision, currentStateId, currentStateExeId string, internalChannel *InternalChannel, signalReceiver *SignalReceiver, ) (canGoNext, gracefulComplete, forceComplete, forceFail bool, completeOutput *iwfidl.StateCompletionOutput, err error) { @@ -487,7 +499,7 @@ func checkClosingWorkflow( } func DrainReceivedButUnprocessedInternalChannelsFromStateApis( - ctx UnifiedContext, provider WorkflowProvider, versioner *GlobalVersioner, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, versioner *GlobalVersioner, ) error { if versioner.IsAfterVersionOfYieldOnConditionalComplete() { // Just yield, by waiting on an empty lambda, nothing else. @@ -504,8 +516,8 @@ func DrainReceivedButUnprocessedInternalChannelsFromStateApis( } func processStateExecution( - ctx UnifiedContext, - provider WorkflowProvider, + ctx interfaces.UnifiedContext, + provider interfaces.WorkflowProvider, globalVersioner *GlobalVersioner, basicInfo service.BasicInfo, stateReq StateRequest, @@ -513,10 +525,10 @@ func processStateExecution( persistenceManager *PersistenceManager, interStateChannel *InternalChannel, signalReceiver *SignalReceiver, - timerProcessor *TimerProcessor, + timerProcessor interfaces.TimerProcessor, continueAsNewer *ContinueAsNewer, - continueAsNewCounter *ContinueAsNewCounter, - configer *WorkflowConfiger, + continueAsNewCounter *cont.ContinueAsNewCounter, + configer *config.WorkflowConfiger, shouldSendSignalOnCompletion bool, ) (*iwfidl.StateDecision, service.StateExecutionStatus, error) { waitUntilApi := StateStart @@ -533,7 +545,7 @@ func processStateExecution( WorkflowStartedTimestamp: info.WorkflowStartTime.Unix(), StateExecutionId: &stateExeId, } - activityOptions := ActivityOptions{ + activityOptions := interfaces.ActivityOptions{ StartToCloseTimeout: 30 * time.Second, } @@ -638,7 +650,7 @@ func processStateExecution( } interStateChannel.ProcessPublishing(startResponse.GetPublishToInterStateChannel()) - commandReq = FixTimerCommandFromActivityOutput(provider.Now(ctx), startResponse.GetCommandRequest()) + commandReq = timers.FixTimerCommandFromActivityOutput(provider.Now(ctx), startResponse.GetCommandRequest()) stateExecutionLocal = startResponse.GetUpsertStateLocals() } @@ -650,7 +662,7 @@ func processStateExecution( continue } cmdCtx := provider.ExtendContextWithValue(ctx, "idx", idx) - provider.GoNamed(cmdCtx, getCommandThreadName("timer", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) { + provider.GoNamed(cmdCtx, getCommandThreadName("timer", stateExeId, cmd.GetCommandId(), idx), func(ctx interfaces.UnifiedContext) { idx, ok := provider.GetContextValue(ctx, "idx").(int) if !ok { panic("critical code bug") @@ -675,7 +687,7 @@ func processStateExecution( } cmdCtx := provider.ExtendContextWithValue(ctx, "cmd", cmd) cmdCtx = provider.ExtendContextWithValue(cmdCtx, "idx", idx) - provider.GoNamed(cmdCtx, getCommandThreadName("signal", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) { + provider.GoNamed(cmdCtx, getCommandThreadName("signal", stateExeId, cmd.GetCommandId(), idx), func(ctx interfaces.UnifiedContext) { cmd, ok := provider.GetContextValue(ctx, "cmd").(iwfidl.SignalCommand) if !ok { panic("critical code bug") @@ -707,7 +719,7 @@ func processStateExecution( } cmdCtx := provider.ExtendContextWithValue(ctx, "cmd", cmd) cmdCtx = provider.ExtendContextWithValue(cmdCtx, "idx", idx) - provider.GoNamed(cmdCtx, getCommandThreadName("interstate", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) { + provider.GoNamed(cmdCtx, getCommandThreadName("interstate", stateExeId, cmd.GetCommandId(), idx), func(ctx interfaces.UnifiedContext) { cmd, ok := provider.GetContextValue(ctx, "cmd").(iwfidl.InterStateChannelCommand) if !ok { panic("critical code bug") @@ -812,8 +824,8 @@ func processStateExecution( } func invokeStateExecute( - ctx UnifiedContext, - provider WorkflowProvider, + ctx interfaces.UnifiedContext, + provider interfaces.WorkflowProvider, basicInfo service.BasicInfo, state iwfidl.StateMovement, stateExeId string, @@ -822,13 +834,13 @@ func invokeStateExecute( executionContext iwfidl.Context, commandRes *iwfidl.CommandResults, continueAsNewer *ContinueAsNewer, - configer *WorkflowConfiger, + configer *config.WorkflowConfiger, executeApi interface{}, stateExecutionLocal []iwfidl.KeyValue, shouldSendSignalOnCompletion bool, ) (*iwfidl.StateDecision, service.StateExecutionStatus, error) { var err error - activityOptions := ActivityOptions{ + activityOptions := interfaces.ActivityOptions{ StartToCloseTimeout: 30 * time.Second, } if state.StateOptions != nil { @@ -983,7 +995,7 @@ func shouldProceedOnExecuteApiError(state iwfidl.StateMovement) bool { options.GetExecuteApiFailurePolicy() == iwfidl.PROCEED_TO_CONFIGURED_STATE } -func convertStateApiActivityError(provider WorkflowProvider, err error) error { +func convertStateApiActivityError(provider interfaces.WorkflowProvider, err error) error { if provider.IsApplicationError(err) { return err } @@ -994,7 +1006,7 @@ func getCommandThreadName(prefix string, stateExecId, cmdId string, idx int) str return fmt.Sprintf("%v-%v-%v-%v", prefix, stateExecId, cmdId, idx) } -func createUserWorkflowError(provider WorkflowProvider, message string) error { +func createUserWorkflowError(provider interfaces.WorkflowProvider, message string) error { return provider.NewApplicationError( string(iwfidl.INVALID_USER_WORKFLOW_CODE_ERROR_TYPE), message, @@ -1002,7 +1014,7 @@ func createUserWorkflowError(provider WorkflowProvider, message string) error { } func WaitForStateCompletionWorkflowImpl( - ctx UnifiedContext, provider WorkflowProvider, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, ) (*service.WaitForStateCompletionWorkflowOutput, error) { signalReceiveChannel := provider.GetSignalChannel(ctx, service.StateCompletionSignalChannelName) var signalValue iwfidl.StateCompletionOutput diff --git a/service/interpreter/workflowUpdater.go b/service/interpreter/workflowUpdater.go index 6846ee65..eb2d20db 100644 --- a/service/interpreter/workflowUpdater.go +++ b/service/interpreter/workflowUpdater.go @@ -4,27 +4,30 @@ import ( "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/event" + "github.com/indeedeng/iwf/service/interpreter/config" + "github.com/indeedeng/iwf/service/interpreter/cont" + "github.com/indeedeng/iwf/service/interpreter/interfaces" "time" ) type WorkflowUpdater struct { persistenceManager *PersistenceManager - provider WorkflowProvider + provider interfaces.WorkflowProvider continueAsNewer *ContinueAsNewer - continueAsNewCounter *ContinueAsNewCounter + continueAsNewCounter *cont.ContinueAsNewCounter internalChannel *InternalChannel signalReceiver *SignalReceiver stateRequestQueue *StateRequestQueue - configer *WorkflowConfiger - logger UnifiedLogger + configer *config.WorkflowConfiger + logger interfaces.UnifiedLogger basicInfo service.BasicInfo globalVersioner *GlobalVersioner } func NewWorkflowUpdater( - ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager, + ctx interfaces.UnifiedContext, provider interfaces.WorkflowProvider, persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, - continueAsNewer *ContinueAsNewer, continueAsNewCounter *ContinueAsNewCounter, configer *WorkflowConfiger, + continueAsNewer *ContinueAsNewer, continueAsNewCounter *cont.ContinueAsNewCounter, configer *config.WorkflowConfiger, internalChannel *InternalChannel, signalReceiver *SignalReceiver, basicInfo service.BasicInfo, globalVersioner *GlobalVersioner, ) (*WorkflowUpdater, error) { @@ -51,8 +54,8 @@ func NewWorkflowUpdater( } func (u *WorkflowUpdater) handler( - ctx UnifiedContext, input iwfidl.WorkflowRpcRequest, -) (output *HandlerOutput, err error) { + ctx interfaces.UnifiedContext, input iwfidl.WorkflowRpcRequest, +) (output *interfaces.HandlerOutput, err error) { u.continueAsNewer.IncreaseInflightOperation() defer u.continueAsNewer.DecreaseInflightOperation() @@ -80,7 +83,7 @@ func (u *WorkflowUpdater) handler( InternalChannelInfo: u.internalChannel.GetInfos(), } - activityOptions := ActivityOptions{ + activityOptions := interfaces.ActivityOptions{ StartToCloseTimeout: 5 * time.Second, RetryPolicy: &iwfidl.RetryPolicy{ MaximumAttemptsDurationSeconds: input.TimeoutSeconds, @@ -88,7 +91,7 @@ func (u *WorkflowUpdater) handler( }, } ctx = u.provider.WithActivityOptions(ctx, activityOptions) - var activityOutput InvokeRpcActivityOutput + var activityOutput interfaces.InvokeRpcActivityOutput err = u.provider.ExecuteActivity(&activityOutput, u.configer.ShouldOptimizeActivity(), ctx, InvokeWorkerRpc, u.provider.GetBackendType(), rpcPrep, input) u.persistenceManager.UnlockPersistence(input.SearchAttributesLoadingPolicy, input.DataAttributesLoadingPolicy) @@ -97,7 +100,7 @@ func (u *WorkflowUpdater) handler( return nil, u.provider.NewApplicationError(string(iwfidl.SERVER_INTERNAL_ERROR_TYPE), "activity invocation failure:"+err.Error()) } - handlerOutput := &HandlerOutput{ + handlerOutput := &interfaces.HandlerOutput{ StatusError: activityOutput.StatusError, } @@ -118,7 +121,7 @@ func (u *WorkflowUpdater) handler( return handlerOutput, nil } -func (u *WorkflowUpdater) validator(_ UnifiedContext, input iwfidl.WorkflowRpcRequest) error { +func (u *WorkflowUpdater) validator(_ interfaces.UnifiedContext, input iwfidl.WorkflowRpcRequest) error { var daKeys, saKeys []string if input.HasDataAttributesLoadingPolicy() { daKeys = input.DataAttributesLoadingPolicy.LockingKeys