Skip to content

Commit

Permalink
Merge branch 'indeedeng:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
duoertai authored Mar 15, 2024
2 parents 664f302 + 5debdce commit 1375463
Show file tree
Hide file tree
Showing 21 changed files with 666 additions and 160 deletions.
4 changes: 2 additions & 2 deletions docker-compose/.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ CASSANDRA_VERSION=3.11.9
ELASTICSEARCH_VERSION=7.16.2
MYSQL_VERSION=8
POSTGRESQL_VERSION=13
TEMPORAL_VERSION=1.21.4
TEMPORAL_UI_VERSION=2.18.0
TEMPORAL_VERSION=1.22.4
TEMPORAL_UI_VERSION=2.23.0
30 changes: 19 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ require (
github.com/gin-gonic/gin v1.9.1
github.com/google/uuid v1.3.0
github.com/prometheus/client_golang v1.12.1
github.com/stretchr/testify v1.8.3
github.com/stretchr/testify v1.8.4
github.com/uber-go/tally/v4 v4.1.1
github.com/uber/cadence-idl v0.0.0-20220713235846-fda89e95df1e
github.com/urfave/cli v1.22.5
go.temporal.io/api v1.21.0
go.temporal.io/sdk v1.24.0
go.temporal.io/sdk v1.25.1
go.temporal.io/sdk/contrib/tally v0.1.0
go.temporal.io/sdk/contrib/tools/workflowcheck v0.0.0-20220331154559-fd0d1eb548eb
go.uber.org/cadence v0.17.1-0.20230105221902-f50f452a8eae // pin to pick GetUnhandledSignalNames API
Expand All @@ -20,7 +19,18 @@ require (
gopkg.in/yaml.v3 v3.0.1
)

require github.com/pkg/errors v0.9.1
require (
github.com/pkg/errors v0.9.1
go.temporal.io/api v1.24.0
)

require (
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/net v0.21.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230815205213-6bfd019c3878 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878 // indirect
)

require (
github.com/BurntSushi/toml v0.4.1 // indirect
Expand Down Expand Up @@ -81,19 +91,17 @@ require (
go.uber.org/net/metrics v1.3.0 // indirect
go.uber.org/thriftrw v1.29.2 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/exp/typeparams v0.0.0-20220218215828-6cf2b201936e // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.13.0 // indirect
google.golang.org/genproto v0.0.0-20230525154841-bd750badd5c6 // indirect
google.golang.org/grpc v1.55.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
google.golang.org/genproto v0.0.0-20230815205213-6bfd019c3878 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
honnef.co/go/tools v0.3.2 // indirect
)
289 changes: 272 additions & 17 deletions go.sum

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions integ/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ func doTestBasicWorkflow(t *testing.T, backendType service.BackendType, config *
closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

closeFunc2 := startIwfService(backendType)
_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
SetVersionAtStart: true,
})
defer closeFunc2()

// start a workflow
Expand All @@ -77,7 +80,9 @@ func doTestBasicWorkflow(t *testing.T, backendType service.BackendType, config *
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
WorkflowConfigOverride: config,
WorkflowIDReusePolicy: ptr.Any(iwfidl.REJECT_DUPLICATE),
// CronSchedule: iwfidl.PtrString("* * * * *"),
// TODO: need more work to write integ test for cron
// manual testing for now by uncomment the following line
// CronSchedule: iwfidl.PtrString("* * * * *"),
RetryPolicy: &iwfidl.WorkflowRetryPolicy{
InitialIntervalSeconds: iwfidl.PtrInt32(11),
BackoffCoefficient: iwfidl.PtrFloat32(11),
Expand Down
7 changes: 4 additions & 3 deletions integ/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
const testWorkflowServerPort = "9714"
const testIwfServerPort = "9715"

func createTestConfig(failAtMemoCompatibility bool) config.Config {
func createTestConfig(failAtMemoCompatibility bool, setVersionAtStart bool) config.Config {
return config.Config{
Api: config.ApiConfig{
Port: 9715,
MaxWaitSeconds: 10, // use 10 so that we can test it in the waiting test
Port: 9715,
MaxWaitSeconds: 10, // use 10 so that we can test it in the waiting test
SetVersionAtStart: setVersionAtStart,
},
Interpreter: config.Interpreter{
VerboseDebug: false,
Expand Down
2 changes: 1 addition & 1 deletion integ/get_with_wait_timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func doTestWorkflowWithWaitTimeout(t *testing.T, backendType service.BackendType
assertions.Nil(err)
err = json.Unmarshal(body, &errResp)
assertions.Equalf(iwfidl.ErrorResponse{
Detail: ptr.Any("workflow is still running, waiting has exceeded timeout limit"),
Detail: ptr.Any("workflow is still running, waiting has exceeded timeout limit, please retry"),
SubStatus: iwfidl.LONG_POLL_TIME_OUT_SUB_STATUS.Ptr(),
}, errResp, "body")

Expand Down
4 changes: 3 additions & 1 deletion integ/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ func TestPersistenceWorkflowCadenceContinueAsNew(t *testing.T) {
}
}

func doTestPersistenceWorkflow(t *testing.T, backendType service.BackendType, useMemo, memoEncryption bool, config *iwfidl.WorkflowConfig) {
func doTestPersistenceWorkflow(
t *testing.T, backendType service.BackendType, useMemo, memoEncryption bool, config *iwfidl.WorkflowConfig,
) {
assertions := assert.New(t)
wfHandler := persistence.NewHandler()
closeFunc1 := startWorkflowWorker(wfHandler)
Expand Down
9 changes: 5 additions & 4 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type IwfServiceTestConfig struct {
BackendType service.BackendType
MemoEncryption bool
DisableFailAtMemoIncompatibility bool // default to false so that we will fail at test
SetVersionAtStart bool
}

func startIwfService(backendType service.BackendType) (closeFunc func()) {
Expand Down Expand Up @@ -110,7 +111,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
panic(err)
}
uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility), uclient, logger)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility, config.SetVersionAtStart), uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -122,7 +123,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := temporal.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
interpreter := temporal.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility, config.SetVersionAtStart), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
interpreter.Start()
return uclient, func() {
iwfServer.Close()
Expand All @@ -141,7 +142,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
panic(err)
}
uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility), uclient, logger)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility, config.SetVersionAtStart), uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -153,7 +154,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := cadence.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
interpreter := cadence.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility, config.SetVersionAtStart), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
interpreter.Start()
return uclient, func() {
iwfServer.Close()
Expand Down
62 changes: 46 additions & 16 deletions service/api/cadence/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func (t *cadenceClient) IsNotFoundError(err error) bool {
return ok
}

func (t *cadenceClient) IsWorkflowTimeoutError(err error) bool {
return realcadence.IsTimeoutError(err)
}

func (t *cadenceClient) IsRequestTimeoutError(err error) bool {
return errors.Is(err, context.DeadlineExceeded)
}
Expand All @@ -63,7 +67,10 @@ func (t *cadenceClient) GetApplicationErrorDetails(err error, detailsPtr interfa
return fmt.Errorf("not an application error. Critical code bug")
}

func NewCadenceClient(domain string, cClient client.Client, serviceClient workflowserviceclient.Interface, converter encoded.DataConverter, closeFunc func()) uclient.UnifiedClient {
func NewCadenceClient(
domain string, cClient client.Client, serviceClient workflowserviceclient.Interface,
converter encoded.DataConverter, closeFunc func(),
) uclient.UnifiedClient {
return &cadenceClient{
domain: domain,
cClient: cClient,
Expand All @@ -77,7 +84,9 @@ func (t *cadenceClient) Close() {
t.closeFunc()
}

func (t *cadenceClient) StartInterpreterWorkflow(ctx context.Context, options uclient.StartWorkflowOptions, args ...interface{}) (runId string, err error) {
func (t *cadenceClient) StartInterpreterWorkflow(
ctx context.Context, options uclient.StartWorkflowOptions, args ...interface{},
) (runId string, err error) {
_, ok := options.Memo[service.UseMemoForDataAttributesKey]
if ok {
return "", fmt.Errorf("using Memo is not supported with Cadence, see https://github.com/uber/cadence/issues/3729")
Expand Down Expand Up @@ -114,29 +123,34 @@ func (t *cadenceClient) StartInterpreterWorkflow(ctx context.Context, options uc
return run.RunID, nil
}

func (t *cadenceClient) StartWaitForStateCompletionWorkflow(ctx context.Context, options uclient.StartWorkflowOptions) (runId string, err error) {
func (t *cadenceClient) StartWaitForStateCompletionWorkflow(
ctx context.Context, options uclient.StartWorkflowOptions,
) (runId string, err error) {
workflowOptions := client.StartWorkflowOptions{
ID: options.ID,
TaskList: options.TaskQueue,
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyRejectDuplicate,
ExecutionStartToCloseTimeout: options.WorkflowExecutionTimeout, // TODO, make this configurable
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicateFailedOnly, // the workflow could be timeout, so we allow duplicate
ExecutionStartToCloseTimeout: options.WorkflowExecutionTimeout,
}
run, err := t.cClient.StartWorkflow(ctx, workflowOptions, cadence.WaitforStateCompletionWorkflow)
if err != nil {
if t.IsWorkflowAlreadyStartedError(err) {
// if the workflow is already started, we return the runId
return *err.(*shared.WorkflowExecutionAlreadyStartedError).RunId, nil
}
return "", err
}
return run.RunID, nil
}

func (t *cadenceClient) SignalWithStartWaitForStateCompletionWorkflow(ctx context.Context, options uclient.StartWorkflowOptions, stateCompletionOutput iwfidl.StateCompletionOutput) error {
func (t *cadenceClient) SignalWithStartWaitForStateCompletionWorkflow(
ctx context.Context, options uclient.StartWorkflowOptions, stateCompletionOutput iwfidl.StateCompletionOutput,
) error {
workflowOptions := client.StartWorkflowOptions{
ID: options.ID,
TaskList: options.TaskQueue,
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyRejectDuplicate,
ExecutionStartToCloseTimeout: options.WorkflowExecutionTimeout, // TODO, make this configurable
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicateFailedOnly, // the workflow could be timeout, so we allow duplicate
ExecutionStartToCloseTimeout: options.WorkflowExecutionTimeout,
}

_, err := t.cClient.SignalWithStartWorkflow(ctx, options.ID, service.StateCompletionSignalChannelName, stateCompletionOutput, workflowOptions, cadence.WaitforStateCompletionWorkflow)
Expand All @@ -146,7 +160,9 @@ func (t *cadenceClient) SignalWithStartWaitForStateCompletionWorkflow(ctx contex
return nil
}

func (t *cadenceClient) SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg interface{}) error {
func (t *cadenceClient) SignalWorkflow(
ctx context.Context, workflowID string, runID string, signalName string, arg interface{},
) error {
return t.cClient.SignalWorkflow(ctx, workflowID, runID, signalName, arg)
}

Expand All @@ -165,7 +181,9 @@ func (t *cadenceClient) TerminateWorkflow(ctx context.Context, workflowID string
return t.cClient.TerminateWorkflow(ctx, workflowID, runID, reasonStr, nil)
}

func (t *cadenceClient) ListWorkflow(ctx context.Context, request *uclient.ListWorkflowExecutionsRequest) (*uclient.ListWorkflowExecutionsResponse, error) {
func (t *cadenceClient) ListWorkflow(
ctx context.Context, request *uclient.ListWorkflowExecutionsRequest,
) (*uclient.ListWorkflowExecutionsResponse, error) {
listReq := &shared.ListWorkflowExecutionsRequest{
PageSize: &request.PageSize,
Query: &request.Query,
Expand All @@ -188,15 +206,19 @@ func (t *cadenceClient) ListWorkflow(ctx context.Context, request *uclient.ListW
}, nil
}

func (t *cadenceClient) QueryWorkflow(ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{}) error {
func (t *cadenceClient) QueryWorkflow(
ctx context.Context, valuePtr interface{}, workflowID string, runID string, queryType string, args ...interface{},
) error {
qres, err := queryWorkflowWithStrongConsistency(t, ctx, workflowID, runID, queryType, args)
if err != nil {
return err
}
return qres.Get(valuePtr)
}

func queryWorkflowWithStrongConsistency(t *cadenceClient, ctx context.Context, workflowID string, runID string, queryType string, args []interface{}) (encoded.Value, error) {
func queryWorkflowWithStrongConsistency(
t *cadenceClient, ctx context.Context, workflowID string, runID string, queryType string, args []interface{},
) (encoded.Value, error) {
queryWorkflowWithOptionsRequest := &client.QueryWorkflowWithOptionsRequest{
WorkflowID: workflowID,
RunID: runID,
Expand All @@ -211,7 +233,9 @@ func queryWorkflowWithStrongConsistency(t *cadenceClient, ctx context.Context, w
return result.QueryResult, nil
}

func (t *cadenceClient) DescribeWorkflowExecution(ctx context.Context, workflowID, runID string, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType) (*uclient.DescribeWorkflowExecutionResponse, error) {
func (t *cadenceClient) DescribeWorkflowExecution(
ctx context.Context, workflowID, runID string, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType,
) (*uclient.DescribeWorkflowExecutionResponse, error) {
resp, err := t.cClient.DescribeWorkflowExecution(ctx, workflowID, runID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -295,16 +319,22 @@ func mapToIwfWorkflowStatus(status *shared.WorkflowExecutionCloseStatus) (iwfidl
}
}

func (t *cadenceClient) GetWorkflowResult(ctx context.Context, valuePtr interface{}, workflowID string, runID string) error {
func (t *cadenceClient) GetWorkflowResult(
ctx context.Context, valuePtr interface{}, workflowID string, runID string,
) error {
run := t.cClient.GetWorkflow(ctx, workflowID, runID)
return run.Get(ctx, valuePtr)
}

func (t *cadenceClient) SynchronousUpdateWorkflow(ctx context.Context, valuePtr interface{}, workflowID, runID, updateType string, input interface{}) error {
func (t *cadenceClient) SynchronousUpdateWorkflow(
ctx context.Context, valuePtr interface{}, workflowID, runID, updateType string, input interface{},
) error {
return fmt.Errorf("not supported in Cadence")
}

func (t *cadenceClient) ResetWorkflow(ctx context.Context, request iwfidl.WorkflowResetRequest) (newRunId string, err error) {
func (t *cadenceClient) ResetWorkflow(
ctx context.Context, request iwfidl.WorkflowResetRequest,
) (newRunId string, err error) {

reqRunId := request.GetWorkflowRunId()
if reqRunId == "" {
Expand Down
Loading

0 comments on commit 1375463

Please sign in to comment.