diff --git a/internal/client.go b/internal/client.go index 508b970a5..c1a3f9833 100644 --- a/internal/client.go +++ b/internal/client.go @@ -551,13 +551,13 @@ const ( // NewClient creates an instance of a workflow client func NewClient(options ClientOptions) (Client, error) { - if len(options.Namespace) == 0 { + if options.Namespace == "" { options.Namespace = DefaultNamespace } options.MetricsScope = tagScope(options.MetricsScope, tagNamespace, options.Namespace, clientImplHeaderName, clientImplHeaderValue) - if len(options.HostPort) == 0 { + if options.HostPort == "" { options.HostPort = LocalHostPort } @@ -581,11 +581,11 @@ func NewClient(options ClientOptions) (Client, error) { // NewServiceClient creates workflow client from workflowservice.WorkflowServiceClient. Must be used internally in unit tests only. func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClient, connectionCloser io.Closer, options ClientOptions) *WorkflowClient { // Namespace can be empty in unit tests. - if len(options.Namespace) == 0 { + if options.Namespace == "" { options.Namespace = DefaultNamespace } - if len(options.Identity) == 0 { + if options.Identity == "" { options.Identity = getWorkerIdentity("") } @@ -616,7 +616,7 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien func NewNamespaceClient(options ClientOptions) (NamespaceClient, error) { options.MetricsScope = tagScope(options.MetricsScope, clientImplHeaderName, clientImplHeaderValue) - if len(options.HostPort) == 0 { + if options.HostPort == "" { options.HostPort = LocalHostPort } @@ -638,7 +638,7 @@ func NewNamespaceClient(options ClientOptions) (NamespaceClient, error) { } func newNamespaceServiceClient(workflowServiceClient workflowservice.WorkflowServiceClient, clientConn *grpc.ClientConn, options ClientOptions) NamespaceClient { - if len(options.Identity) == 0 { + if options.Identity == "" { options.Identity = getWorkerIdentity("") } diff --git a/internal/internal_activity.go b/internal/internal_activity.go index cf6fde3c2..c4598d832 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -170,39 +170,6 @@ func getLocalActivityOptions(ctx Context) *localActivityOptions { return opts.(*localActivityOptions) } -func getValidatedActivityOptions(ctx Context) (*activityOptions, error) { - p := getActivityOptions(ctx) - if p == nil { - // We need task list as a compulsory parameter. This can be removed after registration - return nil, errActivityParamsBadRequest - } - if p.TaskListName == "" { - // We default to origin task list name. - p.TaskListName = p.OriginalTaskListName - } - if p.ScheduleToStartTimeoutSeconds <= 0 { - return nil, errors.New("missing or negative ScheduleToStartTimeoutSeconds") - } - if p.StartToCloseTimeoutSeconds <= 0 { - return nil, errors.New("missing or negative StartToCloseTimeoutSeconds") - } - if p.ScheduleToCloseTimeoutSeconds < 0 { - return nil, errors.New("missing or negative ScheduleToCloseTimeoutSeconds") - } - if p.ScheduleToCloseTimeoutSeconds == 0 { - // This is a optional parameter, we default to sum of the other two timeouts. - p.ScheduleToCloseTimeoutSeconds = p.ScheduleToStartTimeoutSeconds + p.StartToCloseTimeoutSeconds - } - if p.HeartbeatTimeoutSeconds < 0 { - return nil, errors.New("invalid negative HeartbeatTimeoutSeconds") - } - if err := validateRetryPolicy(p.RetryPolicy); err != nil { - return nil, err - } - - return p, nil -} - func getValidatedLocalActivityOptions(ctx Context) (*localActivityOptions, error) { p := getLocalActivityOptions(ctx) if p == nil { @@ -215,37 +182,6 @@ func getValidatedLocalActivityOptions(ctx Context) (*localActivityOptions, error return p, nil } -func validateRetryPolicy(p *commonpb.RetryPolicy) error { - if p == nil { - return nil - } - - if p.GetInitialIntervalInSeconds() <= 0 { - return errors.New("missing or negative InitialIntervalInSeconds on retry policy") - } - if p.GetMaximumIntervalInSeconds() < 0 { - return errors.New("negative MaximumIntervalInSeconds on retry policy is invalid") - } - if p.GetMaximumIntervalInSeconds() == 0 { - // if not set, default to 100x of initial interval - p.MaximumIntervalInSeconds = 100 * p.GetInitialIntervalInSeconds() - } - if p.GetMaximumAttempts() < 0 { - return errors.New("negative MaximumAttempts on retry policy is invalid") - } - if p.GetExpirationIntervalInSeconds() < 0 { - return errors.New("ExpirationIntervalInSeconds cannot be less than 0 on retry policy") - } - if p.GetBackoffCoefficient() < 1 { - return errors.New("BackoffCoefficient on retry policy cannot be less than 1.0") - } - if p.GetMaximumAttempts() == 0 && p.GetExpirationIntervalInSeconds() == 0 { - return errors.New("both MaximumAttempts and ExpirationIntervalInSeconds on retry policy are not set, at least one of them must be set") - } - - return nil -} - func validateFunctionArgs(f interface{}, args []interface{}, isWorkflow bool) error { fType := reflect.TypeOf(f) if fType == nil || fType.Kind() != reflect.Func { diff --git a/internal/internal_coroutines_test.go b/internal/internal_coroutines_test.go index 1465a1868..e1ff9bfa3 100644 --- a/internal/internal_coroutines_test.go +++ b/internal/internal_coroutines_test.go @@ -1256,17 +1256,15 @@ func TestChainedFuture(t *testing.T) { activityFn := func(arg int) (int, error) { return arg, nil } - workflowFn := func(ctx Context) (int, error) { + workflowFn := func(ctx Context) (out int, err error) { ctx = WithActivityOptions(ctx, ActivityOptions{ - ScheduleToStartTimeout: time.Minute, - StartToCloseTimeout: time.Minute, + ScheduleToCloseTimeout: time.Minute, }) f := ExecuteActivity(ctx, activityFn, 5) - var out int fut, set := NewFuture(ctx) set.Chain(f) - require.NoError(t, fut.Get(ctx, &out)) - return out, nil + err = fut.Get(ctx, &out) + return } s := WorkflowTestSuite{} diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 44afac042..5e89ccc56 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -598,8 +598,8 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice. return nil, errors.New("first history event is not WorkflowExecutionStarted") } taskList := attributes.TaskList - if taskList == nil { - return nil, errors.New("nil TaskList in WorkflowExecutionStarted event") + if taskList == nil || taskList.Name == "" { + return nil, errors.New("nil or empty TaskList in WorkflowExecutionStarted event") } runID := task.WorkflowExecution.GetRunId() diff --git a/internal/internal_worker.go b/internal/internal_worker.go index ed43657a3..6a257baf8 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -257,7 +257,7 @@ func verifyNamespaceExist(client workflowservice.WorkflowServiceClient, namespac return nil } - if len(namespace) == 0 { + if namespace == "" { return errors.New("namespace cannot be empty") } @@ -1536,7 +1536,7 @@ func setClientDefaults(client *WorkflowClient) { if client.dataConverter == nil { client.dataConverter = getDefaultDataConverter() } - if len(client.namespace) == 0 { + if client.namespace == "" { client.namespace = DefaultNamespace } if client.tracer == nil { diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 508159bc2..406a8705f 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -36,7 +36,6 @@ import ( "time" "unicode" - "github.com/robfig/cron" "go.uber.org/atomic" "go.uber.org/zap" @@ -1150,49 +1149,6 @@ func getValidatedWorkflowFunction(workflowFunc interface{}, args []interface{}, return &WorkflowType{Name: fnName}, input, nil } -func getValidatedWorkflowOptions(ctx Context) (*workflowOptions, error) { - p := getWorkflowEnvOptions(ctx) - if p == nil { - // We need task list as a compulsory parameter. This can be removed after registration - return nil, errWorkflowOptionBadRequest - } - info := GetWorkflowInfo(ctx) - if p.namespace == "" { - // default to use current workflow's namespace - p.namespace = info.Namespace - } - if p.taskListName == "" { - // default to use current workflow's task list - p.taskListName = info.TaskListName - } - if p.taskStartToCloseTimeoutSeconds < 0 { - return nil, errors.New("missing or negative DecisionTaskStartToCloseTimeout") - } - if p.taskStartToCloseTimeoutSeconds == 0 { - p.taskStartToCloseTimeoutSeconds = defaultDecisionTaskTimeoutInSecs - } - if p.executionStartToCloseTimeoutSeconds <= 0 { - return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout") - } - if err := validateRetryPolicy(p.retryPolicy); err != nil { - return nil, err - } - if err := validateCronSchedule(p.cronSchedule); err != nil { - return nil, err - } - - return p, nil -} - -func validateCronSchedule(cronSchedule string) error { - if len(cronSchedule) == 0 { - return nil - } - - _, err := cron.ParseStandard(cronSchedule) - return err -} - func getWorkflowEnvOptions(ctx Context) *workflowOptions { options := ctx.Value(workflowEnvOptionsContextKey) if options != nil { diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 0c163583e..423992c47 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -56,8 +56,7 @@ var _ Client = (*WorkflowClient)(nil) var _ NamespaceClient = (*namespaceClient)(nil) const ( - defaultDecisionTaskTimeoutInSecs = 10 - defaultGetHistoryTimeoutInSecs = 25 + defaultGetHistoryTimeoutInSecs = 65 ) var ( @@ -167,22 +166,8 @@ func (wc *WorkflowClient) StartWorkflow( workflowID = uuid.NewRandom().String() } - if options.TaskList == "" { - return nil, errors.New("missing TaskList") - } - executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) - if executionTimeout <= 0 { - return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout") - } - decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) - if decisionTaskTimeout < 0 { - return nil, errors.New("negative DecisionTaskStartToCloseTimeout provided") - } - if decisionTaskTimeout == 0 { - decisionTaskTimeout = defaultDecisionTaskTimeoutInSecs - } // Validate type and its arguments. workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, args, wc.dataConverter, wc.registry) @@ -364,22 +349,8 @@ func (wc *WorkflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI workflowID = uuid.NewRandom().String() } - if options.TaskList == "" { - return nil, errors.New("missing TaskList") - } - executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) - if executionTimeout <= 0 { - return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout") - } - decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) - if decisionTaskTimeout < 0 { - return nil, errors.New("negative DecisionTaskStartToCloseTimeout provided") - } - if decisionTaskTimeout == 0 { - decisionTaskTimeout = defaultDecisionTaskTimeoutInSecs - } // Validate type and its arguments. workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, workflowArgs, wc.dataConverter, wc.registry) @@ -620,7 +591,7 @@ func (wc *WorkflowClient) RecordActivityHeartbeatByID(ctx context.Context, // - InternalServiceError // - EntityNotExistError func (wc *WorkflowClient) ListClosedWorkflow(ctx context.Context, request *workflowservice.ListClosedWorkflowExecutionsRequest) (*workflowservice.ListClosedWorkflowExecutionsResponse, error) { - if len(request.GetNamespace()) == 0 { + if request.GetNamespace() == "" { request.Namespace = wc.namespace } var response *workflowservice.ListClosedWorkflowExecutionsResponse @@ -644,7 +615,7 @@ func (wc *WorkflowClient) ListClosedWorkflow(ctx context.Context, request *workf // - InternalServiceError // - EntityNotExistError func (wc *WorkflowClient) ListOpenWorkflow(ctx context.Context, request *workflowservice.ListOpenWorkflowExecutionsRequest) (*workflowservice.ListOpenWorkflowExecutionsResponse, error) { - if len(request.GetNamespace()) == 0 { + if request.GetNamespace() == "" { request.Namespace = wc.namespace } var response *workflowservice.ListOpenWorkflowExecutionsResponse @@ -664,7 +635,7 @@ func (wc *WorkflowClient) ListOpenWorkflow(ctx context.Context, request *workflo // ListWorkflow implementation func (wc *WorkflowClient) ListWorkflow(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*workflowservice.ListWorkflowExecutionsResponse, error) { - if len(request.GetNamespace()) == 0 { + if request.GetNamespace() == "" { request.Namespace = wc.namespace } var response *workflowservice.ListWorkflowExecutionsResponse @@ -684,7 +655,7 @@ func (wc *WorkflowClient) ListWorkflow(ctx context.Context, request *workflowser // ListArchivedWorkflow implementation func (wc *WorkflowClient) ListArchivedWorkflow(ctx context.Context, request *workflowservice.ListArchivedWorkflowExecutionsRequest) (*workflowservice.ListArchivedWorkflowExecutionsResponse, error) { - if len(request.GetNamespace()) == 0 { + if request.GetNamespace() == "" { request.Namespace = wc.namespace } var response *workflowservice.ListArchivedWorkflowExecutionsResponse @@ -716,7 +687,7 @@ func (wc *WorkflowClient) ListArchivedWorkflow(ctx context.Context, request *wor // ScanWorkflow implementation func (wc *WorkflowClient) ScanWorkflow(ctx context.Context, request *workflowservice.ScanWorkflowExecutionsRequest) (*workflowservice.ScanWorkflowExecutionsResponse, error) { - if len(request.GetNamespace()) == 0 { + if request.GetNamespace() == "" { request.Namespace = wc.namespace } var response *workflowservice.ScanWorkflowExecutionsResponse @@ -736,7 +707,7 @@ func (wc *WorkflowClient) ScanWorkflow(ctx context.Context, request *workflowser // CountWorkflow implementation func (wc *WorkflowClient) CountWorkflow(ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest) (*workflowservice.CountWorkflowExecutionsResponse, error) { - if len(request.GetNamespace()) == 0 { + if request.GetNamespace() == "" { request.Namespace = wc.namespace } var response *workflowservice.CountWorkflowExecutionsResponse diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index b60b004bd..31ebe35ec 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -27,7 +27,6 @@ package internal import ( "context" "encoding/json" - "errors" "fmt" "log" "os" @@ -232,7 +231,7 @@ func (s *historyEventIteratorSuite) TestIterator_NoError_EmptyPage() { s.Equal(1, len(events)) } -func (s *historyEventIteratorSuite) TestIterator_Error() { +func (s *historyEventIteratorSuite) TestIteratorError() { filterType := filterpb.HistoryEventFilterType_AllEvent request1 := getGetWorkflowExecutionHistoryRequest(filterType) response1 := &workflowservice.GetWorkflowExecutionHistoryResponse{ @@ -804,33 +803,6 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflow() { s.Equal(createResponse.GetRunId(), resp.RunID) } -func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_Error() { - signalName := "my signal" - signalInput := []byte("my signal input") - options := StartWorkflowOptions{} - - resp, err := s.client.SignalWithStartWorkflow(context.Background(), workflowID, signalName, signalInput, - options, workflowType) - s.Equal(errors.New("missing TaskList"), err) - s.Nil(resp) - - options.TaskList = tasklist - resp, err = s.client.SignalWithStartWorkflow(context.Background(), workflowID, signalName, signalInput, - options, workflowType) - s.NotNil(err) - s.Nil(resp) - - options.ExecutionStartToCloseTimeout = timeoutInSeconds - createResponse := &workflowservice.SignalWithStartWorkflowExecutionResponse{ - RunId: runID, - } - s.service.EXPECT().SignalWithStartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil) - resp, err = s.client.SignalWithStartWorkflow(context.Background(), workflowID, signalName, signalInput, - options, workflowType) - s.Nil(err) - s.Equal(createResponse.GetRunId(), resp.RunID) -} - func (s *workflowClientTestSuite) TestStartWorkflow() { client, ok := s.client.(*WorkflowClient) s.True(ok) diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index e6134f920..87c453f4c 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -415,13 +415,17 @@ func continueAsNewWorkflowTest(ctx Context) error { func (s *WorkflowUnitTest) Test_ContinueAsNewWorkflow() { env := s.NewTestWorkflowEnvironment() + env.SetStartWorkflowOptions(StartWorkflowOptions{ + ExecutionStartToCloseTimeout: 100 * time.Second, + DecisionTaskStartToCloseTimeout: 5 * time.Second, + }) env.ExecuteWorkflow(continueAsNewWorkflowTest) s.True(env.IsWorkflowCompleted()) s.NotNil(env.GetWorkflowError()) resultErr := env.GetWorkflowError().(*ContinueAsNewError) s.EqualValues("continueAsNewWorkflowTest", resultErr.params.workflowType.Name) - s.EqualValues(1, resultErr.params.executionStartToCloseTimeoutSeconds) - s.EqualValues(1, resultErr.params.taskStartToCloseTimeoutSeconds) + s.EqualValues(100, resultErr.params.executionStartToCloseTimeoutSeconds) + s.EqualValues(5, resultErr.params.taskStartToCloseTimeoutSeconds) s.EqualValues("default-test-tasklist", resultErr.params.taskListName) } diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index f80ba1683..629ecf0d3 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -28,6 +28,8 @@ import ( "context" "errors" "fmt" + decisionpb "go.temporal.io/temporal-proto/decision" + tasklistpb "go.temporal.io/temporal-proto/tasklist" "reflect" "strings" "sync" @@ -60,6 +62,10 @@ const ( defaultTestRunID = "default-test-run-id" defaultTestWorkflowTypeName = "default-test-workflow-type-name" workflowTypeNotSpecified = "workflow-type-not-specified" + + // These are copied from service implementation + reservedTaskListPrefix = "/__temporal_sys/" + maxIDLengthLimit = 1000 ) type ( @@ -238,7 +244,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist WorkflowType: WorkflowType{Name: workflowTypeNotSpecified}, TaskListName: defaultTestTaskList, - ExecutionStartToCloseTimeoutSeconds: 1, + ExecutionStartToCloseTimeoutSeconds: 60 * 24 * 365 * 10, TaskStartToCloseTimeoutSeconds: 1, }, registry: r, @@ -332,6 +338,10 @@ func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(param childEnv.dataConverter = params.dataConverter childEnv.registry = env.registry + if params.taskListName == "" { + return nil, serviceerror.NewWorkflowExecutionAlreadyStarted("Empty task list name", "", "") + } + if params.workflowID == "" { params.workflowID = env.workflowInfo.WorkflowExecution.RunID + "_" + getStringID(env.nextID()) } @@ -492,7 +502,7 @@ func (env *testWorkflowEnvironmentImpl) executeActivity( panic(err) } - params := executeActivityParams{ + parameters := executeActivityParams{ activityOptions: activityOptions{ ScheduleToCloseTimeoutSeconds: 600, StartToCloseTimeoutSeconds: 600, @@ -502,13 +512,28 @@ func (env *testWorkflowEnvironmentImpl) executeActivity( Header: env.header, } + scheduleTaskAttr := &decisionpb.ScheduleActivityTaskDecisionAttributes{} + if parameters.ActivityID == "" { + scheduleTaskAttr.ActivityId = getStringID(env.nextID()) + } else { + scheduleTaskAttr.ActivityId = parameters.ActivityID + } + scheduleTaskAttr.ActivityType = &commonpb.ActivityType{Name: parameters.ActivityType.Name} + scheduleTaskAttr.TaskList = &tasklistpb.TaskList{Name: parameters.TaskListName} + scheduleTaskAttr.Input = parameters.Input + scheduleTaskAttr.ScheduleToCloseTimeoutSeconds = parameters.ScheduleToCloseTimeoutSeconds + scheduleTaskAttr.StartToCloseTimeoutSeconds = parameters.StartToCloseTimeoutSeconds + scheduleTaskAttr.ScheduleToStartTimeoutSeconds = parameters.ScheduleToStartTimeoutSeconds + scheduleTaskAttr.HeartbeatTimeoutSeconds = parameters.HeartbeatTimeoutSeconds + scheduleTaskAttr.RetryPolicy = parameters.RetryPolicy + scheduleTaskAttr.Header = parameters.Header + task := newTestActivityTask( defaultTestWorkflowID, defaultTestRunID, "0", - defaultTestWorkflowTypeName, defaultTestNamespace, - params, + scheduleTaskAttr, ) task.HeartbeatDetails = env.heartbeatDetails @@ -911,20 +936,34 @@ func (env *testWorkflowEnvironmentImpl) GetContextPropagators() []ContextPropaga } func (env *testWorkflowEnvironmentImpl) ExecuteActivity(parameters executeActivityParams, callback resultHandler) *activityInfo { - var activityID string + scheduleTaskAttr := &decisionpb.ScheduleActivityTaskDecisionAttributes{} if parameters.ActivityID == "" { - activityID = getStringID(env.nextID()) + scheduleTaskAttr.ActivityId = getStringID(env.nextID()) } else { - activityID = parameters.ActivityID - } + scheduleTaskAttr.ActivityId = parameters.ActivityID + } + activityID := scheduleTaskAttr.GetActivityId() + scheduleTaskAttr.ActivityType = &commonpb.ActivityType{Name: parameters.ActivityType.Name} + scheduleTaskAttr.TaskList = &tasklistpb.TaskList{Name: parameters.TaskListName} + scheduleTaskAttr.Input = parameters.Input + scheduleTaskAttr.ScheduleToCloseTimeoutSeconds = parameters.ScheduleToCloseTimeoutSeconds + scheduleTaskAttr.StartToCloseTimeoutSeconds = parameters.StartToCloseTimeoutSeconds + scheduleTaskAttr.ScheduleToStartTimeoutSeconds = parameters.ScheduleToStartTimeoutSeconds + scheduleTaskAttr.HeartbeatTimeoutSeconds = parameters.HeartbeatTimeoutSeconds + scheduleTaskAttr.RetryPolicy = parameters.RetryPolicy + scheduleTaskAttr.Header = parameters.Header + err := env.validateActivityScheduleAttributes(scheduleTaskAttr, env.WorkflowInfo().ExecutionStartToCloseTimeoutSeconds) activityInfo := &activityInfo{activityID: activityID} + if err != nil { + callback(nil, err) + return activityInfo + } task := newTestActivityTask( defaultTestWorkflowID, defaultTestRunID, - activityInfo.activityID, defaultTestWorkflowTypeName, defaultTestNamespace, - parameters, + scheduleTaskAttr, ) taskHandler := env.newTestActivityTaskHandler(parameters.TaskListName, parameters.DataConverter) @@ -963,6 +1002,169 @@ func (env *testWorkflowEnvironmentImpl) ExecuteActivity(parameters executeActivi return activityInfo } +// Copy of the server function func (v *decisionAttrValidator) validateActivityScheduleAttributes +func (env *testWorkflowEnvironmentImpl) validateActivityScheduleAttributes( + attributes *decisionpb.ScheduleActivityTaskDecisionAttributes, + wfTimeout int32, +) error { + + //if err := v.validateCrossNamespaceCall( + // namespaceID, + // targetNamespaceID, + //); err != nil { + // return err + //} + + if attributes == nil { + return serviceerror.NewInvalidArgument("ScheduleActivityTaskDecisionAttributes is not set on decision.") + } + + defaultTaskListName := "" + if _, err := env.validatedTaskList(attributes.TaskList, defaultTaskListName); err != nil { + return err + } + + if attributes.GetActivityId() == "" { + return serviceerror.NewInvalidArgument("ActivityId is not set on decision.") + } + + if attributes.ActivityType == nil || attributes.ActivityType.GetName() == "" { + return serviceerror.NewInvalidArgument("ActivityType is not set on decision.") + } + + if err := env.validateRetryPolicy(attributes.RetryPolicy); err != nil { + return err + } + + if len(attributes.GetActivityId()) > maxIDLengthLimit { + return serviceerror.NewInvalidArgument("ActivityID exceeds length limit.") + } + + if len(attributes.GetActivityType().GetName()) > maxIDLengthLimit { + return serviceerror.NewInvalidArgument("ActivityType exceeds length limit.") + } + + if len(attributes.GetNamespace()) > maxIDLengthLimit { + return serviceerror.NewInvalidArgument("Namespace exceeds length limit.") + } + + // Only attempt to deduce and fill in unspecified timeouts only when all timeouts are non-negative. + if attributes.GetScheduleToCloseTimeoutSeconds() < 0 || attributes.GetScheduleToStartTimeoutSeconds() < 0 || + attributes.GetStartToCloseTimeoutSeconds() < 0 || attributes.GetHeartbeatTimeoutSeconds() < 0 { + return serviceerror.NewInvalidArgument("A valid timeout may not be negative.") + } + + // ensure activity timeout never larger than workflow timeout + if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout { + attributes.ScheduleToCloseTimeoutSeconds = wfTimeout + } + if attributes.GetScheduleToStartTimeoutSeconds() > wfTimeout { + attributes.ScheduleToStartTimeoutSeconds = wfTimeout + } + if attributes.GetStartToCloseTimeoutSeconds() > wfTimeout { + attributes.StartToCloseTimeoutSeconds = wfTimeout + } + if attributes.GetHeartbeatTimeoutSeconds() > wfTimeout { + attributes.HeartbeatTimeoutSeconds = wfTimeout + } + + validScheduleToClose := attributes.GetScheduleToCloseTimeoutSeconds() > 0 + validScheduleToStart := attributes.GetScheduleToStartTimeoutSeconds() > 0 + validStartToClose := attributes.GetStartToCloseTimeoutSeconds() > 0 + + if validScheduleToClose { + if !validScheduleToStart { + attributes.ScheduleToStartTimeoutSeconds = attributes.GetScheduleToCloseTimeoutSeconds() + } + if !validStartToClose { + attributes.StartToCloseTimeoutSeconds = attributes.GetScheduleToCloseTimeoutSeconds() + } + } else if validScheduleToStart && validStartToClose { + attributes.ScheduleToCloseTimeoutSeconds = attributes.GetScheduleToStartTimeoutSeconds() + attributes.GetStartToCloseTimeoutSeconds() + if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout { + attributes.ScheduleToCloseTimeoutSeconds = wfTimeout + } + } else { + // Deduction failed as there's not enough information to fill in missing timeouts. + return serviceerror.NewInvalidArgument("A valid ScheduleToCloseTimeout is not set on decision.") + } + // ensure activity's SCHEDULE_TO_START and SCHEDULE_TO_CLOSE is as long as expiration on retry policy + p := attributes.RetryPolicy + if p != nil { + expiration := p.GetExpirationIntervalInSeconds() + if expiration == 0 { + expiration = wfTimeout + } + if attributes.GetScheduleToStartTimeoutSeconds() < expiration { + attributes.ScheduleToStartTimeoutSeconds = expiration + } + if attributes.GetScheduleToCloseTimeoutSeconds() < expiration { + attributes.ScheduleToCloseTimeoutSeconds = expiration + } + } + return nil +} + +// Copy of the service func (v *decisionAttrValidator) validatedTaskList +func (env *testWorkflowEnvironmentImpl) validatedTaskList( + taskList *tasklistpb.TaskList, + defaultVal string, +) (*tasklistpb.TaskList, error) { + + if taskList == nil { + taskList = &tasklistpb.TaskList{} + } + + if taskList.GetName() == "" { + if defaultVal == "" { + return taskList, serviceerror.NewInvalidArgument("missing task list name") + } + taskList.Name = defaultVal + return taskList, nil + } + + name := taskList.GetName() + if len(name) > maxIDLengthLimit { + return taskList, serviceerror.NewInvalidArgument(fmt.Sprintf("task list name exceeds length limit of %v", maxIDLengthLimit)) + } + + if strings.HasPrefix(name, reservedTaskListPrefix) { + return taskList, serviceerror.NewInvalidArgument(fmt.Sprintf("task list name cannot start with reserved prefix %v", reservedTaskListPrefix)) + } + + return taskList, nil +} + +// copy of the service func ValidateRetryPolicy(policy *commonpb.RetryPolicy) +func (env *testWorkflowEnvironmentImpl) validateRetryPolicy(policy *commonpb.RetryPolicy) error { + if policy == nil { + // nil policy is valid which means no retry + return nil + } + if policy.GetInitialIntervalInSeconds() <= 0 { + return serviceerror.NewInvalidArgument("InitialIntervalInSeconds must be greater than 0 on retry policy.") + } + if policy.GetBackoffCoefficient() < 1 { + return serviceerror.NewInvalidArgument("BackoffCoefficient cannot be less than 1 on retry policy.") + } + if policy.GetMaximumIntervalInSeconds() < 0 { + return serviceerror.NewInvalidArgument("MaximumIntervalInSeconds cannot be less than 0 on retry policy.") + } + if policy.GetMaximumIntervalInSeconds() > 0 && policy.GetMaximumIntervalInSeconds() < policy.GetInitialIntervalInSeconds() { + return serviceerror.NewInvalidArgument("MaximumIntervalInSeconds cannot be less than InitialIntervalInSeconds on retry policy.") + } + if policy.GetMaximumAttempts() < 0 { + return serviceerror.NewInvalidArgument("MaximumAttempts cannot be less than 0 on retry policy.") + } + if policy.GetExpirationIntervalInSeconds() < 0 { + return serviceerror.NewInvalidArgument("ExpirationIntervalInSeconds cannot be less than 0 on retry policy.") + } + if policy.GetMaximumAttempts() == 0 && policy.GetExpirationIntervalInSeconds() == 0 { + return serviceerror.NewInvalidArgument("MaximumAttempts and ExpirationIntervalInSeconds are both 0. At least one of them must be specified.") + } + return nil +} + func (env *testWorkflowEnvironmentImpl) getActivityHandle(activityID string) (*testActivityHandle, bool) { handle, ok := env.activities[env.makeUniqueID(activityID)] return handle, ok @@ -1559,7 +1761,9 @@ func (env *testWorkflowEnvironmentImpl) newTestActivityTaskHandler(taskList stri return taskHandler } -func newTestActivityTask(workflowID, runID, activityID, workflowTypeName, namespace string, params executeActivityParams) *workflowservice.PollForActivityTaskResponse { +func newTestActivityTask(workflowID, runID, workflowTypeName, namespace string, + attr *decisionpb.ScheduleActivityTaskDecisionAttributes) *workflowservice.PollForActivityTaskResponse { + activityID := attr.GetActivityId() task := &workflowservice.PollForActivityTaskResponse{ WorkflowExecution: &executionpb.WorkflowExecution{ WorkflowId: workflowID, @@ -1567,18 +1771,18 @@ func newTestActivityTask(workflowID, runID, activityID, workflowTypeName, namesp }, ActivityId: activityID, TaskToken: []byte(activityID), // use activityID as TaskToken so we can map TaskToken in heartbeat calls. - ActivityType: &commonpb.ActivityType{Name: params.ActivityType.Name}, - Input: params.Input, + ActivityType: &commonpb.ActivityType{Name: attr.GetActivityType().GetName()}, + Input: attr.GetInput(), ScheduledTimestamp: time.Now().UnixNano(), - ScheduleToCloseTimeoutSeconds: params.ScheduleToCloseTimeoutSeconds, + ScheduleToCloseTimeoutSeconds: attr.GetScheduleToCloseTimeoutSeconds(), StartedTimestamp: time.Now().UnixNano(), - StartToCloseTimeoutSeconds: params.StartToCloseTimeoutSeconds, - HeartbeatTimeoutSeconds: params.HeartbeatTimeoutSeconds, + StartToCloseTimeoutSeconds: attr.GetStartToCloseTimeoutSeconds(), + HeartbeatTimeoutSeconds: attr.GetHeartbeatTimeoutSeconds(), WorkflowType: &commonpb.WorkflowType{ Name: workflowTypeName, }, WorkflowNamespace: namespace, - Header: params.Header, + Header: attr.GetHeader(), } return task } @@ -1978,6 +2182,19 @@ func (env *testWorkflowEnvironmentImpl) GetRegistry() *registry { return env.registry } +func (env *testWorkflowEnvironmentImpl) setStartWorkflowOptions(options StartWorkflowOptions) { + wf := env.workflowInfo + if options.ExecutionStartToCloseTimeout > 0 { + wf.ExecutionStartToCloseTimeoutSeconds = common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) + } + if options.DecisionTaskStartToCloseTimeout > 0 { + wf.TaskStartToCloseTimeoutSeconds = common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) + } + if len(options.TaskList) > 0 { + wf.TaskListName = options.TaskList + } +} + func newTestSessionEnvironment(testWorkflowEnvironment *testWorkflowEnvironmentImpl, params *workerExecutionParameters, concurrentSessionExecutionSize int) *testSessionEnvironmentImpl { resourceID := params.SessionResourceID diff --git a/internal/workflow.go b/internal/workflow.go index d05bf1d8b..294aefdcd 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -40,11 +40,8 @@ import ( ) var ( - errNamespaceNotSet = errors.New("namespace is not set") errWorkflowIDNotSet = errors.New("workflowId is not set") errLocalActivityParamsBadRequest = errors.New("missing local activity parameters through context, check LocalActivityOptions") - errActivityParamsBadRequest = errors.New("missing activity parameters through context, check ActivityOptions") - errWorkflowOptionBadRequest = errors.New("missing workflow options through context, check WorkflowOptions") errSearchAttributesNotSet = errors.New("search attributes is empty") ) @@ -411,11 +408,6 @@ func (wc *workflowEnvironmentInterceptor) ExecuteActivity(ctx Context, typeName } // Validate context options. options := getActivityOptions(ctx) - options, err = getValidatedActivityOptions(ctx) - if err != nil { - settable.Set(nil, err) - return future - } // Validate session state. if sessionInfo := getSessionInfo(ctx); sessionInfo != nil { @@ -646,12 +638,8 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil mainSettable.Set(nil, err) return result } - options, err := getValidatedWorkflowOptions(ctx) - if err != nil { - executionSettable.Set(nil, err) - mainSettable.Set(nil, err) - return result - } + + options := getWorkflowEnvOptions(ctx) options.dataConverter = dc options.contextPropagators = workflowOptionsFromCtx.contextPropagators options.memo = workflowOptionsFromCtx.memo @@ -856,11 +844,6 @@ func (wc *workflowEnvironmentInterceptor) RequestCancelExternalWorkflow(ctx Cont options := getWorkflowEnvOptions(ctx1) future, settable := NewFuture(ctx1) - if options.namespace == "" { - settable.Set(nil, errNamespaceNotSet) - return future - } - if workflowID == "" { settable.Set(nil, errWorkflowIDNotSet) return future @@ -904,11 +887,6 @@ func signalExternalWorkflow(ctx Context, workflowID, runID, signalName string, a options := getWorkflowEnvOptions(ctx1) future, settable := NewFuture(ctx1) - if options.namespace == "" { - settable.Set(nil, errNamespaceNotSet) - return future - } - if workflowID == "" { settable.Set(nil, errWorkflowIDNotSet) return future @@ -981,8 +959,12 @@ func (wc *workflowEnvironmentInterceptor) UpsertSearchAttributes(ctx Context, at func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context { ctx1 := setWorkflowEnvOptionsIfNotExist(ctx) wfOptions := getWorkflowEnvOptions(ctx1) - wfOptions.namespace = cwo.Namespace - wfOptions.taskListName = cwo.TaskList + if len(cwo.Namespace) > 0 { + wfOptions.namespace = cwo.Namespace + } + if len(cwo.TaskList) > 0 { + wfOptions.taskListName = cwo.TaskList + } wfOptions.workflowID = cwo.WorkflowID wfOptions.executionStartToCloseTimeoutSeconds = common.Int32Ceil(cwo.ExecutionStartToCloseTimeout.Seconds()) wfOptions.taskStartToCloseTimeoutSeconds = common.Int32Ceil(cwo.TaskStartToCloseTimeout.Seconds()) @@ -1006,6 +988,9 @@ func WithWorkflowNamespace(ctx Context, name string) Context { // WithWorkflowTaskList adds a task list to the context. func WithWorkflowTaskList(ctx Context, name string) Context { + if name == "" { + panic("empty task list name") + } ctx1 := setWorkflowEnvOptionsIfNotExist(ctx) getWorkflowEnvOptions(ctx1).taskListName = name return ctx1 @@ -1355,7 +1340,9 @@ func WithActivityOptions(ctx Context, options ActivityOptions) Context { ctx1 := setActivityParametersIfNotExist(ctx) eap := getActivityOptions(ctx1) - eap.TaskListName = options.TaskList + if len(options.TaskList) > 0 { + eap.TaskListName = options.TaskList + } eap.ScheduleToCloseTimeoutSeconds = common.Int32Ceil(options.ScheduleToCloseTimeout.Seconds()) eap.StartToCloseTimeoutSeconds = common.Int32Ceil(options.StartToCloseTimeout.Seconds()) eap.ScheduleToStartTimeoutSeconds = common.Int32Ceil(options.ScheduleToStartTimeout.Seconds()) diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 2251ce651..7878b9665 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -488,6 +488,13 @@ func (t *TestWorkflowEnvironment) SetWorkerOptions(options WorkerOptions) *TestW return t } +// SetStartWorkflowOptions sets StartWorkflowOptions used to specify workflow execution timeout and task list. +// Note that StartWorkflowOptions is defined in an internal package, use client.StartWorkflowOptions instead. +func (t *TestWorkflowEnvironment) SetStartWorkflowOptions(options StartWorkflowOptions) *TestWorkflowEnvironment { + t.impl.setStartWorkflowOptions(options) + return t +} + // SetDataConverter sets data converter. func (t *TestWorkflowEnvironment) SetDataConverter(dataConverter DataConverter) *TestWorkflowEnvironment { t.impl.setDataConverter(dataConverter)