Skip to content

Commit

Permalink
Cleanup of ContinueAsNewError (#326)
Browse files Browse the repository at this point in the history
* Removed wfn and args from ContinueAsNewError

* Added ContinueAsNew to internalbindings
  • Loading branch information
mfateev committed Jan 4, 2021
1 parent d321488 commit 0811d35
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 39 deletions.
37 changes: 17 additions & 20 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"errors"
"fmt"
"reflect"
"time"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -164,9 +165,14 @@ type (

// ContinueAsNewError contains information about how to continue the workflow as new.
ContinueAsNewError struct {
wfn interface{}
args []interface{}
params *ExecuteWorkflowParams
//params *ExecuteWorkflowParams
WorkflowType *WorkflowType
Input *commonpb.Payloads
Header *commonpb.Header
TaskQueueName string
WorkflowExecutionTimeout time.Duration
WorkflowRunTimeout time.Duration
WorkflowTaskTimeout time.Duration
}

// UnknownExternalWorkflowExecutionError can be returned when external workflow doesn't exist
Expand Down Expand Up @@ -406,14 +412,15 @@ func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) *C
panic(err)
}

params := &ExecuteWorkflowParams{
WorkflowOptions: *options,
WorkflowType: workflowType,
Input: input,
Header: getWorkflowHeader(ctx, options.ContextPropagators),
attempt: 1,
return &ContinueAsNewError{
WorkflowType: workflowType,
Input: input,
Header: getWorkflowHeader(ctx, options.ContextPropagators),
TaskQueueName: options.TaskQueueName,
WorkflowExecutionTimeout: options.WorkflowExecutionTimeout,
WorkflowRunTimeout: options.WorkflowRunTimeout,
WorkflowTaskTimeout: options.WorkflowTaskTimeout,
}
return &ContinueAsNewError{wfn: wfn, args: args, params: params}
}

// Error from error interface.
Expand Down Expand Up @@ -560,16 +567,6 @@ func (e *ContinueAsNewError) message() string {
return "continue as new"
}

// WorkflowType return WorkflowType of the new run
func (e *ContinueAsNewError) WorkflowType() *WorkflowType {
return e.params.WorkflowType
}

// Args return workflow argument of the new run
func (e *ContinueAsNewError) Args() []interface{} {
return e.args
}

// newTerminatedError creates NewTerminatedError instance
func newTerminatedError() *TerminatedError {
return &TerminatedError{}
Expand Down
17 changes: 9 additions & 8 deletions internal/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,8 @@ func Test_ContinueAsNewError(t *testing.T) {
return NewContinueAsNewError(ctx, continueAsNewWfName, a1, a2)
}

headerValue, err := converter.GetDefaultDataConverter().ToPayload("test-data")
dataConverter := converter.GetDefaultDataConverter()
headerValue, err := dataConverter.ToPayload("test-data")
assert.NoError(t, err)
header := &commonpb.Header{
Fields: map[string]*commonpb.Payload{"test": headerValue},
Expand All @@ -516,16 +517,16 @@ func Test_ContinueAsNewError(t *testing.T) {
err = errors.Unwrap(workflowErr)
var continueAsNewErr *ContinueAsNewError
require.True(t, errors.As(err, &continueAsNewErr))
require.Equal(t, continueAsNewWfName, continueAsNewErr.WorkflowType().Name)
require.Equal(t, continueAsNewWfName, continueAsNewErr.WorkflowType.Name)

args := continueAsNewErr.Args()
intArg, ok := args[0].(int)
require.True(t, ok)
input := continueAsNewErr.Input
var intArg int
var stringArg string
err = dataConverter.FromPayloads(input, &intArg, &stringArg)
require.NoError(t, err)
require.Equal(t, a1, intArg)
stringArg, ok := args[1].(string)
require.True(t, ok)
require.Equal(t, a2, stringArg)
require.Equal(t, header, continueAsNewErr.params.Header)
require.Equal(t, header, continueAsNewErr.Header)
}

type coolError struct{}
Expand Down
12 changes: 6 additions & 6 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1463,12 +1463,12 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
metricsScope.Counter(metrics.WorkflowContinueAsNewCounter).Inc(1)
closeCommand = createNewCommand(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION)
closeCommand.Attributes = &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{
WorkflowType: &commonpb.WorkflowType{Name: contErr.params.WorkflowType.Name},
Input: contErr.params.Input,
TaskQueue: &taskqueuepb.TaskQueue{Name: contErr.params.TaskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
WorkflowRunTimeout: &contErr.params.WorkflowRunTimeout,
WorkflowTaskTimeout: &contErr.params.WorkflowTaskTimeout,
Header: contErr.params.Header,
WorkflowType: &commonpb.WorkflowType{Name: contErr.WorkflowType.Name},
Input: contErr.Input,
TaskQueue: &taskqueuepb.TaskQueue{Name: contErr.TaskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
WorkflowRunTimeout: &contErr.WorkflowRunTimeout,
WorkflowTaskTimeout: &contErr.WorkflowTaskTimeout,
Header: contErr.Header,
Memo: workflowContext.workflowInfo.Memo,
SearchAttributes: workflowContext.workflowInfo.SearchAttributes,
}}
Expand Down
10 changes: 5 additions & 5 deletions internal/internal_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,11 +445,11 @@ func (s *WorkflowUnitTest) Test_ContinueAsNewWorkflow() {
err = errors.Unwrap(workflowErr)
var resultErr *ContinueAsNewError
s.True(errors.As(err, &resultErr))
s.EqualValues("continueAsNewWorkflowTest", resultErr.params.WorkflowType.Name)
s.EqualValues(100*time.Second, resultErr.params.WorkflowExecutionTimeout)
s.EqualValues(50*time.Second, resultErr.params.WorkflowRunTimeout)
s.EqualValues(5*time.Second, resultErr.params.WorkflowTaskTimeout)
s.EqualValues("default-test-taskqueue", resultErr.params.TaskQueueName)
s.EqualValues("continueAsNewWorkflowTest", resultErr.WorkflowType.Name)
s.EqualValues(100*time.Second, resultErr.WorkflowExecutionTimeout)
s.EqualValues(50*time.Second, resultErr.WorkflowRunTimeout)
s.EqualValues(5*time.Second, resultErr.WorkflowTaskTimeout)
s.EqualValues("default-test-taskqueue", resultErr.TaskQueueName)
}

func cancelWorkflowTest(ctx Context) (string, error) {
Expand Down
2 changes: 2 additions & 0 deletions internalbindings/internalbindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,6 @@ type (
ResultHandler = internal.ResultHandler
// TimerID uniquely identifies timer
TimerID = internal.TimerID
// ContinueAsNewError used by a workflow to request continue as new
ContinueAsNewError = internal.ContinueAsNewError
)

0 comments on commit 0811d35

Please sign in to comment.