Skip to content

Commit

Permalink
Remove
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng committed Nov 19, 2024
1 parent b279844 commit 1233eed
Show file tree
Hide file tree
Showing 13 changed files with 39 additions and 79 deletions.
2 changes: 0 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ type (
// Port is the port on which the API service will bind to
Port int `yaml:"port"`
MaxWaitSeconds int64 `yaml:"maxWaitSeconds"`
// OptimizedVersioning is the versioning optimization flag
OptimizedVersioning *bool `yaml:"optimizedVersioning"`
// omitRpcInputOutputInHistory is the flag to omit rpc input/output in history
// the input/output is only for debugging purpose but could be too expensive to store
OmitRpcInputOutputInHistory *bool `yaml:"omitRpcInputOutputInHistory"`
Expand Down
5 changes: 2 additions & 3 deletions integ/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ const testIwfServerPort = "9715"
func createTestConfig(testCfg IwfServiceTestConfig) config.Config {
return config.Config{
Api: config.ApiConfig{
Port: 9715,
MaxWaitSeconds: 10, // use 10 so that we can test it in the waiting test
OptimizedVersioning: testCfg.OptimizedVersioning,
Port: 9715,
MaxWaitSeconds: 10, // use 10 so that we can test it in the waiting test
WaitForStateCompletionMigration: config.WaitForStateCompletionMigration{
SignalWithStartOn: "old",
WaitForOn: "old",
Expand Down
1 change: 0 additions & 1 deletion integ/locking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func doTestLockingWorkflow(t *testing.T, backendType service.BackendType, config
_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
DisableFailAtMemoIncompatibility: true,
OptimizedVersioning: ptr.Any(true),
})
defer closeFunc2()

Expand Down
10 changes: 6 additions & 4 deletions integ/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ func doTestPersistenceWorkflow(
defer closeFunc1()

uclient, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
MemoEncryption: memoEncryption,
OptimizedVersioning: ptr.Any(true),
BackendType: backendType,
MemoEncryption: memoEncryption,
})
defer closeFunc2()

Expand Down Expand Up @@ -402,7 +401,10 @@ func doTestPersistenceWorkflow(
}
}

func getDataAttributes(initReqQry iwfidl.ApiApiV1WorkflowDataobjectsGetPostRequest, wfId string, expectedDataAttribute iwfidl.KeyValue, useMemo bool) (*iwfidl.WorkflowGetDataObjectsResponse, *http.Response, error) {
func getDataAttributes(
initReqQry iwfidl.ApiApiV1WorkflowDataobjectsGetPostRequest, wfId string, expectedDataAttribute iwfidl.KeyValue,
useMemo bool,
) (*iwfidl.WorkflowGetDataObjectsResponse, *http.Response, error) {
return initReqQry.WorkflowGetDataObjectsRequest(iwfidl.WorkflowGetDataObjectsRequest{
WorkflowId: wfId,
Keys: []string{
Expand Down
3 changes: 1 addition & 2 deletions integ/start_delay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ func doTestStartDelay(t *testing.T, backendType service.BackendType, config *iwf
defer closeFunc1()

_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
OptimizedVersioning: ptr.Any(false),
BackendType: backendType,
})
defer closeFunc2()

Expand Down
1 change: 0 additions & 1 deletion integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ type IwfServiceTestConfig struct {
BackendType service.BackendType
MemoEncryption bool
DisableFailAtMemoIncompatibility bool // default to false so that we will fail at test
OptimizedVersioning *bool
DefaultHeaders map[string]string
}

Expand Down
3 changes: 1 addition & 2 deletions integ/wait_until_search_attributes_optimization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ func doTestWaitUntilHistoryCompleted(
defer closeFunc1()

uclient, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
OptimizedVersioning: ptr.Any(true),
BackendType: backendType,
})
defer closeFunc2()

Expand Down
3 changes: 1 addition & 2 deletions integ/wait_until_search_attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ func doTestWaitUntilSearchAttributes(
defer closeFunc1()

_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
OptimizedVersioning: ptr.Any(true),
BackendType: backendType,
})
defer closeFunc2()

Expand Down
21 changes: 7 additions & 14 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"github.com/indeedeng/iwf/config"
"github.com/indeedeng/iwf/service/interpreter/env"
"github.com/indeedeng/iwf/service/interpreter/versions"
"math"
"net/http"
"os"
Expand Down Expand Up @@ -57,16 +56,8 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
) (wresp *iwfidl.WorkflowStartResponse, retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

var sysSAs map[string]interface{}
if s.config.Api.OptimizedVersioning != nil && *s.config.Api.OptimizedVersioning {
sysSAs = map[string]interface{}{
service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType,
service.SearchAttributeGlobalVersion: versions.MaxOfAllVersions,
}
} else {
sysSAs = map[string]interface{}{
service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType,
}
sysSAs := map[string]interface{}{
service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType,
}

workflowOptions := uclient.StartWorkflowOptions{
Expand Down Expand Up @@ -156,7 +147,6 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
UseMemoForDataAttributes: useMemoForDAs,
WaitForCompletionStateExecutionIds: req.GetWaitForCompletionStateExecutionIds(),
WaitForCompletionStateIds: req.GetWaitForCompletionStateIds(),
OmitVersionMarker: s.config.Api.OptimizedVersioning,
}

runId, err := s.client.StartInterpreterWorkflow(ctx, workflowOptions, input)
Expand Down Expand Up @@ -422,7 +412,8 @@ func (s *serviceImpl) ApiV1WorkflowGetQueryAttributesPost(
}

func (s *serviceImpl) ApiV1WorkflowSetQueryAttributesPost(
ctx context.Context, req iwfidl.WorkflowSetDataObjectsRequest) (retError *errors.ErrorAndStatus) {
ctx context.Context, req iwfidl.WorkflowSetDataObjectsRequest,
) (retError *errors.ErrorAndStatus) {
sigVal := service.ExecuteRpcSignalRequest{
UpsertDataObjects: req.Objects,
}
Expand Down Expand Up @@ -457,7 +448,9 @@ func (s *serviceImpl) ApiV1WorkflowGetSearchAttributesPost(
}, nil
}

func (s *serviceImpl) ApiV1WorkflowSetSearchAttributesPost(ctx context.Context, req iwfidl.WorkflowSetSearchAttributesRequest) (retError *errors.ErrorAndStatus) {
func (s *serviceImpl) ApiV1WorkflowSetSearchAttributesPost(
ctx context.Context, req iwfidl.WorkflowSetSearchAttributesRequest,
) (retError *errors.ErrorAndStatus) {
sigVal := service.ExecuteRpcSignalRequest{
UpsertSearchAttributes: req.SearchAttributes,
}
Expand Down
2 changes: 0 additions & 2 deletions service/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ type (
IsResumeFromContinueAsNew bool `json:"isResumeFromContinueAsNew,omitempty"`

ContinueAsNewInput *ContinueAsNewInput `json:"continueAsNewInput,omitempty"`

OmitVersionMarker *bool `json:"omtVers,omitempty"`
}

ContinueAsNewInput struct {
Expand Down
56 changes: 20 additions & 36 deletions service/interpreter/globalVersioner.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package interpreter

import (
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/indeedeng/iwf/service/interpreter/versions"
)

const globalChangeId = "global"

const StartingVersionUsingGlobalVersioning = 1
const StartingVersionOptimizedUpsertSearchAttribute = 2
const StartingVersionRenamedStateApi = 3
const StartingVersionContinueAsNewOnNoStates = 4
const StartingVersionTemporal26SDK = 5
const StartingVersionExecutingStateIdMode = 6
const MaxOfAllVersions = StartingVersionExecutingStateIdMode

// GlobalVersioner see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api
type GlobalVersioner struct {
workflowProvider WorkflowProvider
Expand All @@ -18,60 +23,39 @@ type GlobalVersioner struct {
}

func NewGlobalVersioner(
workflowProvider WorkflowProvider, omitVersionMarker bool, ctx UnifiedContext,
workflowProvider WorkflowProvider, ctx UnifiedContext,
) (*GlobalVersioner, error) {
sas, err := workflowProvider.GetSearchAttributes(ctx, []iwfidl.SearchAttributeKeyAndType{
{Key: ptr.Any(service.SearchAttributeGlobalVersion),
ValueType: ptr.Any(iwfidl.INT)},
})
if err != nil {
return nil, err
}
version := 0
if omitVersionMarker {
// TODO: future improvement https://github.com/indeedeng/iwf/issues/369
attribute, ok := sas[service.SearchAttributeGlobalVersion]
if !ok {
panic("search attribute global version is not found")
}
version = int(attribute.GetIntegerValue())
if versions.MaxOfAllVersions < version {
panic("requesting for a version that is not supported, panic to retry in next workflow task")
}
} else {
version = workflowProvider.GetVersion(ctx, globalChangeId, 0, versions.MaxOfAllVersions)
}
version := workflowProvider.GetVersion(ctx, globalChangeId, 0, MaxOfAllVersions)

return &GlobalVersioner{
workflowProvider: workflowProvider,
ctx: ctx,
version: version,
OmitVersionMarker: omitVersionMarker,
workflowProvider: workflowProvider,
ctx: ctx,
version: version,
}, nil
}

func (p *GlobalVersioner) IsAfterVersionOfContinueAsNewOnNoStates() bool {
return p.version >= versions.StartingVersionContinueAsNewOnNoStates
return p.version >= StartingVersionContinueAsNewOnNoStates
}

func (p *GlobalVersioner) IsAfterVersionOfUsingGlobalVersioning() bool {
return p.version >= versions.StartingVersionUsingGlobalVersioning
return p.version >= StartingVersionUsingGlobalVersioning
}

func (p *GlobalVersioner) IsAfterVersionOfOptimizedUpsertSearchAttribute() bool {
return p.version >= versions.StartingVersionOptimizedUpsertSearchAttribute
return p.version >= StartingVersionOptimizedUpsertSearchAttribute
}

func (p *GlobalVersioner) IsAfterVersionOfExecutingStateIdMode() bool {
return p.version >= versions.StartingVersionExecutingStateIdMode
return p.version >= StartingVersionExecutingStateIdMode
}

func (p *GlobalVersioner) IsAfterVersionOfRenamedStateApi() bool {
return p.version >= versions.StartingVersionRenamedStateApi
return p.version >= StartingVersionRenamedStateApi
}

func (p *GlobalVersioner) IsAfterVersionOfTemporal26SDK() bool {
return p.version >= versions.StartingVersionTemporal26SDK
return p.version >= StartingVersionTemporal26SDK
}

func (p *GlobalVersioner) UpsertGlobalVersionSearchAttribute() error {
Expand All @@ -83,7 +67,7 @@ func (p *GlobalVersioner) UpsertGlobalVersionSearchAttribute() error {
// https://github.com/uber-go/cadence-client/issues/1198
if p.workflowProvider.GetBackendType() != service.BackendTypeCadence {
return p.workflowProvider.UpsertSearchAttributes(p.ctx, map[string]interface{}{
service.SearchAttributeGlobalVersion: versions.MaxOfAllVersions,
service.SearchAttributeGlobalVersion: MaxOfAllVersions,
})
}
return nil
Expand Down
9 changes: 0 additions & 9 deletions service/interpreter/versions/versions.go

This file was deleted.

2 changes: 1 addition & 1 deletion service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func InterpreterImpl(

var err error

globalVersioner, err := NewGlobalVersioner(provider, input.OmitVersionMarker != nil && *input.OmitVersionMarker, ctx)
globalVersioner, err := NewGlobalVersioner(provider, ctx)
if err != nil {
retErr = err
return
Expand Down

0 comments on commit 1233eed

Please sign in to comment.