From 6580cbe0aa41a8b515791f95c2c15bb37db1dab1 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Fri, 2 Apr 2021 17:07:37 -0700 Subject: [PATCH] Add ResetWorkflowExecution method to the client (#400) --- client/client.go | 5 +++++ internal/client.go | 5 +++++ internal/internal_workflow_client.go | 24 ++++++++++++++++++++++++ mocks/Client.go | 23 +++++++++++++++++++++++ mocks/mock_test.go | 26 ++++++++++++++++++++++++++ test/integration_test.go | 22 ++++++++++++++++++++++ 6 files changed, 105 insertions(+) diff --git a/client/client.go b/client/client.go index 125c14566..13d614344 100644 --- a/client/client.go +++ b/client/client.go @@ -341,6 +341,11 @@ type ( // - EntityNotExistError DescribeTaskQueue(ctx context.Context, taskqueue string, taskqueueType enumspb.TaskQueueType) (*workflowservice.DescribeTaskQueueResponse, error) + // ResetWorkflowExecution reset an existing workflow execution to WorkflowTaskFinishEventId(exclusive). + // And it will immediately terminating the current execution instance. + // RequestId is used to deduplicate requests. It will be autogenerated if not set. + ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error) + // Close client and clean up underlying resources. Close() } diff --git a/internal/client.go b/internal/client.go index 6ab3f12e3..b2211f669 100644 --- a/internal/client.go +++ b/internal/client.go @@ -317,6 +317,11 @@ type ( // - EntityNotExistError DescribeTaskQueue(ctx context.Context, taskqueue string, taskqueueType enumspb.TaskQueueType) (*workflowservice.DescribeTaskQueueResponse, error) + // ResetWorkflowExecution reset an existing workflow execution to WorkflowTaskFinishEventId(exclusive). + // And it will immediately terminating the current execution instance. + // RequestId is used to deduplicate requests. It will be autogenerated if not set. + ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error) + // Close client and clean up underlying resources. Close() } diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index df582433d..d5eb707d3 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -1003,6 +1003,30 @@ func (wc *WorkflowClient) DescribeTaskQueue(ctx context.Context, taskQueue strin return resp, nil } +// ResetWorkflowExecution reset an existing workflow execution to WorkflowTaskFinishEventId(exclusive). +// And it will immediately terminating the current execution instance. +// RequestId is used to deduplicate requests. It will be autogenerated if not set. +func (wc *WorkflowClient) ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error) { + if request != nil && request.GetRequestId() == "" { + request.RequestId = uuid.New() + } + + var resp *workflowservice.ResetWorkflowExecutionResponse + err := backoff.Retry(ctx, + func() error { + grpcCtx, cancel := newGRPCContext(ctx) + defer cancel() + var err error + resp, err = wc.workflowService.ResetWorkflowExecution(grpcCtx, request) + return err + }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) + if err != nil { + return nil, err + } + + return resp, nil +} + // Close client and clean up underlying resources. func (wc *WorkflowClient) Close() { if wc.connectionCloser == nil { diff --git a/mocks/Client.go b/mocks/Client.go index 72a0292ac..20e97ff56 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -490,6 +490,29 @@ func (_m *Client) TerminateWorkflow(ctx context.Context, workflowID string, runI return r0 } +// ResetWorkflowExecution provides a mock function with given fields: request +func (_m *Client) ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error) { + ret := _m.Called(ctx, request) + + var r0 *workflowservice.ResetWorkflowExecutionResponse + if rf, ok := ret.Get(0).(func(context.Context, *workflowservice.ResetWorkflowExecutionRequest) *workflowservice.ResetWorkflowExecutionResponse); ok { + r0 = rf(ctx, request) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*workflowservice.ResetWorkflowExecutionResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *workflowservice.ResetWorkflowExecutionRequest) error); ok { + r1 = rf(ctx, request) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // Close provides a mock function without given fields func (_m *Client) Close() { ret := _m.Called() diff --git a/mocks/mock_test.go b/mocks/mock_test.go index eb8ad1a50..6b7c6de6f 100644 --- a/mocks/mock_test.go +++ b/mocks/mock_test.go @@ -30,8 +30,10 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" + "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" ) @@ -91,3 +93,27 @@ func Test_MockClient(t *testing.T) { require.NotNil(t, next) require.NoError(t, err) } + +func Test_MockResetWorkflowExecution(t *testing.T) { + mockClient := &Client{} + + req := &workflowservice.ResetWorkflowExecutionRequest{ + Namespace: "test-namespace", + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: "wid", + RunId: "rid", + }, + Reason: "bad deployment", + WorkflowTaskFinishEventId: 6, + RequestId: "request-id-random", + } + resp := &workflowservice.ResetWorkflowExecutionResponse{ + RunId: "new-run-id", + } + + mockClient.On("ResetWorkflowExecution", mock.Anything, mock.Anything).Return(resp, nil).Once() + actualResp, err := mockClient.ResetWorkflowExecution(context.Background(), req) + mockClient.AssertExpectations(t) + require.NoError(t, err) + require.Equal(t, "new-run-id", actualResp.GetRunId()) +} diff --git a/test/integration_test.go b/test/integration_test.go index 636d461d5..81e255e27 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -878,6 +878,28 @@ func (ts *IntegrationTestSuite) TestTimerCancellationConcurrentWithOtherCommandD ts.NoError(err) } +func (ts *IntegrationTestSuite) TestResetWorkflowExecution() { + var originalResult []string + err := ts.executeWorkflow("basic-reset-workflow-execution", ts.workflows.Basic, &originalResult) + ts.NoError(err) + resp, err := ts.client.ResetWorkflowExecution(context.Background(), &workflowservice.ResetWorkflowExecutionRequest{ + Namespace: namespace, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: "basic-reset-workflow-execution", + }, + Reason: "integration test", + WorkflowTaskFinishEventId: 4, + }) + + ts.NoError(err) + ts.NotEmpty(resp.GetRunId()) + newWf := ts.client.GetWorkflow(context.Background(), "basic-reset-workflow-execution", resp.GetRunId()) + var newResult []string + err = newWf.Get(context.Background(), &newResult) + ts.NoError(err) + ts.Equal(originalResult, newResult) +} + func (ts *IntegrationTestSuite) registerNamespace() { client, err := client.NewNamespaceClient(client.Options{HostPort: ts.config.ServiceAddr}) ts.NoError(err)