Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IWF-163: Add retry policy to QueryWorkflow calls #491

Merged
merged 10 commits into from
Nov 26, 2024
5 changes: 3 additions & 2 deletions cmd/server/iwf/iwf.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func start(c *cli.Context) {
if err != nil {
rawLog.Fatalf("Unable to connect to Temporal because of error %v", err)
}
unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false)
unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false, &config.Api.QueryWorkflowFailedRetryPolicy)

for _, svcName := range services {
go launchTemporalService(svcName, *config, unifiedClient, temporalClient, logger)
Expand All @@ -146,7 +146,8 @@ func start(c *cli.Context) {
if err != nil {
rawLog.Fatalf("Unable to connect to Cadence because of error %v", err)
}
unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc)

unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, &config.Api.QueryWorkflowFailedRetryPolicy)

for _, svcName := range services {
go launchCadenceService(svcName, *config, unifiedClient, serviceClient, domain, closeFunc, logger)
Expand Down
26 changes: 26 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ type (
OmitRpcInputOutputInHistory *bool `yaml:"omitRpcInputOutputInHistory"`
// WaitForStateCompletionMigration is used to control workflowId of the WaitForStateCompletion system/internal workflows
WaitForStateCompletionMigration WaitForStateCompletionMigration `yaml:"waitForStateCompletionMigration"`
QueryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy `yaml:"queryWorkflowFailedRetryPolicy"`
}

QueryWorkflowFailedRetryPolicy struct {
// defaults to 1
InitialIntervalSeconds int `yaml:"initialIntervalSeconds"`
// defaults to 5
MaximumAttempts int `yaml:"maximumAttempts"`
}

WaitForStateCompletionMigration struct {
Expand Down Expand Up @@ -147,3 +155,21 @@ func (c Config) GetWaitForOnWithDefault() string {
}
return "old"
}

func QueryWorkflowFailedRetryPolicyWithDefaults(retryPolicy *QueryWorkflowFailedRetryPolicy) QueryWorkflowFailedRetryPolicy {
var rp QueryWorkflowFailedRetryPolicy

if retryPolicy != nil && retryPolicy.InitialIntervalSeconds != 0 {
rp.InitialIntervalSeconds = retryPolicy.InitialIntervalSeconds
} else {
rp.InitialIntervalSeconds = 1
}

if retryPolicy != nil && retryPolicy.MaximumAttempts != 0 {
rp.MaximumAttempts = retryPolicy.MaximumAttempts
} else {
rp.MaximumAttempts = 5
}

return rp
}
3 changes: 3 additions & 0 deletions config/config_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ api:
waitForStateCompletionMigration:
signalWithStartOn: old
waitForOn: old
queryWorkflowFailedRetryPolicy:
initialIntervalSeconds: 1
maximumAttempts: 5
interpreter:
defaultWorkflowConfig:
continueAsNewThreshold: 100
Expand Down
3 changes: 3 additions & 0 deletions config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ api:
waitForStateCompletionMigration:
signalWithStartOn: old
waitForOn: old
queryWorkflowFailedRetryPolicy:
initialIntervalSeconds: 1
maximumAttempts: 5
interpreter:
temporal:
hostPort: localhost:7233
Expand Down
3 changes: 3 additions & 0 deletions config/development_cadence.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ api:
waitForStateCompletionMigration:
signalWithStartOn: old
waitForOn: old
queryWorkflowFailedRetryPolicy:
initialIntervalSeconds: 1
maximumAttempts: 5
interpreter:
# interpreterActivityConfig:
# disableSystemSearchAttributes: true # (deprecated) set to true if you don't have advanced visibility in Cadence, see more https://github.com/uber/cadence/issues/5085
Expand Down
4 changes: 4 additions & 0 deletions integ/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func createTestConfig(testCfg IwfServiceTestConfig) config.Config {
SignalWithStartOn: "old",
WaitForOn: "old",
},
QueryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicy{
InitialIntervalSeconds: 1,
MaximumAttempts: 5,
},
},
Interpreter: config.Interpreter{
VerboseDebug: false,
Expand Down
2 changes: 0 additions & 2 deletions integ/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ func doTestCreateWithoutStartingState(t *testing.T, backendType service.BackendT
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second * 2)

// workflow shouldn't executed any state
var dump service.ContinueAsNewDumpResponse
err = uclient.QueryWorkflow(context.Background(), &dump, wfId, "", service.ContinueAsNewDumpQueryType)
Expand Down
3 changes: 0 additions & 3 deletions integ/signal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
panicAtHttpError(err, httpResp)

// test update config
time.Sleep(time.Second)
var debugDump service.DebugDumpResponse
err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType)
if err != nil {
Expand All @@ -134,7 +133,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second)
err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType)
if err != nil {
panic(err)
Expand All @@ -154,7 +152,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second)
err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType)
if err != nil {
panic(err)
Expand Down
3 changes: 0 additions & 3 deletions integ/timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *
}).Execute()
panicAtHttpError(err, httpResp)

if config != nil {
time.Sleep(time.Second * 2)
}
err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType)
if err != nil {
log.Fatalf("Fail to invoke query %v", err)
Expand Down
18 changes: 12 additions & 6 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,11 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
if err != nil {
panic(err)
}
uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption)
iwfService := api.NewService(createTestConfig(config), uclient, logger)

testCfg := createTestConfig(config)

uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption, &testCfg.Api.QueryWorkflowFailedRetryPolicy)
iwfService := api.NewService(testCfg, uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -122,7 +125,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := temporal.NewInterpreterWorker(createTestConfig(config), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
interpreter := temporal.NewInterpreterWorker(testCfg, temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
if *disableStickyCache {
interpreter.StartWithStickyCacheDisabledForTest()
} else {
Expand All @@ -144,8 +147,11 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
if err != nil {
panic(err)
}
uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc)
iwfService := api.NewService(createTestConfig(config), uclient, logger)

testCfg := createTestConfig(config)

uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, &testCfg.Api.QueryWorkflowFailedRetryPolicy)
iwfService := api.NewService(testCfg, uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -157,7 +163,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := cadence.NewInterpreterWorker(createTestConfig(config), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
interpreter := cadence.NewInterpreterWorker(testCfg, serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
if *disableStickyCache {
interpreter.StartWithStickyCacheDisabledForTest()
} else {
Expand Down
50 changes: 38 additions & 12 deletions service/client/cadence/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/indeedeng/iwf/config"
"time"

"github.com/indeedeng/iwf/service"
Expand All @@ -23,11 +24,12 @@ import (
)

type cadenceClient struct {
domain string
cClient client.Client
closeFunc func()
serviceClient workflowserviceclient.Interface
converter encoded.DataConverter
domain string
cClient client.Client
closeFunc func()
serviceClient workflowserviceclient.Interface
converter encoded.DataConverter
queryWorkflowFailedRetryPolicy config.QueryWorkflowFailedRetryPolicy
}

func (t *cadenceClient) IsWorkflowAlreadyStartedError(err error) bool {
Expand All @@ -52,6 +54,12 @@ func (t *cadenceClient) IsNotFoundError(err error) bool {
return ok
}

func (t *cadenceClient) isQueryFailedError(err error) bool {
var serviceError *shared.QueryFailedError
ok := errors.As(err, &serviceError)
return ok
}

func (t *cadenceClient) IsWorkflowTimeoutError(err error) bool {
return realcadence.IsTimeoutError(err)
}
Expand Down Expand Up @@ -83,14 +91,15 @@ func (t *cadenceClient) GetApplicationErrorDetails(err error, detailsPtr interfa

func NewCadenceClient(
domain string, cClient client.Client, serviceClient workflowserviceclient.Interface,
converter encoded.DataConverter, closeFunc func(),
converter encoded.DataConverter, closeFunc func(), retryPolicy *config.QueryWorkflowFailedRetryPolicy,
) uclient.UnifiedClient {
return &cadenceClient{
domain: domain,
cClient: cClient,
closeFunc: closeFunc,
serviceClient: serviceClient,
converter: converter,
domain: domain,
cClient: cClient,
closeFunc: closeFunc,
serviceClient: serviceClient,
converter: converter,
queryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicyWithDefaults(retryPolicy),
}
}

Expand Down Expand Up @@ -227,7 +236,24 @@ func (t *cadenceClient) ListWorkflow(
func (t *cadenceClient) QueryWorkflow(
ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{},
) error {
qres, err := queryWorkflowWithStrongConsistency(t, ctx, workflowID, runID, queryType, args)
var qres encoded.Value
var err error

attempt := 1
// Only QueryFailed error causes retry; all other errors make the loop to finish immediately
for attempt <= t.queryWorkflowFailedRetryPolicy.MaximumAttempts {
qres, err = t.cClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...)
if err == nil {
break
} else {
if t.isQueryFailedError(err) {
time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second)
attempt++
continue
}
return err
}
}
if err != nil {
return err
}
Expand Down
47 changes: 37 additions & 10 deletions service/client/temporal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"github.com/google/uuid"
"github.com/indeedeng/iwf/config"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
uclient "github.com/indeedeng/iwf/service/client"
Expand All @@ -20,23 +21,26 @@ import (
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
realtemporal "go.temporal.io/sdk/temporal"
"time"
)

type temporalClient struct {
tClient client.Client
namespace string
dataConverter converter.DataConverter
memoEncryption bool // this is a workaround for https://github.com/temporalio/sdk-go/issues/1045
tClient client.Client
namespace string
dataConverter converter.DataConverter
memoEncryption bool // this is a workaround for https://github.com/temporalio/sdk-go/issues/1045
queryWorkflowFailedRetryPolicy config.QueryWorkflowFailedRetryPolicy
}

func NewTemporalClient(
tClient client.Client, namespace string, dataConverter converter.DataConverter, memoEncryption bool,
tClient client.Client, namespace string, dataConverter converter.DataConverter, memoEncryption bool, retryPolicy *config.QueryWorkflowFailedRetryPolicy,
) uclient.UnifiedClient {
return &temporalClient{
tClient: tClient,
namespace: namespace,
dataConverter: dataConverter,
memoEncryption: memoEncryption,
tClient: tClient,
namespace: namespace,
dataConverter: dataConverter,
memoEncryption: memoEncryption,
queryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicyWithDefaults(retryPolicy),
}
}

Expand Down Expand Up @@ -71,6 +75,12 @@ func (t *temporalClient) IsNotFoundError(err error) bool {
return ok
}

func (t *temporalClient) isQueryFailedError(err error) bool {
var serviceError *serviceerror.QueryFailed
ok := errors.As(err, &serviceError)
return ok
}

func (t *temporalClient) IsRequestTimeoutError(err error) bool {
var deadlineExceeded *serviceerror.DeadlineExceeded
ok := errors.As(err, &deadlineExceeded)
Expand Down Expand Up @@ -257,7 +267,24 @@ func (t *temporalClient) ListWorkflow(
func (t *temporalClient) QueryWorkflow(
ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{},
) error {
qres, err := t.tClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...)
var qres converter.EncodedValue
var err error

attempt := 1
// Only QueryFailed error causes retry; all other errors make the loop to finish immediately
for attempt <= t.queryWorkflowFailedRetryPolicy.MaximumAttempts {
qres, err = t.tClient.QueryWorkflow(ctx, workflowID, runID, queryType, args...)
if err == nil {
break
} else {
if t.isQueryFailedError(err) {
time.Sleep(time.Duration(t.queryWorkflowFailedRetryPolicy.InitialIntervalSeconds) * time.Second)
attempt++
continue
}
return err
}
}
if err != nil {
return err
}
Expand Down
11 changes: 9 additions & 2 deletions service/client/temporal/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package temporal
import (
"errors"
"github.com/golang/mock/gomock"
"github.com/indeedeng/iwf/config"
"github.com/stretchr/testify/assert"
"go.temporal.io/api/serviceerror"
"testing"
Expand All @@ -13,7 +14,10 @@ func TestAlreadyStartedErrorForWorkflow(t *testing.T) {
mockRealTemporalClient := NewMockClient(ctrl)
mockDataConverter := NewMockDataConverter(ctrl)

client := NewTemporalClient(mockRealTemporalClient, "test-ns", mockDataConverter, false)
client := NewTemporalClient(mockRealTemporalClient, "test-ns", mockDataConverter, false, &config.QueryWorkflowFailedRetryPolicy{
InitialIntervalSeconds: 0,
MaximumAttempts: 0,
})

err := &serviceerror.WorkflowExecutionAlreadyStarted{}
assert.Equal(t, true, client.IsWorkflowAlreadyStartedError(err))
Expand All @@ -24,7 +28,10 @@ func TestAlreadyStartedErrorForCronWorkflow(t *testing.T) {
mockRealTemporalClient := NewMockClient(ctrl)
mockDataConverter := NewMockDataConverter(ctrl)

client := NewTemporalClient(mockRealTemporalClient, "test-ns", mockDataConverter, false)
client := NewTemporalClient(mockRealTemporalClient, "test-ns", mockDataConverter, false, &config.QueryWorkflowFailedRetryPolicy{
InitialIntervalSeconds: 0,
MaximumAttempts: 0,
})

err := errors.New("schedule with this ID is already registered")

Expand Down