Skip to content

Commit

Permalink
Add ResetWorkflowExecution method to the client (#400)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Apr 3, 2021
1 parent f4384a5 commit 6580cbe
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 0 deletions.
5 changes: 5 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,11 @@ type (
// - EntityNotExistError
DescribeTaskQueue(ctx context.Context, taskqueue string, taskqueueType enumspb.TaskQueueType) (*workflowservice.DescribeTaskQueueResponse, error)

// ResetWorkflowExecution reset an existing workflow execution to WorkflowTaskFinishEventId(exclusive).
// And it will immediately terminating the current execution instance.
// RequestId is used to deduplicate requests. It will be autogenerated if not set.
ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)

// Close client and clean up underlying resources.
Close()
}
Expand Down
5 changes: 5 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,11 @@ type (
// - EntityNotExistError
DescribeTaskQueue(ctx context.Context, taskqueue string, taskqueueType enumspb.TaskQueueType) (*workflowservice.DescribeTaskQueueResponse, error)

// ResetWorkflowExecution reset an existing workflow execution to WorkflowTaskFinishEventId(exclusive).
// And it will immediately terminating the current execution instance.
// RequestId is used to deduplicate requests. It will be autogenerated if not set.
ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)

// Close client and clean up underlying resources.
Close()
}
Expand Down
24 changes: 24 additions & 0 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,30 @@ func (wc *WorkflowClient) DescribeTaskQueue(ctx context.Context, taskQueue strin
return resp, nil
}

// ResetWorkflowExecution reset an existing workflow execution to WorkflowTaskFinishEventId(exclusive).
// And it will immediately terminating the current execution instance.
// RequestId is used to deduplicate requests. It will be autogenerated if not set.
func (wc *WorkflowClient) ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error) {
if request != nil && request.GetRequestId() == "" {
request.RequestId = uuid.New()
}

var resp *workflowservice.ResetWorkflowExecutionResponse
err := backoff.Retry(ctx,
func() error {
grpcCtx, cancel := newGRPCContext(ctx)
defer cancel()
var err error
resp, err = wc.workflowService.ResetWorkflowExecution(grpcCtx, request)
return err
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
if err != nil {
return nil, err
}

return resp, nil
}

// Close client and clean up underlying resources.
func (wc *WorkflowClient) Close() {
if wc.connectionCloser == nil {
Expand Down
23 changes: 23 additions & 0 deletions mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions mocks/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import (

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/sdk/client"
)
Expand Down Expand Up @@ -91,3 +93,27 @@ func Test_MockClient(t *testing.T) {
require.NotNil(t, next)
require.NoError(t, err)
}

func Test_MockResetWorkflowExecution(t *testing.T) {
mockClient := &Client{}

req := &workflowservice.ResetWorkflowExecutionRequest{
Namespace: "test-namespace",
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: "wid",
RunId: "rid",
},
Reason: "bad deployment",
WorkflowTaskFinishEventId: 6,
RequestId: "request-id-random",
}
resp := &workflowservice.ResetWorkflowExecutionResponse{
RunId: "new-run-id",
}

mockClient.On("ResetWorkflowExecution", mock.Anything, mock.Anything).Return(resp, nil).Once()
actualResp, err := mockClient.ResetWorkflowExecution(context.Background(), req)
mockClient.AssertExpectations(t)
require.NoError(t, err)
require.Equal(t, "new-run-id", actualResp.GetRunId())
}
22 changes: 22 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,28 @@ func (ts *IntegrationTestSuite) TestTimerCancellationConcurrentWithOtherCommandD
ts.NoError(err)
}

func (ts *IntegrationTestSuite) TestResetWorkflowExecution() {
var originalResult []string
err := ts.executeWorkflow("basic-reset-workflow-execution", ts.workflows.Basic, &originalResult)
ts.NoError(err)
resp, err := ts.client.ResetWorkflowExecution(context.Background(), &workflowservice.ResetWorkflowExecutionRequest{
Namespace: namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: "basic-reset-workflow-execution",
},
Reason: "integration test",
WorkflowTaskFinishEventId: 4,
})

ts.NoError(err)
ts.NotEmpty(resp.GetRunId())
newWf := ts.client.GetWorkflow(context.Background(), "basic-reset-workflow-execution", resp.GetRunId())
var newResult []string
err = newWf.Get(context.Background(), &newResult)
ts.NoError(err)
ts.Equal(originalResult, newResult)
}

func (ts *IntegrationTestSuite) registerNamespace() {
client, err := client.NewNamespaceClient(client.Options{HostPort: ts.config.ServiceAddr})
ts.NoError(err)
Expand Down

0 comments on commit 6580cbe

Please sign in to comment.