Skip to content

Commit

Permalink
IWF-335: support skipping update methods for reset (#486)
Browse files Browse the repository at this point in the history
Co-authored-by: Katie Atrops <[email protected]>
  • Loading branch information
ktrops and Katie Atrops authored Nov 27, 2024
1 parent 6ead62a commit 9f8a4e7
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 17 deletions.
3 changes: 3 additions & 0 deletions gen/iwfidl/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,7 @@ components:
skipSignalReapply: true
workflowRunId: workflowRunId
workflowId: workflowId
skipUpdateReapply: true
stateExecutionId: stateExecutionId
properties:
workflowId:
Expand All @@ -1448,6 +1449,8 @@ components:
type: string
skipSignalReapply:
type: boolean
skipUpdateReapply:
type: boolean
required:
- resetType
- workflowId
Expand Down
26 changes: 26 additions & 0 deletions gen/iwfidl/docs/WorkflowResetRequest.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Name | Type | Description | Notes
**StateId** | Pointer to **string** | | [optional]
**StateExecutionId** | Pointer to **string** | | [optional]
**SkipSignalReapply** | Pointer to **bool** | | [optional]
**SkipUpdateReapply** | Pointer to **bool** | | [optional]

## Methods

Expand Down Expand Up @@ -248,6 +249,31 @@ SetSkipSignalReapply sets SkipSignalReapply field to given value.

HasSkipSignalReapply returns a boolean if a field has been set.

### GetSkipUpdateReapply

`func (o *WorkflowResetRequest) GetSkipUpdateReapply() bool`

GetSkipUpdateReapply returns the SkipUpdateReapply field if non-nil, zero value otherwise.

### GetSkipUpdateReapplyOk

`func (o *WorkflowResetRequest) GetSkipUpdateReapplyOk() (*bool, bool)`

GetSkipUpdateReapplyOk returns a tuple with the SkipUpdateReapply field if it's non-nil, zero value otherwise
and a boolean to check if the value has been set.

### SetSkipUpdateReapply

`func (o *WorkflowResetRequest) SetSkipUpdateReapply(v bool)`

SetSkipUpdateReapply sets SkipUpdateReapply field to given value.

### HasSkipUpdateReapply

`func (o *WorkflowResetRequest) HasSkipUpdateReapply() bool`

HasSkipUpdateReapply returns a boolean if a field has been set.


[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)

Expand Down
36 changes: 36 additions & 0 deletions gen/iwfidl/model_workflow_reset_request.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 4 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/uber-go/tally/v4 v4.1.1
github.com/uber/cadence-idl v0.0.0-20220713235846-fda89e95df1e
github.com/urfave/cli v1.22.5
go.temporal.io/sdk v1.29.1
go.temporal.io/sdk v1.30.0
go.temporal.io/sdk/contrib/tally v0.1.0
go.temporal.io/sdk/contrib/tools/workflowcheck v0.0.0-20220331154559-fd0d1eb548eb
go.uber.org/cadence v0.17.1-0.20230105221902-f50f452a8eae // pin to pick GetUnhandledSignalNames API
Expand All @@ -19,14 +19,12 @@ require (
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/pkg/errors v0.9.1
go.temporal.io/api v1.39.0
)
require go.temporal.io/api v1.40.0

require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/nexus-rpc/sdk-go v0.0.10 // indirect
github.com/nexus-rpc/sdk-go v0.0.11 // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nexus-rpc/sdk-go v0.0.10 h1:7jEPUlsghxoD4OJ2H8YbFJ1t4wbxsUef7yZgBfyY3uA=
github.com/nexus-rpc/sdk-go v0.0.10/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ=
github.com/nexus-rpc/sdk-go v0.0.11 h1:qH3Us3spfp50t5ca775V1va2eE6z1zMQDZY4mvbw0CI=
github.com/nexus-rpc/sdk-go v0.0.11/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
Expand Down Expand Up @@ -399,11 +399,11 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA=
go.temporal.io/api v1.39.0 h1:pbhcfvNDB7mllb8lIBqPcg+m6LMG/IhTpdiFxe+0mYk=
go.temporal.io/api v1.39.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.40.0 h1:rH3HvUUCFr0oecQTBW5tI6DdDQsX2Xb6OFVgt/bvLto=
go.temporal.io/api v1.40.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o=
go.temporal.io/sdk v1.29.1 h1:y+sUMbUhTU9rj50mwIZAPmcXCtgUdOWS9xHDYRYSgZ0=
go.temporal.io/sdk v1.29.1/go.mod h1:kp//DRvn3CqQVBCtjL51Oicp9wrZYB2s6row1UgzcKQ=
go.temporal.io/sdk v1.30.0 h1:7jzSFZYk+tQ2kIYEP+dvrM7AW9EsCEP52JHCjVGuwbI=
go.temporal.io/sdk v1.30.0/go.mod h1:Pv45F/fVDgWKx+jhix5t/dGgqROVaI+VjPLd3CHWqq0=
go.temporal.io/sdk/contrib/tally v0.1.0 h1:edAcGKNIDYU7fd10e4C/43dHw/h1F9cACupcmIKwzPI=
go.temporal.io/sdk/contrib/tally v0.1.0/go.mod h1:PckZI8gA0AxIBvrgT2FQlm8TaqptYmqRdy2NxOibsZQ=
go.temporal.io/sdk/contrib/tools/workflowcheck v0.0.0-20220331154559-fd0d1eb548eb h1:cpSEAnCCkOtcwdY2NysRXdi4Ny19F5V5KplrAVl1/Mo=
Expand Down
74 changes: 73 additions & 1 deletion integ/locking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func doTestLockingWorkflow(t *testing.T, backendType service.BackendType, config
startReq := iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: locking.WorkflowType,
WorkflowTimeoutSeconds: 100,
WorkflowTimeoutSeconds: 300,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: ptr.Any(locking.State1),
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
Expand Down Expand Up @@ -245,4 +245,76 @@ func doTestLockingWorkflow(t *testing.T, backendType service.BackendType, config
},
}
assertions.ElementsMatch(expected1, queryResult1.GetObjects())

//reset here with reapply and compare counter
resetReq := apiClient.DefaultApi.ApiV1WorkflowResetPost(context.Background())
_, httpResp, err = resetReq.WorkflowResetRequest(iwfidl.WorkflowResetRequest{
WorkflowId: wfId,
ResetType: iwfidl.BEGINNING,
//SkipSignalReapply: ptr.Any(true),
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second * 20)
req2Reset := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
resp2Reset, httpResp, err := req2Reset.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)

assertions.Equal(iwfidl.COMPLETED, resp2Reset.GetWorkflowStatus())

//TODO: There is a bug in the Temporal go SDK where only the first update method is actually executed. When that is fixed the following code can be uncommented to test resetting update methods.
//time.Sleep(time.Second * 10)
//reqRpcReset := apiClient.DefaultApi.ApiV1WorkflowRpcPost(context.Background())
//_, httpResp, err = reqRpc.WorkflowRpcRequest(iwfidl.WorkflowRpcRequest{
// WorkflowId: wfId,
// RpcName: locking.RPCName,
// Input: locking.UnblockValue,
//}).Execute()
//panicAtHttpError(err, httpResp)

//time.Sleep(time.Second * 20)
//req2Reset := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
//resp2Reset, httpResp, err := req2Reset.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
// WorkflowId: wfId,
//}).Execute()
//panicAtHttpError(err, httpResp)

//s2StartsDecides := locking.InParallelS2 + rpcIncrease // locking.InParallelS2 original state executions, and a new trigger from rpc
//finalCounterValue := int64(locking.InParallelS2 + 2*rpcIncrease)
//stateCompletionCount := locking.InParallelS2 + rpcIncrease + 1
//resetHistory, _ := wfHandler.GetTestResult()
//assertions.Equalf(map[string]int64{
// "S1_start": 1,
// "S1_decide": 1,
// "StateWaiting_start": 1,
// "StateWaiting_decide": 1,
// "S2_start": int64(s2StartsDecides),
// "S2_decide": int64(s2StartsDecides),
//}, resetHistory, "locking.test fail, %v", history)
//
//assertions.Equal(iwfidl.COMPLETED, resp2Reset.GetWorkflowStatus())
//assertions.Equal(stateCompletionCount, len(resp2Reset.GetResults()))
//
//reqSearchReset := apiClient.DefaultApi.ApiV1WorkflowSearchattributesGetPost(context.Background())
//searchResultReset, httpResp, err := reqSearchReset.WorkflowGetSearchAttributesRequest(iwfidl.WorkflowGetSearchAttributesRequest{
// WorkflowId: wfId,
// Keys: []iwfidl.SearchAttributeKeyAndType{
// {
// Key: iwfidl.PtrString(locking.TestSearchAttributeIntKey),
// ValueType: ptr.Any(iwfidl.INT),
// },
// },
//}).Execute()
//panicAtHttpError(err, httpResp)
//
//expectedSearchAttributeIntReset := iwfidl.SearchAttribute{
// Key: iwfidl.PtrString(locking.TestSearchAttributeIntKey),
// ValueType: ptr.Any(iwfidl.INT),
// IntegerValue: iwfidl.PtrInt64(finalCounterValue),
//}
//assertions.Equal([]iwfidl.SearchAttribute{expectedSearchAttributeIntReset}, searchResultReset.GetSearchAttributes())

//reset here without update reapply and counter should be less
}
2 changes: 1 addition & 1 deletion iwf-idl
Submodule iwf-idl updated 2 files
+2 −0 iwf-sdk.yaml
+2 −0 iwf.yaml
9 changes: 6 additions & 3 deletions service/client/temporal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,12 @@ func (t *temporalClient) ResetWorkflow(
}

requestId := uuid.New().String()
resetReapplyType := enums.RESET_REAPPLY_TYPE_SIGNAL
var resetReapplyExcludeTypes []enums.ResetReapplyExcludeType
if request.GetSkipSignalReapply() {
resetReapplyType = enums.RESET_REAPPLY_TYPE_NONE
resetReapplyExcludeTypes = append(resetReapplyExcludeTypes, enums.RESET_REAPPLY_EXCLUDE_TYPE_SIGNAL)
}
if request.GetSkipUpdateReapply() {
resetReapplyExcludeTypes = append(resetReapplyExcludeTypes, enums.RESET_REAPPLY_EXCLUDE_TYPE_UPDATE)
}

resp, err := t.tClient.ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{
Expand All @@ -475,7 +478,7 @@ func (t *temporalClient) ResetWorkflow(
Reason: request.GetReason(),
WorkflowTaskFinishEventId: resetEventId,
RequestId: requestId,
ResetReapplyType: resetReapplyType,
ResetReapplyExcludeTypes: resetReapplyExcludeTypes,
})

if err != nil {
Expand Down

0 comments on commit 9f8a4e7

Please sign in to comment.