From 9592ebc6a1f7ff5b3338a194ba14df8991247325 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Tue, 14 Apr 2020 09:20:12 -0700 Subject: [PATCH] Options validation removal (#102) Service already validates all the passed options. Validating them on the client leads to situations when workflows fail due to a bad option when they could be recovered. And even worse it leads to situations when validation logic on the client gets out of sync with the service. Also fixed TestWorkflowEnvironment to support mock testing of activities by their string name without any registration. Also removed initialization of a session worker in TestWorkflowEnvironment unless requested. --- internal/client.go | 12 +- internal/internal_activity.go | 64 ------ internal/internal_coroutines_test.go | 10 +- internal/internal_task_handlers.go | 4 +- internal/internal_worker.go | 4 +- internal/internal_workflow.go | 44 ---- internal/internal_workflow_client.go | 43 +--- internal/internal_workflow_client_test.go | 30 +-- internal/internal_workflow_test.go | 8 +- internal/internal_workflow_testsuite.go | 251 ++++++++++++++++++++-- internal/workflow.go | 41 ++-- internal/workflow_testsuite.go | 7 + 12 files changed, 283 insertions(+), 235 deletions(-) diff --git a/internal/client.go b/internal/client.go index 508b970a5..c1a3f9833 100644 --- a/internal/client.go +++ b/internal/client.go @@ -551,13 +551,13 @@ const ( // NewClient creates an instance of a workflow client func NewClient(options ClientOptions) (Client, error) { - if len(options.Namespace) == 0 { + if options.Namespace == "" { options.Namespace = DefaultNamespace } options.MetricsScope = tagScope(options.MetricsScope, tagNamespace, options.Namespace, clientImplHeaderName, clientImplHeaderValue) - if len(options.HostPort) == 0 { + if options.HostPort == "" { options.HostPort = LocalHostPort } @@ -581,11 +581,11 @@ func NewClient(options ClientOptions) (Client, error) { // NewServiceClient creates workflow client from workflowservice.WorkflowServiceClient. Must be used internally in unit tests only. func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClient, connectionCloser io.Closer, options ClientOptions) *WorkflowClient { // Namespace can be empty in unit tests. - if len(options.Namespace) == 0 { + if options.Namespace == "" { options.Namespace = DefaultNamespace } - if len(options.Identity) == 0 { + if options.Identity == "" { options.Identity = getWorkerIdentity("") } @@ -616,7 +616,7 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien func NewNamespaceClient(options ClientOptions) (NamespaceClient, error) { options.MetricsScope = tagScope(options.MetricsScope, clientImplHeaderName, clientImplHeaderValue) - if len(options.HostPort) == 0 { + if options.HostPort == "" { options.HostPort = LocalHostPort } @@ -638,7 +638,7 @@ func NewNamespaceClient(options ClientOptions) (NamespaceClient, error) { } func newNamespaceServiceClient(workflowServiceClient workflowservice.WorkflowServiceClient, clientConn *grpc.ClientConn, options ClientOptions) NamespaceClient { - if len(options.Identity) == 0 { + if options.Identity == "" { options.Identity = getWorkerIdentity("") } diff --git a/internal/internal_activity.go b/internal/internal_activity.go index cf6fde3c2..c4598d832 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -170,39 +170,6 @@ func getLocalActivityOptions(ctx Context) *localActivityOptions { return opts.(*localActivityOptions) } -func getValidatedActivityOptions(ctx Context) (*activityOptions, error) { - p := getActivityOptions(ctx) - if p == nil { - // We need task list as a compulsory parameter. This can be removed after registration - return nil, errActivityParamsBadRequest - } - if p.TaskListName == "" { - // We default to origin task list name. - p.TaskListName = p.OriginalTaskListName - } - if p.ScheduleToStartTimeoutSeconds <= 0 { - return nil, errors.New("missing or negative ScheduleToStartTimeoutSeconds") - } - if p.StartToCloseTimeoutSeconds <= 0 { - return nil, errors.New("missing or negative StartToCloseTimeoutSeconds") - } - if p.ScheduleToCloseTimeoutSeconds < 0 { - return nil, errors.New("missing or negative ScheduleToCloseTimeoutSeconds") - } - if p.ScheduleToCloseTimeoutSeconds == 0 { - // This is a optional parameter, we default to sum of the other two timeouts. - p.ScheduleToCloseTimeoutSeconds = p.ScheduleToStartTimeoutSeconds + p.StartToCloseTimeoutSeconds - } - if p.HeartbeatTimeoutSeconds < 0 { - return nil, errors.New("invalid negative HeartbeatTimeoutSeconds") - } - if err := validateRetryPolicy(p.RetryPolicy); err != nil { - return nil, err - } - - return p, nil -} - func getValidatedLocalActivityOptions(ctx Context) (*localActivityOptions, error) { p := getLocalActivityOptions(ctx) if p == nil { @@ -215,37 +182,6 @@ func getValidatedLocalActivityOptions(ctx Context) (*localActivityOptions, error return p, nil } -func validateRetryPolicy(p *commonpb.RetryPolicy) error { - if p == nil { - return nil - } - - if p.GetInitialIntervalInSeconds() <= 0 { - return errors.New("missing or negative InitialIntervalInSeconds on retry policy") - } - if p.GetMaximumIntervalInSeconds() < 0 { - return errors.New("negative MaximumIntervalInSeconds on retry policy is invalid") - } - if p.GetMaximumIntervalInSeconds() == 0 { - // if not set, default to 100x of initial interval - p.MaximumIntervalInSeconds = 100 * p.GetInitialIntervalInSeconds() - } - if p.GetMaximumAttempts() < 0 { - return errors.New("negative MaximumAttempts on retry policy is invalid") - } - if p.GetExpirationIntervalInSeconds() < 0 { - return errors.New("ExpirationIntervalInSeconds cannot be less than 0 on retry policy") - } - if p.GetBackoffCoefficient() < 1 { - return errors.New("BackoffCoefficient on retry policy cannot be less than 1.0") - } - if p.GetMaximumAttempts() == 0 && p.GetExpirationIntervalInSeconds() == 0 { - return errors.New("both MaximumAttempts and ExpirationIntervalInSeconds on retry policy are not set, at least one of them must be set") - } - - return nil -} - func validateFunctionArgs(f interface{}, args []interface{}, isWorkflow bool) error { fType := reflect.TypeOf(f) if fType == nil || fType.Kind() != reflect.Func { diff --git a/internal/internal_coroutines_test.go b/internal/internal_coroutines_test.go index 1465a1868..e1ff9bfa3 100644 --- a/internal/internal_coroutines_test.go +++ b/internal/internal_coroutines_test.go @@ -1256,17 +1256,15 @@ func TestChainedFuture(t *testing.T) { activityFn := func(arg int) (int, error) { return arg, nil } - workflowFn := func(ctx Context) (int, error) { + workflowFn := func(ctx Context) (out int, err error) { ctx = WithActivityOptions(ctx, ActivityOptions{ - ScheduleToStartTimeout: time.Minute, - StartToCloseTimeout: time.Minute, + ScheduleToCloseTimeout: time.Minute, }) f := ExecuteActivity(ctx, activityFn, 5) - var out int fut, set := NewFuture(ctx) set.Chain(f) - require.NoError(t, fut.Get(ctx, &out)) - return out, nil + err = fut.Get(ctx, &out) + return } s := WorkflowTestSuite{} diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 44afac042..5e89ccc56 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -598,8 +598,8 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice. return nil, errors.New("first history event is not WorkflowExecutionStarted") } taskList := attributes.TaskList - if taskList == nil { - return nil, errors.New("nil TaskList in WorkflowExecutionStarted event") + if taskList == nil || taskList.Name == "" { + return nil, errors.New("nil or empty TaskList in WorkflowExecutionStarted event") } runID := task.WorkflowExecution.GetRunId() diff --git a/internal/internal_worker.go b/internal/internal_worker.go index ed43657a3..6a257baf8 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -257,7 +257,7 @@ func verifyNamespaceExist(client workflowservice.WorkflowServiceClient, namespac return nil } - if len(namespace) == 0 { + if namespace == "" { return errors.New("namespace cannot be empty") } @@ -1536,7 +1536,7 @@ func setClientDefaults(client *WorkflowClient) { if client.dataConverter == nil { client.dataConverter = getDefaultDataConverter() } - if len(client.namespace) == 0 { + if client.namespace == "" { client.namespace = DefaultNamespace } if client.tracer == nil { diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 508159bc2..406a8705f 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -36,7 +36,6 @@ import ( "time" "unicode" - "github.com/robfig/cron" "go.uber.org/atomic" "go.uber.org/zap" @@ -1150,49 +1149,6 @@ func getValidatedWorkflowFunction(workflowFunc interface{}, args []interface{}, return &WorkflowType{Name: fnName}, input, nil } -func getValidatedWorkflowOptions(ctx Context) (*workflowOptions, error) { - p := getWorkflowEnvOptions(ctx) - if p == nil { - // We need task list as a compulsory parameter. This can be removed after registration - return nil, errWorkflowOptionBadRequest - } - info := GetWorkflowInfo(ctx) - if p.namespace == "" { - // default to use current workflow's namespace - p.namespace = info.Namespace - } - if p.taskListName == "" { - // default to use current workflow's task list - p.taskListName = info.TaskListName - } - if p.taskStartToCloseTimeoutSeconds < 0 { - return nil, errors.New("missing or negative DecisionTaskStartToCloseTimeout") - } - if p.taskStartToCloseTimeoutSeconds == 0 { - p.taskStartToCloseTimeoutSeconds = defaultDecisionTaskTimeoutInSecs - } - if p.executionStartToCloseTimeoutSeconds <= 0 { - return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout") - } - if err := validateRetryPolicy(p.retryPolicy); err != nil { - return nil, err - } - if err := validateCronSchedule(p.cronSchedule); err != nil { - return nil, err - } - - return p, nil -} - -func validateCronSchedule(cronSchedule string) error { - if len(cronSchedule) == 0 { - return nil - } - - _, err := cron.ParseStandard(cronSchedule) - return err -} - func getWorkflowEnvOptions(ctx Context) *workflowOptions { options := ctx.Value(workflowEnvOptionsContextKey) if options != nil { diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 0c163583e..423992c47 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -56,8 +56,7 @@ var _ Client = (*WorkflowClient)(nil) var _ NamespaceClient = (*namespaceClient)(nil) const ( - defaultDecisionTaskTimeoutInSecs = 10 - defaultGetHistoryTimeoutInSecs = 25 + defaultGetHistoryTimeoutInSecs = 65 ) var ( @@ -167,22 +166,8 @@ func (wc *WorkflowClient) StartWorkflow( workflowID = uuid.NewRandom().String() } - if options.TaskList == "" { - return nil, errors.New("missing TaskList") - } - executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) - if executionTimeout <= 0 { - return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout") - } - decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) - if decisionTaskTimeout < 0 { - return nil, errors.New("negative DecisionTaskStartToCloseTimeout provided") - } - if decisionTaskTimeout == 0 { - decisionTaskTimeout = defaultDecisionTaskTimeoutInSecs - } // Validate type and its arguments. workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, args, wc.dataConverter, wc.registry) @@ -364,22 +349,8 @@ func (wc *WorkflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI workflowID = uuid.NewRandom().String() } - if options.TaskList == "" { - return nil, errors.New("missing TaskList") - } - executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) - if executionTimeout <= 0 { - return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout") - } - decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) - if decisionTaskTimeout < 0 { - return nil, errors.New("negative DecisionTaskStartToCloseTimeout provided") - } - if decisionTaskTimeout == 0 { - decisionTaskTimeout = defaultDecisionTaskTimeoutInSecs - } // Validate type and its arguments. workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, workflowArgs, wc.dataConverter, wc.registry) @@ -620,7 +591,7 @@ func (wc *WorkflowClient) RecordActivityHeartbeatByID(ctx context.Context, // - InternalServiceError // - EntityNotExistError func (wc *WorkflowClient) ListClosedWorkflow(ctx context.Context, request *workflowservice.ListClosedWorkflowExecutionsRequest) (*workflowservice.ListClosedWorkflowExecutionsResponse, error) { - if len(request.GetNamespace()) == 0 { + if request.GetNamespace() == "" { request.Namespace = wc.namespace } var response *workflowservice.ListClosedWorkflowExecutionsResponse @@ -644,7 +615,7 @@ func (wc *WorkflowClient) ListClosedWorkflow(ctx context.Context, request *workf // - InternalServiceError // - EntityNotExistError func (wc *WorkflowClient) ListOpenWorkflow(ctx context.Context, request *workflowservice.ListOpenWorkflowExecutionsRequest) (*workflowservice.ListOpenWorkflowExecutionsResponse, error) { - if len(request.GetNamespace()) == 0 { + if request.GetNamespace() == "" { request.Namespace = wc.namespace } var response *workflowservice.ListOpenWorkflowExecutionsResponse @@ -664,7 +635,7 @@ func (wc *WorkflowClient) ListOpenWorkflow(ctx context.Context, request *workflo // ListWorkflow implementation func (wc *WorkflowClient) ListWorkflow(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*workflowservice.ListWorkflowExecutionsResponse, error) { - if len(request.GetNamespace()) == 0 { + if request.GetNamespace() == "" { request.Namespace = wc.namespace } var response *workflowservice.ListWorkflowExecutionsResponse @@ -684,7 +655,7 @@ func (wc *WorkflowClient) ListWorkflow(ctx context.Context, request *workflowser // ListArchivedWorkflow implementation func (wc *WorkflowClient) ListArchivedWorkflow(ctx context.Context, request *workflowservice.ListArchivedWorkflowExecutionsRequest) (*workflowservice.ListArchivedWorkflowExecutionsResponse, error) { - if len(request.GetNamespace()) == 0 { + if request.GetNamespace() == "" { request.Namespace = wc.namespace } var response *workflowservice.ListArchivedWorkflowExecutionsResponse @@ -716,7 +687,7 @@ func (wc *WorkflowClient) ListArchivedWorkflow(ctx context.Context, request *wor // ScanWorkflow implementation func (wc *WorkflowClient) ScanWorkflow(ctx context.Context, request *workflowservice.ScanWorkflowExecutionsRequest) (*workflowservice.ScanWorkflowExecutionsResponse, error) { - if len(request.GetNamespace()) == 0 { + if request.GetNamespace() == "" { request.Namespace = wc.namespace } var response *workflowservice.ScanWorkflowExecutionsResponse @@ -736,7 +707,7 @@ func (wc *WorkflowClient) ScanWorkflow(ctx context.Context, request *workflowser // CountWorkflow implementation func (wc *WorkflowClient) CountWorkflow(ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest) (*workflowservice.CountWorkflowExecutionsResponse, error) { - if len(request.GetNamespace()) == 0 { + if request.GetNamespace() == "" { request.Namespace = wc.namespace } var response *workflowservice.CountWorkflowExecutionsResponse diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index b60b004bd..31ebe35ec 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -27,7 +27,6 @@ package internal import ( "context" "encoding/json" - "errors" "fmt" "log" "os" @@ -232,7 +231,7 @@ func (s *historyEventIteratorSuite) TestIterator_NoError_EmptyPage() { s.Equal(1, len(events)) } -func (s *historyEventIteratorSuite) TestIterator_Error() { +func (s *historyEventIteratorSuite) TestIteratorError() { filterType := filterpb.HistoryEventFilterType_AllEvent request1 := getGetWorkflowExecutionHistoryRequest(filterType) response1 := &workflowservice.GetWorkflowExecutionHistoryResponse{ @@ -804,33 +803,6 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflow() { s.Equal(createResponse.GetRunId(), resp.RunID) } -func (s *workflowClientTestSuite) TestSignalWithStartWorkflow_Error() { - signalName := "my signal" - signalInput := []byte("my signal input") - options := StartWorkflowOptions{} - - resp, err := s.client.SignalWithStartWorkflow(context.Background(), workflowID, signalName, signalInput, - options, workflowType) - s.Equal(errors.New("missing TaskList"), err) - s.Nil(resp) - - options.TaskList = tasklist - resp, err = s.client.SignalWithStartWorkflow(context.Background(), workflowID, signalName, signalInput, - options, workflowType) - s.NotNil(err) - s.Nil(resp) - - options.ExecutionStartToCloseTimeout = timeoutInSeconds - createResponse := &workflowservice.SignalWithStartWorkflowExecutionResponse{ - RunId: runID, - } - s.service.EXPECT().SignalWithStartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil) - resp, err = s.client.SignalWithStartWorkflow(context.Background(), workflowID, signalName, signalInput, - options, workflowType) - s.Nil(err) - s.Equal(createResponse.GetRunId(), resp.RunID) -} - func (s *workflowClientTestSuite) TestStartWorkflow() { client, ok := s.client.(*WorkflowClient) s.True(ok) diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index e6134f920..87c453f4c 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -415,13 +415,17 @@ func continueAsNewWorkflowTest(ctx Context) error { func (s *WorkflowUnitTest) Test_ContinueAsNewWorkflow() { env := s.NewTestWorkflowEnvironment() + env.SetStartWorkflowOptions(StartWorkflowOptions{ + ExecutionStartToCloseTimeout: 100 * time.Second, + DecisionTaskStartToCloseTimeout: 5 * time.Second, + }) env.ExecuteWorkflow(continueAsNewWorkflowTest) s.True(env.IsWorkflowCompleted()) s.NotNil(env.GetWorkflowError()) resultErr := env.GetWorkflowError().(*ContinueAsNewError) s.EqualValues("continueAsNewWorkflowTest", resultErr.params.workflowType.Name) - s.EqualValues(1, resultErr.params.executionStartToCloseTimeoutSeconds) - s.EqualValues(1, resultErr.params.taskStartToCloseTimeoutSeconds) + s.EqualValues(100, resultErr.params.executionStartToCloseTimeoutSeconds) + s.EqualValues(5, resultErr.params.taskStartToCloseTimeoutSeconds) s.EqualValues("default-test-tasklist", resultErr.params.taskListName) } diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index f80ba1683..629ecf0d3 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -28,6 +28,8 @@ import ( "context" "errors" "fmt" + decisionpb "go.temporal.io/temporal-proto/decision" + tasklistpb "go.temporal.io/temporal-proto/tasklist" "reflect" "strings" "sync" @@ -60,6 +62,10 @@ const ( defaultTestRunID = "default-test-run-id" defaultTestWorkflowTypeName = "default-test-workflow-type-name" workflowTypeNotSpecified = "workflow-type-not-specified" + + // These are copied from service implementation + reservedTaskListPrefix = "/__temporal_sys/" + maxIDLengthLimit = 1000 ) type ( @@ -238,7 +244,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist WorkflowType: WorkflowType{Name: workflowTypeNotSpecified}, TaskListName: defaultTestTaskList, - ExecutionStartToCloseTimeoutSeconds: 1, + ExecutionStartToCloseTimeoutSeconds: 60 * 24 * 365 * 10, TaskStartToCloseTimeoutSeconds: 1, }, registry: r, @@ -332,6 +338,10 @@ func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(param childEnv.dataConverter = params.dataConverter childEnv.registry = env.registry + if params.taskListName == "" { + return nil, serviceerror.NewWorkflowExecutionAlreadyStarted("Empty task list name", "", "") + } + if params.workflowID == "" { params.workflowID = env.workflowInfo.WorkflowExecution.RunID + "_" + getStringID(env.nextID()) } @@ -492,7 +502,7 @@ func (env *testWorkflowEnvironmentImpl) executeActivity( panic(err) } - params := executeActivityParams{ + parameters := executeActivityParams{ activityOptions: activityOptions{ ScheduleToCloseTimeoutSeconds: 600, StartToCloseTimeoutSeconds: 600, @@ -502,13 +512,28 @@ func (env *testWorkflowEnvironmentImpl) executeActivity( Header: env.header, } + scheduleTaskAttr := &decisionpb.ScheduleActivityTaskDecisionAttributes{} + if parameters.ActivityID == "" { + scheduleTaskAttr.ActivityId = getStringID(env.nextID()) + } else { + scheduleTaskAttr.ActivityId = parameters.ActivityID + } + scheduleTaskAttr.ActivityType = &commonpb.ActivityType{Name: parameters.ActivityType.Name} + scheduleTaskAttr.TaskList = &tasklistpb.TaskList{Name: parameters.TaskListName} + scheduleTaskAttr.Input = parameters.Input + scheduleTaskAttr.ScheduleToCloseTimeoutSeconds = parameters.ScheduleToCloseTimeoutSeconds + scheduleTaskAttr.StartToCloseTimeoutSeconds = parameters.StartToCloseTimeoutSeconds + scheduleTaskAttr.ScheduleToStartTimeoutSeconds = parameters.ScheduleToStartTimeoutSeconds + scheduleTaskAttr.HeartbeatTimeoutSeconds = parameters.HeartbeatTimeoutSeconds + scheduleTaskAttr.RetryPolicy = parameters.RetryPolicy + scheduleTaskAttr.Header = parameters.Header + task := newTestActivityTask( defaultTestWorkflowID, defaultTestRunID, "0", - defaultTestWorkflowTypeName, defaultTestNamespace, - params, + scheduleTaskAttr, ) task.HeartbeatDetails = env.heartbeatDetails @@ -911,20 +936,34 @@ func (env *testWorkflowEnvironmentImpl) GetContextPropagators() []ContextPropaga } func (env *testWorkflowEnvironmentImpl) ExecuteActivity(parameters executeActivityParams, callback resultHandler) *activityInfo { - var activityID string + scheduleTaskAttr := &decisionpb.ScheduleActivityTaskDecisionAttributes{} if parameters.ActivityID == "" { - activityID = getStringID(env.nextID()) + scheduleTaskAttr.ActivityId = getStringID(env.nextID()) } else { - activityID = parameters.ActivityID - } + scheduleTaskAttr.ActivityId = parameters.ActivityID + } + activityID := scheduleTaskAttr.GetActivityId() + scheduleTaskAttr.ActivityType = &commonpb.ActivityType{Name: parameters.ActivityType.Name} + scheduleTaskAttr.TaskList = &tasklistpb.TaskList{Name: parameters.TaskListName} + scheduleTaskAttr.Input = parameters.Input + scheduleTaskAttr.ScheduleToCloseTimeoutSeconds = parameters.ScheduleToCloseTimeoutSeconds + scheduleTaskAttr.StartToCloseTimeoutSeconds = parameters.StartToCloseTimeoutSeconds + scheduleTaskAttr.ScheduleToStartTimeoutSeconds = parameters.ScheduleToStartTimeoutSeconds + scheduleTaskAttr.HeartbeatTimeoutSeconds = parameters.HeartbeatTimeoutSeconds + scheduleTaskAttr.RetryPolicy = parameters.RetryPolicy + scheduleTaskAttr.Header = parameters.Header + err := env.validateActivityScheduleAttributes(scheduleTaskAttr, env.WorkflowInfo().ExecutionStartToCloseTimeoutSeconds) activityInfo := &activityInfo{activityID: activityID} + if err != nil { + callback(nil, err) + return activityInfo + } task := newTestActivityTask( defaultTestWorkflowID, defaultTestRunID, - activityInfo.activityID, defaultTestWorkflowTypeName, defaultTestNamespace, - parameters, + scheduleTaskAttr, ) taskHandler := env.newTestActivityTaskHandler(parameters.TaskListName, parameters.DataConverter) @@ -963,6 +1002,169 @@ func (env *testWorkflowEnvironmentImpl) ExecuteActivity(parameters executeActivi return activityInfo } +// Copy of the server function func (v *decisionAttrValidator) validateActivityScheduleAttributes +func (env *testWorkflowEnvironmentImpl) validateActivityScheduleAttributes( + attributes *decisionpb.ScheduleActivityTaskDecisionAttributes, + wfTimeout int32, +) error { + + //if err := v.validateCrossNamespaceCall( + // namespaceID, + // targetNamespaceID, + //); err != nil { + // return err + //} + + if attributes == nil { + return serviceerror.NewInvalidArgument("ScheduleActivityTaskDecisionAttributes is not set on decision.") + } + + defaultTaskListName := "" + if _, err := env.validatedTaskList(attributes.TaskList, defaultTaskListName); err != nil { + return err + } + + if attributes.GetActivityId() == "" { + return serviceerror.NewInvalidArgument("ActivityId is not set on decision.") + } + + if attributes.ActivityType == nil || attributes.ActivityType.GetName() == "" { + return serviceerror.NewInvalidArgument("ActivityType is not set on decision.") + } + + if err := env.validateRetryPolicy(attributes.RetryPolicy); err != nil { + return err + } + + if len(attributes.GetActivityId()) > maxIDLengthLimit { + return serviceerror.NewInvalidArgument("ActivityID exceeds length limit.") + } + + if len(attributes.GetActivityType().GetName()) > maxIDLengthLimit { + return serviceerror.NewInvalidArgument("ActivityType exceeds length limit.") + } + + if len(attributes.GetNamespace()) > maxIDLengthLimit { + return serviceerror.NewInvalidArgument("Namespace exceeds length limit.") + } + + // Only attempt to deduce and fill in unspecified timeouts only when all timeouts are non-negative. + if attributes.GetScheduleToCloseTimeoutSeconds() < 0 || attributes.GetScheduleToStartTimeoutSeconds() < 0 || + attributes.GetStartToCloseTimeoutSeconds() < 0 || attributes.GetHeartbeatTimeoutSeconds() < 0 { + return serviceerror.NewInvalidArgument("A valid timeout may not be negative.") + } + + // ensure activity timeout never larger than workflow timeout + if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout { + attributes.ScheduleToCloseTimeoutSeconds = wfTimeout + } + if attributes.GetScheduleToStartTimeoutSeconds() > wfTimeout { + attributes.ScheduleToStartTimeoutSeconds = wfTimeout + } + if attributes.GetStartToCloseTimeoutSeconds() > wfTimeout { + attributes.StartToCloseTimeoutSeconds = wfTimeout + } + if attributes.GetHeartbeatTimeoutSeconds() > wfTimeout { + attributes.HeartbeatTimeoutSeconds = wfTimeout + } + + validScheduleToClose := attributes.GetScheduleToCloseTimeoutSeconds() > 0 + validScheduleToStart := attributes.GetScheduleToStartTimeoutSeconds() > 0 + validStartToClose := attributes.GetStartToCloseTimeoutSeconds() > 0 + + if validScheduleToClose { + if !validScheduleToStart { + attributes.ScheduleToStartTimeoutSeconds = attributes.GetScheduleToCloseTimeoutSeconds() + } + if !validStartToClose { + attributes.StartToCloseTimeoutSeconds = attributes.GetScheduleToCloseTimeoutSeconds() + } + } else if validScheduleToStart && validStartToClose { + attributes.ScheduleToCloseTimeoutSeconds = attributes.GetScheduleToStartTimeoutSeconds() + attributes.GetStartToCloseTimeoutSeconds() + if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout { + attributes.ScheduleToCloseTimeoutSeconds = wfTimeout + } + } else { + // Deduction failed as there's not enough information to fill in missing timeouts. + return serviceerror.NewInvalidArgument("A valid ScheduleToCloseTimeout is not set on decision.") + } + // ensure activity's SCHEDULE_TO_START and SCHEDULE_TO_CLOSE is as long as expiration on retry policy + p := attributes.RetryPolicy + if p != nil { + expiration := p.GetExpirationIntervalInSeconds() + if expiration == 0 { + expiration = wfTimeout + } + if attributes.GetScheduleToStartTimeoutSeconds() < expiration { + attributes.ScheduleToStartTimeoutSeconds = expiration + } + if attributes.GetScheduleToCloseTimeoutSeconds() < expiration { + attributes.ScheduleToCloseTimeoutSeconds = expiration + } + } + return nil +} + +// Copy of the service func (v *decisionAttrValidator) validatedTaskList +func (env *testWorkflowEnvironmentImpl) validatedTaskList( + taskList *tasklistpb.TaskList, + defaultVal string, +) (*tasklistpb.TaskList, error) { + + if taskList == nil { + taskList = &tasklistpb.TaskList{} + } + + if taskList.GetName() == "" { + if defaultVal == "" { + return taskList, serviceerror.NewInvalidArgument("missing task list name") + } + taskList.Name = defaultVal + return taskList, nil + } + + name := taskList.GetName() + if len(name) > maxIDLengthLimit { + return taskList, serviceerror.NewInvalidArgument(fmt.Sprintf("task list name exceeds length limit of %v", maxIDLengthLimit)) + } + + if strings.HasPrefix(name, reservedTaskListPrefix) { + return taskList, serviceerror.NewInvalidArgument(fmt.Sprintf("task list name cannot start with reserved prefix %v", reservedTaskListPrefix)) + } + + return taskList, nil +} + +// copy of the service func ValidateRetryPolicy(policy *commonpb.RetryPolicy) +func (env *testWorkflowEnvironmentImpl) validateRetryPolicy(policy *commonpb.RetryPolicy) error { + if policy == nil { + // nil policy is valid which means no retry + return nil + } + if policy.GetInitialIntervalInSeconds() <= 0 { + return serviceerror.NewInvalidArgument("InitialIntervalInSeconds must be greater than 0 on retry policy.") + } + if policy.GetBackoffCoefficient() < 1 { + return serviceerror.NewInvalidArgument("BackoffCoefficient cannot be less than 1 on retry policy.") + } + if policy.GetMaximumIntervalInSeconds() < 0 { + return serviceerror.NewInvalidArgument("MaximumIntervalInSeconds cannot be less than 0 on retry policy.") + } + if policy.GetMaximumIntervalInSeconds() > 0 && policy.GetMaximumIntervalInSeconds() < policy.GetInitialIntervalInSeconds() { + return serviceerror.NewInvalidArgument("MaximumIntervalInSeconds cannot be less than InitialIntervalInSeconds on retry policy.") + } + if policy.GetMaximumAttempts() < 0 { + return serviceerror.NewInvalidArgument("MaximumAttempts cannot be less than 0 on retry policy.") + } + if policy.GetExpirationIntervalInSeconds() < 0 { + return serviceerror.NewInvalidArgument("ExpirationIntervalInSeconds cannot be less than 0 on retry policy.") + } + if policy.GetMaximumAttempts() == 0 && policy.GetExpirationIntervalInSeconds() == 0 { + return serviceerror.NewInvalidArgument("MaximumAttempts and ExpirationIntervalInSeconds are both 0. At least one of them must be specified.") + } + return nil +} + func (env *testWorkflowEnvironmentImpl) getActivityHandle(activityID string) (*testActivityHandle, bool) { handle, ok := env.activities[env.makeUniqueID(activityID)] return handle, ok @@ -1559,7 +1761,9 @@ func (env *testWorkflowEnvironmentImpl) newTestActivityTaskHandler(taskList stri return taskHandler } -func newTestActivityTask(workflowID, runID, activityID, workflowTypeName, namespace string, params executeActivityParams) *workflowservice.PollForActivityTaskResponse { +func newTestActivityTask(workflowID, runID, workflowTypeName, namespace string, + attr *decisionpb.ScheduleActivityTaskDecisionAttributes) *workflowservice.PollForActivityTaskResponse { + activityID := attr.GetActivityId() task := &workflowservice.PollForActivityTaskResponse{ WorkflowExecution: &executionpb.WorkflowExecution{ WorkflowId: workflowID, @@ -1567,18 +1771,18 @@ func newTestActivityTask(workflowID, runID, activityID, workflowTypeName, namesp }, ActivityId: activityID, TaskToken: []byte(activityID), // use activityID as TaskToken so we can map TaskToken in heartbeat calls. - ActivityType: &commonpb.ActivityType{Name: params.ActivityType.Name}, - Input: params.Input, + ActivityType: &commonpb.ActivityType{Name: attr.GetActivityType().GetName()}, + Input: attr.GetInput(), ScheduledTimestamp: time.Now().UnixNano(), - ScheduleToCloseTimeoutSeconds: params.ScheduleToCloseTimeoutSeconds, + ScheduleToCloseTimeoutSeconds: attr.GetScheduleToCloseTimeoutSeconds(), StartedTimestamp: time.Now().UnixNano(), - StartToCloseTimeoutSeconds: params.StartToCloseTimeoutSeconds, - HeartbeatTimeoutSeconds: params.HeartbeatTimeoutSeconds, + StartToCloseTimeoutSeconds: attr.GetStartToCloseTimeoutSeconds(), + HeartbeatTimeoutSeconds: attr.GetHeartbeatTimeoutSeconds(), WorkflowType: &commonpb.WorkflowType{ Name: workflowTypeName, }, WorkflowNamespace: namespace, - Header: params.Header, + Header: attr.GetHeader(), } return task } @@ -1978,6 +2182,19 @@ func (env *testWorkflowEnvironmentImpl) GetRegistry() *registry { return env.registry } +func (env *testWorkflowEnvironmentImpl) setStartWorkflowOptions(options StartWorkflowOptions) { + wf := env.workflowInfo + if options.ExecutionStartToCloseTimeout > 0 { + wf.ExecutionStartToCloseTimeoutSeconds = common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) + } + if options.DecisionTaskStartToCloseTimeout > 0 { + wf.TaskStartToCloseTimeoutSeconds = common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) + } + if len(options.TaskList) > 0 { + wf.TaskListName = options.TaskList + } +} + func newTestSessionEnvironment(testWorkflowEnvironment *testWorkflowEnvironmentImpl, params *workerExecutionParameters, concurrentSessionExecutionSize int) *testSessionEnvironmentImpl { resourceID := params.SessionResourceID diff --git a/internal/workflow.go b/internal/workflow.go index d05bf1d8b..294aefdcd 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -40,11 +40,8 @@ import ( ) var ( - errNamespaceNotSet = errors.New("namespace is not set") errWorkflowIDNotSet = errors.New("workflowId is not set") errLocalActivityParamsBadRequest = errors.New("missing local activity parameters through context, check LocalActivityOptions") - errActivityParamsBadRequest = errors.New("missing activity parameters through context, check ActivityOptions") - errWorkflowOptionBadRequest = errors.New("missing workflow options through context, check WorkflowOptions") errSearchAttributesNotSet = errors.New("search attributes is empty") ) @@ -411,11 +408,6 @@ func (wc *workflowEnvironmentInterceptor) ExecuteActivity(ctx Context, typeName } // Validate context options. options := getActivityOptions(ctx) - options, err = getValidatedActivityOptions(ctx) - if err != nil { - settable.Set(nil, err) - return future - } // Validate session state. if sessionInfo := getSessionInfo(ctx); sessionInfo != nil { @@ -646,12 +638,8 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil mainSettable.Set(nil, err) return result } - options, err := getValidatedWorkflowOptions(ctx) - if err != nil { - executionSettable.Set(nil, err) - mainSettable.Set(nil, err) - return result - } + + options := getWorkflowEnvOptions(ctx) options.dataConverter = dc options.contextPropagators = workflowOptionsFromCtx.contextPropagators options.memo = workflowOptionsFromCtx.memo @@ -856,11 +844,6 @@ func (wc *workflowEnvironmentInterceptor) RequestCancelExternalWorkflow(ctx Cont options := getWorkflowEnvOptions(ctx1) future, settable := NewFuture(ctx1) - if options.namespace == "" { - settable.Set(nil, errNamespaceNotSet) - return future - } - if workflowID == "" { settable.Set(nil, errWorkflowIDNotSet) return future @@ -904,11 +887,6 @@ func signalExternalWorkflow(ctx Context, workflowID, runID, signalName string, a options := getWorkflowEnvOptions(ctx1) future, settable := NewFuture(ctx1) - if options.namespace == "" { - settable.Set(nil, errNamespaceNotSet) - return future - } - if workflowID == "" { settable.Set(nil, errWorkflowIDNotSet) return future @@ -981,8 +959,12 @@ func (wc *workflowEnvironmentInterceptor) UpsertSearchAttributes(ctx Context, at func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context { ctx1 := setWorkflowEnvOptionsIfNotExist(ctx) wfOptions := getWorkflowEnvOptions(ctx1) - wfOptions.namespace = cwo.Namespace - wfOptions.taskListName = cwo.TaskList + if len(cwo.Namespace) > 0 { + wfOptions.namespace = cwo.Namespace + } + if len(cwo.TaskList) > 0 { + wfOptions.taskListName = cwo.TaskList + } wfOptions.workflowID = cwo.WorkflowID wfOptions.executionStartToCloseTimeoutSeconds = common.Int32Ceil(cwo.ExecutionStartToCloseTimeout.Seconds()) wfOptions.taskStartToCloseTimeoutSeconds = common.Int32Ceil(cwo.TaskStartToCloseTimeout.Seconds()) @@ -1006,6 +988,9 @@ func WithWorkflowNamespace(ctx Context, name string) Context { // WithWorkflowTaskList adds a task list to the context. func WithWorkflowTaskList(ctx Context, name string) Context { + if name == "" { + panic("empty task list name") + } ctx1 := setWorkflowEnvOptionsIfNotExist(ctx) getWorkflowEnvOptions(ctx1).taskListName = name return ctx1 @@ -1355,7 +1340,9 @@ func WithActivityOptions(ctx Context, options ActivityOptions) Context { ctx1 := setActivityParametersIfNotExist(ctx) eap := getActivityOptions(ctx1) - eap.TaskListName = options.TaskList + if len(options.TaskList) > 0 { + eap.TaskListName = options.TaskList + } eap.ScheduleToCloseTimeoutSeconds = common.Int32Ceil(options.ScheduleToCloseTimeout.Seconds()) eap.StartToCloseTimeoutSeconds = common.Int32Ceil(options.StartToCloseTimeout.Seconds()) eap.ScheduleToStartTimeoutSeconds = common.Int32Ceil(options.ScheduleToStartTimeout.Seconds()) diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 2251ce651..7878b9665 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -488,6 +488,13 @@ func (t *TestWorkflowEnvironment) SetWorkerOptions(options WorkerOptions) *TestW return t } +// SetStartWorkflowOptions sets StartWorkflowOptions used to specify workflow execution timeout and task list. +// Note that StartWorkflowOptions is defined in an internal package, use client.StartWorkflowOptions instead. +func (t *TestWorkflowEnvironment) SetStartWorkflowOptions(options StartWorkflowOptions) *TestWorkflowEnvironment { + t.impl.setStartWorkflowOptions(options) + return t +} + // SetDataConverter sets data converter. func (t *TestWorkflowEnvironment) SetDataConverter(dataConverter DataConverter) *TestWorkflowEnvironment { t.impl.setDataConverter(dataConverter)