diff --git a/internal/error.go b/internal/error.go index 7a830d2af..8090fd345 100644 --- a/internal/error.go +++ b/internal/error.go @@ -28,6 +28,7 @@ import ( "errors" "fmt" "reflect" + "time" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -164,9 +165,14 @@ type ( // ContinueAsNewError contains information about how to continue the workflow as new. ContinueAsNewError struct { - wfn interface{} - args []interface{} - params *ExecuteWorkflowParams + //params *ExecuteWorkflowParams + WorkflowType *WorkflowType + Input *commonpb.Payloads + Header *commonpb.Header + TaskQueueName string + WorkflowExecutionTimeout time.Duration + WorkflowRunTimeout time.Duration + WorkflowTaskTimeout time.Duration } // UnknownExternalWorkflowExecutionError can be returned when external workflow doesn't exist @@ -406,14 +412,15 @@ func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) *C panic(err) } - params := &ExecuteWorkflowParams{ - WorkflowOptions: *options, - WorkflowType: workflowType, - Input: input, - Header: getWorkflowHeader(ctx, options.ContextPropagators), - attempt: 1, + return &ContinueAsNewError{ + WorkflowType: workflowType, + Input: input, + Header: getWorkflowHeader(ctx, options.ContextPropagators), + TaskQueueName: options.TaskQueueName, + WorkflowExecutionTimeout: options.WorkflowExecutionTimeout, + WorkflowRunTimeout: options.WorkflowRunTimeout, + WorkflowTaskTimeout: options.WorkflowTaskTimeout, } - return &ContinueAsNewError{wfn: wfn, args: args, params: params} } // Error from error interface. @@ -560,16 +567,6 @@ func (e *ContinueAsNewError) message() string { return "continue as new" } -// WorkflowType return WorkflowType of the new run -func (e *ContinueAsNewError) WorkflowType() *WorkflowType { - return e.params.WorkflowType -} - -// Args return workflow argument of the new run -func (e *ContinueAsNewError) Args() []interface{} { - return e.args -} - // newTerminatedError creates NewTerminatedError instance func newTerminatedError() *TerminatedError { return &TerminatedError{} diff --git a/internal/error_test.go b/internal/error_test.go index b5d2b21bd..a914a3354 100644 --- a/internal/error_test.go +++ b/internal/error_test.go @@ -492,7 +492,8 @@ func Test_ContinueAsNewError(t *testing.T) { return NewContinueAsNewError(ctx, continueAsNewWfName, a1, a2) } - headerValue, err := converter.GetDefaultDataConverter().ToPayload("test-data") + dataConverter := converter.GetDefaultDataConverter() + headerValue, err := dataConverter.ToPayload("test-data") assert.NoError(t, err) header := &commonpb.Header{ Fields: map[string]*commonpb.Payload{"test": headerValue}, @@ -516,16 +517,16 @@ func Test_ContinueAsNewError(t *testing.T) { err = errors.Unwrap(workflowErr) var continueAsNewErr *ContinueAsNewError require.True(t, errors.As(err, &continueAsNewErr)) - require.Equal(t, continueAsNewWfName, continueAsNewErr.WorkflowType().Name) + require.Equal(t, continueAsNewWfName, continueAsNewErr.WorkflowType.Name) - args := continueAsNewErr.Args() - intArg, ok := args[0].(int) - require.True(t, ok) + input := continueAsNewErr.Input + var intArg int + var stringArg string + err = dataConverter.FromPayloads(input, &intArg, &stringArg) + require.NoError(t, err) require.Equal(t, a1, intArg) - stringArg, ok := args[1].(string) - require.True(t, ok) require.Equal(t, a2, stringArg) - require.Equal(t, header, continueAsNewErr.params.Header) + require.Equal(t, header, continueAsNewErr.Header) } type coolError struct{} diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 556e9e9dd..dc0e97b2e 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -1463,12 +1463,12 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow( metricsScope.Counter(metrics.WorkflowContinueAsNewCounter).Inc(1) closeCommand = createNewCommand(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION) closeCommand.Attributes = &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{ - WorkflowType: &commonpb.WorkflowType{Name: contErr.params.WorkflowType.Name}, - Input: contErr.params.Input, - TaskQueue: &taskqueuepb.TaskQueue{Name: contErr.params.TaskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, - WorkflowRunTimeout: &contErr.params.WorkflowRunTimeout, - WorkflowTaskTimeout: &contErr.params.WorkflowTaskTimeout, - Header: contErr.params.Header, + WorkflowType: &commonpb.WorkflowType{Name: contErr.WorkflowType.Name}, + Input: contErr.Input, + TaskQueue: &taskqueuepb.TaskQueue{Name: contErr.TaskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + WorkflowRunTimeout: &contErr.WorkflowRunTimeout, + WorkflowTaskTimeout: &contErr.WorkflowTaskTimeout, + Header: contErr.Header, Memo: workflowContext.workflowInfo.Memo, SearchAttributes: workflowContext.workflowInfo.SearchAttributes, }} diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index be47afcad..404c96c75 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -445,11 +445,11 @@ func (s *WorkflowUnitTest) Test_ContinueAsNewWorkflow() { err = errors.Unwrap(workflowErr) var resultErr *ContinueAsNewError s.True(errors.As(err, &resultErr)) - s.EqualValues("continueAsNewWorkflowTest", resultErr.params.WorkflowType.Name) - s.EqualValues(100*time.Second, resultErr.params.WorkflowExecutionTimeout) - s.EqualValues(50*time.Second, resultErr.params.WorkflowRunTimeout) - s.EqualValues(5*time.Second, resultErr.params.WorkflowTaskTimeout) - s.EqualValues("default-test-taskqueue", resultErr.params.TaskQueueName) + s.EqualValues("continueAsNewWorkflowTest", resultErr.WorkflowType.Name) + s.EqualValues(100*time.Second, resultErr.WorkflowExecutionTimeout) + s.EqualValues(50*time.Second, resultErr.WorkflowRunTimeout) + s.EqualValues(5*time.Second, resultErr.WorkflowTaskTimeout) + s.EqualValues("default-test-taskqueue", resultErr.TaskQueueName) } func cancelWorkflowTest(ctx Context) (string, error) { diff --git a/internalbindings/internalbindings.go b/internalbindings/internalbindings.go index 33c1708b6..83ec59df2 100644 --- a/internalbindings/internalbindings.go +++ b/internalbindings/internalbindings.go @@ -70,4 +70,6 @@ type ( ResultHandler = internal.ResultHandler // TimerID uniquely identifies timer TimerID = internal.TimerID + // ContinueAsNewError used by a workflow to request continue as new + ContinueAsNewError = internal.ContinueAsNewError )