From 9f8a4e7cc942c51ba2cf3ccc6a36a124b88de24f Mon Sep 17 00:00:00 2001 From: Ktrops Date: Wed, 27 Nov 2024 09:29:26 -0800 Subject: [PATCH] IWF-335: support skipping update methods for reset (#486) Co-authored-by: Katie Atrops --- gen/iwfidl/api/openapi.yaml | 3 + gen/iwfidl/docs/WorkflowResetRequest.md | 26 ++++++++ gen/iwfidl/model_workflow_reset_request.go | 36 +++++++++++ go.mod | 10 ++- go.sum | 12 ++-- integ/locking_test.go | 74 +++++++++++++++++++++- iwf-idl | 2 +- service/client/temporal/client.go | 9 ++- 8 files changed, 155 insertions(+), 17 deletions(-) diff --git a/gen/iwfidl/api/openapi.yaml b/gen/iwfidl/api/openapi.yaml index 2c0a0288..78b65e99 100644 --- a/gen/iwfidl/api/openapi.yaml +++ b/gen/iwfidl/api/openapi.yaml @@ -1428,6 +1428,7 @@ components: skipSignalReapply: true workflowRunId: workflowRunId workflowId: workflowId + skipUpdateReapply: true stateExecutionId: stateExecutionId properties: workflowId: @@ -1448,6 +1449,8 @@ components: type: string skipSignalReapply: type: boolean + skipUpdateReapply: + type: boolean required: - resetType - workflowId diff --git a/gen/iwfidl/docs/WorkflowResetRequest.md b/gen/iwfidl/docs/WorkflowResetRequest.md index 18c103da..ad7794a8 100644 --- a/gen/iwfidl/docs/WorkflowResetRequest.md +++ b/gen/iwfidl/docs/WorkflowResetRequest.md @@ -13,6 +13,7 @@ Name | Type | Description | Notes **StateId** | Pointer to **string** | | [optional] **StateExecutionId** | Pointer to **string** | | [optional] **SkipSignalReapply** | Pointer to **bool** | | [optional] +**SkipUpdateReapply** | Pointer to **bool** | | [optional] ## Methods @@ -248,6 +249,31 @@ SetSkipSignalReapply sets SkipSignalReapply field to given value. HasSkipSignalReapply returns a boolean if a field has been set. +### GetSkipUpdateReapply + +`func (o *WorkflowResetRequest) GetSkipUpdateReapply() bool` + +GetSkipUpdateReapply returns the SkipUpdateReapply field if non-nil, zero value otherwise. + +### GetSkipUpdateReapplyOk + +`func (o *WorkflowResetRequest) GetSkipUpdateReapplyOk() (*bool, bool)` + +GetSkipUpdateReapplyOk returns a tuple with the SkipUpdateReapply field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetSkipUpdateReapply + +`func (o *WorkflowResetRequest) SetSkipUpdateReapply(v bool)` + +SetSkipUpdateReapply sets SkipUpdateReapply field to given value. + +### HasSkipUpdateReapply + +`func (o *WorkflowResetRequest) HasSkipUpdateReapply() bool` + +HasSkipUpdateReapply returns a boolean if a field has been set. + [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/gen/iwfidl/model_workflow_reset_request.go b/gen/iwfidl/model_workflow_reset_request.go index ac6f7df5..b87dd234 100644 --- a/gen/iwfidl/model_workflow_reset_request.go +++ b/gen/iwfidl/model_workflow_reset_request.go @@ -28,6 +28,7 @@ type WorkflowResetRequest struct { StateId *string `json:"stateId,omitempty"` StateExecutionId *string `json:"stateExecutionId,omitempty"` SkipSignalReapply *bool `json:"skipSignalReapply,omitempty"` + SkipUpdateReapply *bool `json:"skipUpdateReapply,omitempty"` } // NewWorkflowResetRequest instantiates a new WorkflowResetRequest object @@ -321,6 +322,38 @@ func (o *WorkflowResetRequest) SetSkipSignalReapply(v bool) { o.SkipSignalReapply = &v } +// GetSkipUpdateReapply returns the SkipUpdateReapply field value if set, zero value otherwise. +func (o *WorkflowResetRequest) GetSkipUpdateReapply() bool { + if o == nil || IsNil(o.SkipUpdateReapply) { + var ret bool + return ret + } + return *o.SkipUpdateReapply +} + +// GetSkipUpdateReapplyOk returns a tuple with the SkipUpdateReapply field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *WorkflowResetRequest) GetSkipUpdateReapplyOk() (*bool, bool) { + if o == nil || IsNil(o.SkipUpdateReapply) { + return nil, false + } + return o.SkipUpdateReapply, true +} + +// HasSkipUpdateReapply returns a boolean if a field has been set. +func (o *WorkflowResetRequest) HasSkipUpdateReapply() bool { + if o != nil && !IsNil(o.SkipUpdateReapply) { + return true + } + + return false +} + +// SetSkipUpdateReapply gets a reference to the given bool and assigns it to the SkipUpdateReapply field. +func (o *WorkflowResetRequest) SetSkipUpdateReapply(v bool) { + o.SkipUpdateReapply = &v +} + func (o WorkflowResetRequest) MarshalJSON() ([]byte, error) { toSerialize, err := o.ToMap() if err != nil { @@ -354,6 +387,9 @@ func (o WorkflowResetRequest) ToMap() (map[string]interface{}, error) { if !IsNil(o.SkipSignalReapply) { toSerialize["skipSignalReapply"] = o.SkipSignalReapply } + if !IsNil(o.SkipUpdateReapply) { + toSerialize["skipUpdateReapply"] = o.SkipUpdateReapply + } return toSerialize, nil } diff --git a/go.mod b/go.mod index 81ea51a9..23a3f087 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/uber-go/tally/v4 v4.1.1 github.com/uber/cadence-idl v0.0.0-20220713235846-fda89e95df1e github.com/urfave/cli v1.22.5 - go.temporal.io/sdk v1.29.1 + go.temporal.io/sdk v1.30.0 go.temporal.io/sdk/contrib/tally v0.1.0 go.temporal.io/sdk/contrib/tools/workflowcheck v0.0.0-20220331154559-fd0d1eb548eb go.uber.org/cadence v0.17.1-0.20230105221902-f50f452a8eae // pin to pick GetUnhandledSignalNames API @@ -19,14 +19,12 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) -require ( - github.com/pkg/errors v0.9.1 - go.temporal.io/api v1.39.0 -) +require go.temporal.io/api v1.40.0 require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect - github.com/nexus-rpc/sdk-go v0.0.10 // indirect + github.com/nexus-rpc/sdk-go v0.0.11 // indirect + github.com/pkg/errors v0.9.1 // indirect golang.org/x/crypto v0.26.0 // indirect golang.org/x/net v0.28.0 // indirect golang.org/x/sync v0.8.0 // indirect diff --git a/go.sum b/go.sum index 07c619e9..e3ff2478 100644 --- a/go.sum +++ b/go.sum @@ -269,8 +269,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nexus-rpc/sdk-go v0.0.10 h1:7jEPUlsghxoD4OJ2H8YbFJ1t4wbxsUef7yZgBfyY3uA= -github.com/nexus-rpc/sdk-go v0.0.10/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ= +github.com/nexus-rpc/sdk-go v0.0.11 h1:qH3Us3spfp50t5ca775V1va2eE6z1zMQDZY4mvbw0CI= +github.com/nexus-rpc/sdk-go v0.0.11/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -399,11 +399,11 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA= -go.temporal.io/api v1.39.0 h1:pbhcfvNDB7mllb8lIBqPcg+m6LMG/IhTpdiFxe+0mYk= -go.temporal.io/api v1.39.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis= +go.temporal.io/api v1.40.0 h1:rH3HvUUCFr0oecQTBW5tI6DdDQsX2Xb6OFVgt/bvLto= +go.temporal.io/api v1.40.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis= go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o= -go.temporal.io/sdk v1.29.1 h1:y+sUMbUhTU9rj50mwIZAPmcXCtgUdOWS9xHDYRYSgZ0= -go.temporal.io/sdk v1.29.1/go.mod h1:kp//DRvn3CqQVBCtjL51Oicp9wrZYB2s6row1UgzcKQ= +go.temporal.io/sdk v1.30.0 h1:7jzSFZYk+tQ2kIYEP+dvrM7AW9EsCEP52JHCjVGuwbI= +go.temporal.io/sdk v1.30.0/go.mod h1:Pv45F/fVDgWKx+jhix5t/dGgqROVaI+VjPLd3CHWqq0= go.temporal.io/sdk/contrib/tally v0.1.0 h1:edAcGKNIDYU7fd10e4C/43dHw/h1F9cACupcmIKwzPI= go.temporal.io/sdk/contrib/tally v0.1.0/go.mod h1:PckZI8gA0AxIBvrgT2FQlm8TaqptYmqRdy2NxOibsZQ= go.temporal.io/sdk/contrib/tools/workflowcheck v0.0.0-20220331154559-fd0d1eb548eb h1:cpSEAnCCkOtcwdY2NysRXdi4Ny19F5V5KplrAVl1/Mo= diff --git a/integ/locking_test.go b/integ/locking_test.go index 3f0ee9a6..72f490e2 100644 --- a/integ/locking_test.go +++ b/integ/locking_test.go @@ -83,7 +83,7 @@ func doTestLockingWorkflow(t *testing.T, backendType service.BackendType, config startReq := iwfidl.WorkflowStartRequest{ WorkflowId: wfId, IwfWorkflowType: locking.WorkflowType, - WorkflowTimeoutSeconds: 100, + WorkflowTimeoutSeconds: 300, IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort, StartStateId: ptr.Any(locking.State1), WorkflowStartOptions: &iwfidl.WorkflowStartOptions{ @@ -245,4 +245,76 @@ func doTestLockingWorkflow(t *testing.T, backendType service.BackendType, config }, } assertions.ElementsMatch(expected1, queryResult1.GetObjects()) + + //reset here with reapply and compare counter + resetReq := apiClient.DefaultApi.ApiV1WorkflowResetPost(context.Background()) + _, httpResp, err = resetReq.WorkflowResetRequest(iwfidl.WorkflowResetRequest{ + WorkflowId: wfId, + ResetType: iwfidl.BEGINNING, + //SkipSignalReapply: ptr.Any(true), + }).Execute() + panicAtHttpError(err, httpResp) + + time.Sleep(time.Second * 20) + req2Reset := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background()) + resp2Reset, httpResp, err := req2Reset.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ + WorkflowId: wfId, + }).Execute() + panicAtHttpError(err, httpResp) + + assertions.Equal(iwfidl.COMPLETED, resp2Reset.GetWorkflowStatus()) + + //TODO: There is a bug in the Temporal go SDK where only the first update method is actually executed. When that is fixed the following code can be uncommented to test resetting update methods. + //time.Sleep(time.Second * 10) + //reqRpcReset := apiClient.DefaultApi.ApiV1WorkflowRpcPost(context.Background()) + //_, httpResp, err = reqRpc.WorkflowRpcRequest(iwfidl.WorkflowRpcRequest{ + // WorkflowId: wfId, + // RpcName: locking.RPCName, + // Input: locking.UnblockValue, + //}).Execute() + //panicAtHttpError(err, httpResp) + + //time.Sleep(time.Second * 20) + //req2Reset := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background()) + //resp2Reset, httpResp, err := req2Reset.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ + // WorkflowId: wfId, + //}).Execute() + //panicAtHttpError(err, httpResp) + + //s2StartsDecides := locking.InParallelS2 + rpcIncrease // locking.InParallelS2 original state executions, and a new trigger from rpc + //finalCounterValue := int64(locking.InParallelS2 + 2*rpcIncrease) + //stateCompletionCount := locking.InParallelS2 + rpcIncrease + 1 + //resetHistory, _ := wfHandler.GetTestResult() + //assertions.Equalf(map[string]int64{ + // "S1_start": 1, + // "S1_decide": 1, + // "StateWaiting_start": 1, + // "StateWaiting_decide": 1, + // "S2_start": int64(s2StartsDecides), + // "S2_decide": int64(s2StartsDecides), + //}, resetHistory, "locking.test fail, %v", history) + // + //assertions.Equal(iwfidl.COMPLETED, resp2Reset.GetWorkflowStatus()) + //assertions.Equal(stateCompletionCount, len(resp2Reset.GetResults())) + // + //reqSearchReset := apiClient.DefaultApi.ApiV1WorkflowSearchattributesGetPost(context.Background()) + //searchResultReset, httpResp, err := reqSearchReset.WorkflowGetSearchAttributesRequest(iwfidl.WorkflowGetSearchAttributesRequest{ + // WorkflowId: wfId, + // Keys: []iwfidl.SearchAttributeKeyAndType{ + // { + // Key: iwfidl.PtrString(locking.TestSearchAttributeIntKey), + // ValueType: ptr.Any(iwfidl.INT), + // }, + // }, + //}).Execute() + //panicAtHttpError(err, httpResp) + // + //expectedSearchAttributeIntReset := iwfidl.SearchAttribute{ + // Key: iwfidl.PtrString(locking.TestSearchAttributeIntKey), + // ValueType: ptr.Any(iwfidl.INT), + // IntegerValue: iwfidl.PtrInt64(finalCounterValue), + //} + //assertions.Equal([]iwfidl.SearchAttribute{expectedSearchAttributeIntReset}, searchResultReset.GetSearchAttributes()) + + //reset here without update reapply and counter should be less } diff --git a/iwf-idl b/iwf-idl index 75978e48..a6fbd8fe 160000 --- a/iwf-idl +++ b/iwf-idl @@ -1 +1 @@ -Subproject commit 75978e48d6a929a2bb761e5c8f67d6caac99d658 +Subproject commit a6fbd8feb427fdf77b7ddc0f7c9d7eb154c385e7 diff --git a/service/client/temporal/client.go b/service/client/temporal/client.go index 5338b353..e3bdb645 100644 --- a/service/client/temporal/client.go +++ b/service/client/temporal/client.go @@ -461,9 +461,12 @@ func (t *temporalClient) ResetWorkflow( } requestId := uuid.New().String() - resetReapplyType := enums.RESET_REAPPLY_TYPE_SIGNAL + var resetReapplyExcludeTypes []enums.ResetReapplyExcludeType if request.GetSkipSignalReapply() { - resetReapplyType = enums.RESET_REAPPLY_TYPE_NONE + resetReapplyExcludeTypes = append(resetReapplyExcludeTypes, enums.RESET_REAPPLY_EXCLUDE_TYPE_SIGNAL) + } + if request.GetSkipUpdateReapply() { + resetReapplyExcludeTypes = append(resetReapplyExcludeTypes, enums.RESET_REAPPLY_EXCLUDE_TYPE_UPDATE) } resp, err := t.tClient.ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{ @@ -475,7 +478,7 @@ func (t *temporalClient) ResetWorkflow( Reason: request.GetReason(), WorkflowTaskFinishEventId: resetEventId, RequestId: requestId, - ResetReapplyType: resetReapplyType, + ResetReapplyExcludeTypes: resetReapplyExcludeTypes, }) if err != nil {