diff --git a/go.mod b/go.mod index b270bda8a..0c2a176ea 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/uber-go/tally v3.3.15+incompatible github.com/uber/jaeger-client-go v2.22.1+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible // indirect - go.temporal.io/temporal-proto v0.20.28 + go.temporal.io/temporal-proto v0.20.29 go.uber.org/atomic v1.6.0 go.uber.org/goleak v1.0.0 go.uber.org/zap v1.14.1 diff --git a/go.sum b/go.sum index 647633ce8..b71856fc4 100644 --- a/go.sum +++ b/go.sum @@ -95,6 +95,8 @@ github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6 github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.temporal.io/temporal-proto v0.20.28 h1:syCkMj1bBEqPCj4dKmPQksBKH3qXeZ+PayfsBMdQEOY= go.temporal.io/temporal-proto v0.20.28/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s= +go.temporal.io/temporal-proto v0.20.29 h1:YDKcU0qxThs9ihny93Pf2/gSdGvybKjKuCIosy54nQ8= +go.temporal.io/temporal-proto v0.20.29/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo= diff --git a/internal/activity.go b/internal/activity.go index 678be267a..40be2c4d9 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -117,12 +117,16 @@ type ( // LocalActivityOptions stores local activity specific parameters that will be stored inside of a context. LocalActivityOptions struct { - // ScheduleToCloseTimeout - The end to end timeout for the local activity. + // ScheduleToCloseTimeout - The end to end timeout for the local activity including retries. // This field is required. ScheduleToCloseTimeout time.Duration + // StartToCloseTimeout - The timeout for a single execution of the local activity. + // Optional: defaults to ScheduleToClose + StartToCloseTimeout time.Duration + // RetryPolicy specify how to retry activity if error happens. - // Optional: default is no retry + // Optional: default is to retry according to the default retry policy up to ScheduleToCloseTimeout RetryPolicy *RetryPolicy } ) diff --git a/internal/client.go b/internal/client.go index c9ffe5491..b597f1985 100644 --- a/internal/client.go +++ b/internal/client.go @@ -404,16 +404,23 @@ type ( // Mandatory: No default. TaskList string - // ExecutionStartToCloseTimeout - The timeout for duration of workflow execution. + // WorkflowExecutionTimeout - The timeout for duration of workflow execution. + // It includes retries and continue as new. Use WorkflowRunTimeout to limit execution time + // of a single workflow run. // The resolution is seconds. - // Mandatory: No default. - ExecutionStartToCloseTimeout time.Duration + // Optional: defaulted to 10 years. + WorkflowExecutionTimeout time.Duration + + // WorkflowRunTimeout - The timeout for duration of a single workflow run. + // The resolution is seconds. + // Optional: defaulted to WorkflowExecutionTimeout. + WorkflowRunTimeout time.Duration - // DecisionTaskStartToCloseTimeout - The timeout for processing decision task from the time the worker + // WorkflowTaskTimeout - The timeout for processing workflow task from the time the worker // pulled this task. If a decision task is lost, it is retried after this timeout. // The resolution is seconds. // Optional: defaulted to 10 secs. - DecisionTaskStartToCloseTimeout time.Duration + WorkflowTaskTimeout time.Duration // WorkflowIDReusePolicy - Whether server allow reuse of workflow ID, can be useful // for dedup logic if set to WorkflowIdReusePolicyRejectDuplicate. @@ -470,13 +477,8 @@ type ( // This value is the cap of the interval. Default is 100x of initial interval. MaximumInterval time.Duration - // Maximum time to retry. Either ExpirationInterval or MaximumAttempts is required. - // When exceeded the retries stop even if maximum retries is not reached yet. - ExpirationInterval time.Duration - // Maximum number of attempts. When exceeded the retries stop even if not expired yet. - // If not set or set to 0, it means unlimited, and rely on ExpirationInterval to stop. - // Either MaximumAttempts or ExpirationInterval is required. + // If not set or set to 0, it means unlimited, and rely on activity ScheduleToCloseTimeout to stop. MaximumAttempts int32 // Non-Retriable errors. This is optional. Temporal server will stop retry if error reason matches this list. diff --git a/internal/common/convert.go b/internal/common/convert.go index 3e80fa0cf..052b8e813 100644 --- a/internal/common/convert.go +++ b/internal/common/convert.go @@ -37,3 +37,43 @@ func Int32Ceil(v float64) int32 { func Int64Ceil(v float64) int64 { return int64(math.Ceil(v)) } + +// MinInt64 returns the smaller of two given int64 +func MinInt64(a, b int64) int64 { + if a < b { + return a + } + return b +} + +// MaxInt64 returns the greater of two given int64 +func MaxInt64(a, b int64) int64 { + if a > b { + return a + } + return b +} + +// MinInt32 return smaller one of two inputs int32 +func MinInt32(a, b int32) int32 { + if a < b { + return a + } + return b +} + +// MinInt returns the smaller of two given integers +func MinInt(a, b int) int { + if a < b { + return a + } + return b +} + +// MaxInt returns the greater one of two given integers +func MaxInt(a, b int) int { + if a > b { + return a + } + return b +} diff --git a/internal/error.go b/internal/error.go index 23990d39d..08ac1d991 100644 --- a/internal/error.go +++ b/internal/error.go @@ -209,10 +209,10 @@ func IsCanceledError(err error) bool { // If the workflow main function returns this error then the current execution is ended and // the new execution with same workflow ID is started automatically with options // provided to this function. -// ctx - use context to override any options for the new workflow like execution timeout, decision task timeout, task list. +// ctx - use context to override any options for the new workflow like run timeout, task timeout, task list. // if not mentioned it would use the defaults that the current workflow is using. -// ctx := WithExecutionStartToCloseTimeout(ctx, 30 * time.Minute) -// ctx := WithWorkflowTaskStartToCloseTimeout(ctx, time.Minute) +// ctx := WithWorkflowRunTimeout(ctx, 30 * time.Minute) +// ctx := WithWorkflowTaskTimeout(ctx, 5 * time.Second) // ctx := WithWorkflowTaskList(ctx, "example-group") // wfn - workflow function. for new execution it can be different from the currently running. // args - arguments for the new workflow. @@ -228,15 +228,6 @@ func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) *C if err != nil { panic(err) } - if options.taskListName == "" { - panic("invalid task list provided") - } - if options.executionStartToCloseTimeoutSeconds <= 0 { - panic("invalid executionStartToCloseTimeoutSeconds provided") - } - if options.taskStartToCloseTimeoutSeconds <= 0 { - panic("invalid taskStartToCloseTimeoutSeconds provided") - } params := &executeWorkflowParams{ workflowOptions: *options, diff --git a/internal/internal_activity.go b/internal/internal_activity.go index 2dd869953..dedd81d97 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -70,6 +70,7 @@ type ( localActivityOptions struct { ScheduleToCloseTimeoutSeconds int32 + StartToCloseTimeoutSeconds int32 RetryPolicy *RetryPolicy } @@ -175,10 +176,20 @@ func getValidatedLocalActivityOptions(ctx Context) (*localActivityOptions, error if p == nil { return nil, errLocalActivityParamsBadRequest } - if p.ScheduleToCloseTimeoutSeconds <= 0 { - return nil, errors.New("missing or negative ScheduleToCloseTimeoutSeconds") + if p.ScheduleToCloseTimeoutSeconds < 0 { + return nil, errors.New("negative ScheduleToCloseTimeoutSeconds") + } + if p.StartToCloseTimeoutSeconds < 0 { + return nil, errors.New("negative StartToCloseTimeoutSeconds") + } + if p.ScheduleToCloseTimeoutSeconds == 0 && p.StartToCloseTimeoutSeconds == 0 { + return nil, errors.New("at least one of ScheduleToCloseTimeoutSeconds and StartToCloseTimeoutSeconds is required") + } + if p.ScheduleToCloseTimeoutSeconds == 0 { + p.ScheduleToCloseTimeoutSeconds = p.StartToCloseTimeoutSeconds + } else { + p.StartToCloseTimeoutSeconds = p.ScheduleToCloseTimeoutSeconds } - return p, nil } diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index a732d49e6..6580bdb51 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -371,8 +371,9 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow( attributes.Namespace = params.namespace attributes.TaskList = &tasklistpb.TaskList{Name: params.taskListName} attributes.WorkflowId = params.workflowID - attributes.ExecutionStartToCloseTimeoutSeconds = params.executionStartToCloseTimeoutSeconds - attributes.TaskStartToCloseTimeoutSeconds = params.taskStartToCloseTimeoutSeconds + attributes.WorkflowExecutionTimeoutSeconds = params.workflowExecutionTimeoutSeconds + attributes.WorkflowRunTimeoutSeconds = params.workflowRunTimeoutSeconds + attributes.WorkflowTaskTimeoutSeconds = params.workflowTaskTimeoutSeconds attributes.Input = params.input attributes.WorkflowType = &commonpb.WorkflowType{Name: params.workflowType.Name} attributes.WorkflowIdReusePolicy = params.workflowIDReusePolicy.toProto() @@ -506,8 +507,8 @@ func newLocalActivityTask(params executeLocalActivityParams, callback laResultHa attempt: params.Attempt, } - if params.RetryPolicy != nil && params.RetryPolicy.ExpirationInterval > 0 { - task.expireTime = params.ScheduledTime.Add(params.RetryPolicy.ExpirationInterval) + if params.ScheduleToCloseTimeoutSeconds > 0 { + task.expireTime = params.ScheduledTime.Add(time.Second * time.Duration(params.ScheduleToCloseTimeoutSeconds)) } return task } diff --git a/internal/internal_public.go b/internal/internal_public.go index be69f2e1a..ef5e67938 100644 --- a/internal/internal_public.go +++ b/internal/internal_public.go @@ -67,7 +67,7 @@ type ( // - RespondQueryTaskCompletedRequest // If waitLocalActivity is true, and there is outstanding local activities, this call will return nil. CompleteDecisionTask(workflowTask *workflowTask, waitLocalActivity bool) interface{} - // GetDecisionTimeout returns the TaskStartToCloseTimeout + // GetWorkflowTaskTimeout returns the WorkflowTaskTimeout GetDecisionTimeout() time.Duration GetCurrentDecisionTask() *workflowservice.PollForDecisionTaskResponse IsDestroyed() bool diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 2c2e9f719..031a9c1d5 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -617,19 +617,20 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice. ID: workflowID, RunID: runID, }, - WorkflowType: WorkflowType{Name: task.WorkflowType.GetName()}, - TaskListName: taskList.GetName(), - ExecutionStartToCloseTimeoutSeconds: attributes.GetExecutionStartToCloseTimeoutSeconds(), - TaskStartToCloseTimeoutSeconds: attributes.GetTaskStartToCloseTimeoutSeconds(), - Namespace: wth.namespace, - Attempt: attributes.GetAttempt(), - lastCompletionResult: attributes.LastCompletionResult, - CronSchedule: attributes.CronSchedule, - ContinuedExecutionRunID: attributes.ContinuedExecutionRunId, - ParentWorkflowNamespace: attributes.ParentWorkflowNamespace, - ParentWorkflowExecution: parentWorkflowExecution, - Memo: attributes.Memo, - SearchAttributes: attributes.SearchAttributes, + WorkflowType: WorkflowType{Name: task.WorkflowType.GetName()}, + TaskListName: taskList.GetName(), + WorkflowExecutionTimeoutSeconds: attributes.GetWorkflowExecutionTimeoutSeconds(), + WorkflowRunTimeoutSeconds: attributes.GetWorkflowRunTimeoutSeconds(), + WorkflowTaskTimeoutSeconds: attributes.GetWorkflowTaskTimeoutSeconds(), + Namespace: wth.namespace, + Attempt: attributes.GetAttempt(), + lastCompletionResult: attributes.LastCompletionResult, + CronSchedule: attributes.CronSchedule, + ContinuedExecutionRunID: attributes.ContinuedExecutionRunId, + ParentWorkflowNamespace: attributes.ParentWorkflowNamespace, + ParentWorkflowExecution: parentWorkflowExecution, + Memo: attributes.Memo, + SearchAttributes: attributes.SearchAttributes, } wfStartTime := time.Unix(0, h.Events[0].GetTimestamp()) @@ -770,7 +771,8 @@ processWorkflowLoop: if err == nil && response == nil { waitLocalActivityLoop: for { - deadlineToTrigger := time.Duration(float32(ratioToForceCompleteDecisionTaskComplete) * float32(workflowContext.GetDecisionTimeout())) + deadlineToTrigger := time.Duration(float32(ratioToForceCompleteDecisionTaskComplete) * + float32(workflowContext.GetWorkflowTaskTimeout())) delayDuration := time.Until(startTime.Add(deadlineToTrigger)) select { case <-time.After(delayDuration): @@ -963,7 +965,7 @@ func (w *workflowExecutionContextImpl) retryLocalActivity(lar *localActivityResu } retryBackoff := getRetryBackoff(lar, time.Now(), w.wth.dataConverter) - if retryBackoff > 0 && retryBackoff <= w.GetDecisionTimeout() { + if retryBackoff > 0 && retryBackoff <= w.GetWorkflowTaskTimeout() { // we need a local retry time.AfterFunc(retryBackoff, func() { // TODO: this should not be a separate goroutine as it introduces race condition when accessing eventHandler. @@ -1014,10 +1016,6 @@ func getRetryBackoff(lar *localActivityResult, now time.Time, dataConverter Data } func getRetryBackoffWithNowTime(p *RetryPolicy, attempt int32, errReason string, now, expireTime time.Time) time.Duration { - if p.MaximumAttempts == 0 && p.ExpirationInterval == 0 { - return noRetryBackoff - } - if p.MaximumAttempts > 0 && attempt > p.MaximumAttempts-1 { return noRetryBackoff // max attempt reached } @@ -1140,8 +1138,8 @@ func (w *workflowExecutionContextImpl) ResetIfStale(task *workflowservice.PollFo return nil } -func (w *workflowExecutionContextImpl) GetDecisionTimeout() time.Duration { - return time.Second * time.Duration(w.workflowInfo.TaskStartToCloseTimeoutSeconds) +func (w *workflowExecutionContextImpl) GetWorkflowTaskTimeout() time.Duration { + return time.Second * time.Duration(w.workflowInfo.WorkflowTaskTimeoutSeconds) } func skipDeterministicCheckForDecision(d *decisionpb.Decision) bool { @@ -1490,14 +1488,14 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow( metricsScope.Counter(metrics.WorkflowContinueAsNewCounter).Inc(1) closeDecision = createNewDecision(decisionpb.DecisionType_ContinueAsNewWorkflowExecution) closeDecision.Attributes = &decisionpb.Decision_ContinueAsNewWorkflowExecutionDecisionAttributes{ContinueAsNewWorkflowExecutionDecisionAttributes: &decisionpb.ContinueAsNewWorkflowExecutionDecisionAttributes{ - WorkflowType: &commonpb.WorkflowType{Name: contErr.params.workflowType.Name}, - Input: contErr.params.input, - TaskList: &tasklistpb.TaskList{Name: contErr.params.taskListName}, - ExecutionStartToCloseTimeoutSeconds: contErr.params.executionStartToCloseTimeoutSeconds, - TaskStartToCloseTimeoutSeconds: contErr.params.taskStartToCloseTimeoutSeconds, - Header: contErr.params.header, - Memo: workflowContext.workflowInfo.Memo, - SearchAttributes: workflowContext.workflowInfo.SearchAttributes, + WorkflowType: &commonpb.WorkflowType{Name: contErr.params.workflowType.Name}, + Input: contErr.params.input, + TaskList: &tasklistpb.TaskList{Name: contErr.params.taskListName}, + WorkflowRunTimeoutSeconds: contErr.params.workflowRunTimeoutSeconds, + WorkflowTaskTimeoutSeconds: contErr.params.workflowTaskTimeoutSeconds, + Header: contErr.params.header, + Memo: workflowContext.workflowInfo.Memo, + SearchAttributes: workflowContext.workflowInfo.SearchAttributes, }} } else if workflowContext.err != nil { // Workflow failures diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index 72e448dbf..3c40743ec 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -880,21 +880,23 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { parentNamespace := "parentNamespace" var attempt int32 = 123 var executionTimeout int32 = 213456 + var runTimeout int32 = 21098 var taskTimeout int32 = 21 workflowType := "GetWorkflowInfoWorkflow" lastCompletionResult, err := getDefaultDataConverter().ToData("lastCompletionData") t.NoError(err) startedEventAttributes := &eventpb.WorkflowExecutionStartedEventAttributes{ - Input: lastCompletionResult, - TaskList: &tasklistpb.TaskList{Name: taskList}, - ParentWorkflowExecution: parentExecution, - CronSchedule: cronSchedule, - ContinuedExecutionRunId: continuedRunID, - ParentWorkflowNamespace: parentNamespace, - Attempt: attempt, - ExecutionStartToCloseTimeoutSeconds: executionTimeout, - TaskStartToCloseTimeoutSeconds: taskTimeout, - LastCompletionResult: lastCompletionResult, + Input: lastCompletionResult, + TaskList: &tasklistpb.TaskList{Name: taskList}, + ParentWorkflowExecution: parentExecution, + CronSchedule: cronSchedule, + ContinuedExecutionRunId: continuedRunID, + ParentWorkflowNamespace: parentNamespace, + Attempt: attempt, + WorkflowExecutionTimeoutSeconds: executionTimeout, + WorkflowRunTimeoutSeconds: runTimeout, + WorkflowTaskTimeoutSeconds: taskTimeout, + LastCompletionResult: lastCompletionResult, } testEvents := []*eventpb.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, startedEventAttributes), @@ -927,8 +929,9 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { t.EqualValues(continuedRunID, result.ContinuedExecutionRunID) t.EqualValues(parentNamespace, result.ParentWorkflowNamespace) t.EqualValues(attempt, result.Attempt) - t.EqualValues(executionTimeout, result.ExecutionStartToCloseTimeoutSeconds) - t.EqualValues(taskTimeout, result.TaskStartToCloseTimeoutSeconds) + t.EqualValues(executionTimeout, result.WorkflowExecutionTimeoutSeconds) + t.EqualValues(runTimeout, result.WorkflowRunTimeoutSeconds) + t.EqualValues(taskTimeout, result.WorkflowTaskTimeoutSeconds) t.EqualValues(workflowType, result.WorkflowType.Name) t.EqualValues(testNamespace, result.Namespace) } @@ -1115,7 +1118,6 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_DecisionHeartbeatFail() { InitialInterval: backoffDuration, BackoffCoefficient: 1.1, MaximumInterval: time.Minute, - ExpirationInterval: time.Minute, }, } ctx = WithLocalActivityOptions(ctx, ao) @@ -1136,8 +1138,8 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_DecisionHeartbeatFail() { testEvents := []*eventpb.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &eventpb.WorkflowExecutionStartedEventAttributes{ // make sure the timeout is same as the backoff interval - TaskStartToCloseTimeoutSeconds: backoffIntervalInSeconds, - TaskList: &tasklistpb.TaskList{Name: testWorkflowTaskTasklist}}, + WorkflowTaskTimeoutSeconds: backoffIntervalInSeconds, + TaskList: &tasklistpb.TaskList{Name: testWorkflowTaskTasklist}}, ), createTestEventDecisionTaskScheduled(2, &eventpb.DecisionTaskScheduledEventAttributes{}), decisionTaskStartedEvent, diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 0ebf62bec..4699ddf71 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -510,7 +510,11 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi } }() - timeoutDuration := time.Duration(task.params.ScheduleToCloseTimeoutSeconds) * time.Second + timeout := task.params.ScheduleToCloseTimeoutSeconds + if task.params.StartToCloseTimeoutSeconds != 0 && task.params.StartToCloseTimeoutSeconds < timeout { + timeout = task.params.StartToCloseTimeoutSeconds + } + timeoutDuration := time.Duration(timeout) * time.Second deadline := time.Now().Add(timeoutDuration) if task.attempt > 0 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) { // this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout diff --git a/internal/internal_worker_interfaces_test.go b/internal/internal_worker_interfaces_test.go index f883d95af..32fe35eb4 100644 --- a/internal/internal_worker_interfaces_test.go +++ b/internal/internal_worker_interfaces_test.go @@ -226,10 +226,10 @@ func (s *InterfacesTestSuite) TestInterface() { // Start a workflow. workflowOptions := StartWorkflowOptions{ - ID: "HelloWorld_Workflow", - TaskList: "testTaskList", - ExecutionStartToCloseTimeout: 10 * time.Second, - DecisionTaskStartToCloseTimeout: 10 * time.Second, + ID: "HelloWorld_Workflow", + TaskList: "testTaskList", + WorkflowExecutionTimeout: 10 * time.Second, + WorkflowTaskTimeout: 10 * time.Second, } workflowClient := NewServiceClient(s.service, nil, ClientOptions{}) _, err := workflowClient.ExecuteWorkflow(context.Background(), workflowOptions, "workflowType") diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 2759b5ad4..8d8645114 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -206,8 +206,8 @@ func testReplayWorkflowFromFileParent(ctx Context) error { execution := GetWorkflowInfo(ctx).WorkflowExecution childID := fmt.Sprintf("child_workflow:%v", execution.RunID) cwo := ChildWorkflowOptions{ - WorkflowID: childID, - ExecutionStartToCloseTimeout: time.Minute, + WorkflowID: childID, + WorkflowExecutionTimeout: time.Minute, } ctx = WithChildWorkflowOptions(ctx, cwo) var result string diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index 2a64849b3..4986927cb 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -247,10 +247,11 @@ func (s *WorkersTestSuite) TestLongRunningDecisionTask() { EventId: 1, EventType: eventpb.EventType_WorkflowExecutionStarted, Attributes: &eventpb.HistoryEvent_WorkflowExecutionStartedEventAttributes{WorkflowExecutionStartedEventAttributes: &eventpb.WorkflowExecutionStartedEventAttributes{ - TaskList: &tasklistpb.TaskList{Name: taskList}, - ExecutionStartToCloseTimeoutSeconds: 10, - TaskStartToCloseTimeoutSeconds: 2, - WorkflowType: &commonpb.WorkflowType{Name: "long-running-decision-workflow-type"}, + TaskList: &tasklistpb.TaskList{Name: taskList}, + WorkflowExecutionTimeoutSeconds: 10, + WorkflowRunTimeoutSeconds: 10, + WorkflowTaskTimeoutSeconds: 2, + WorkflowType: &commonpb.WorkflowType{Name: "long-running-decision-workflow-type"}, }}, }, createTestEventDecisionTaskScheduled(2, &eventpb.DecisionTaskScheduledEventAttributes{TaskList: &tasklistpb.TaskList{Name: taskList}}), @@ -387,10 +388,11 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() { EventId: 1, EventType: eventpb.EventType_WorkflowExecutionStarted, Attributes: &eventpb.HistoryEvent_WorkflowExecutionStartedEventAttributes{WorkflowExecutionStartedEventAttributes: &eventpb.WorkflowExecutionStartedEventAttributes{ - TaskList: &tasklistpb.TaskList{Name: taskList}, - ExecutionStartToCloseTimeoutSeconds: 10, - TaskStartToCloseTimeoutSeconds: 3, - WorkflowType: &commonpb.WorkflowType{Name: "multiple-local-activities-workflow-type"}, + TaskList: &tasklistpb.TaskList{Name: taskList}, + WorkflowExecutionTimeoutSeconds: 10, + WorkflowRunTimeoutSeconds: 10, + WorkflowTaskTimeoutSeconds: 3, + WorkflowType: &commonpb.WorkflowType{Name: "multiple-local-activities-workflow-type"}, }}, }, createTestEventDecisionTaskScheduled(2, &eventpb.DecisionTaskScheduledEventAttributes{TaskList: &tasklistpb.TaskList{Name: taskList}}), diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index e88efe881..fa41fd0f2 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -168,22 +168,23 @@ type ( // The current timeout resolution implementation is in seconds and uses math.Ceil() as the duration. But is // subjected to change in the future. workflowOptions struct { - taskListName string - executionStartToCloseTimeoutSeconds int32 - taskStartToCloseTimeoutSeconds int32 - namespace string - workflowID string - waitForCancellation bool - signalChannels map[string]Channel - queryHandlers map[string]func(*commonpb.Payload) (*commonpb.Payload, error) - workflowIDReusePolicy WorkflowIDReusePolicy - dataConverter DataConverter - retryPolicy *commonpb.RetryPolicy - cronSchedule string - contextPropagators []ContextPropagator - memo map[string]interface{} - searchAttributes map[string]interface{} - parentClosePolicy ParentClosePolicy + taskListName string + workflowExecutionTimeoutSeconds int32 + workflowRunTimeoutSeconds int32 + workflowTaskTimeoutSeconds int32 + namespace string + workflowID string + waitForCancellation bool + signalChannels map[string]Channel + queryHandlers map[string]func(*commonpb.Payload) (*commonpb.Payload, error) + workflowIDReusePolicy WorkflowIDReusePolicy + dataConverter DataConverter + retryPolicy *commonpb.RetryPolicy + cronSchedule string + contextPropagators []ContextPropagator + memo map[string]interface{} + searchAttributes map[string]interface{} + parentClosePolicy ParentClosePolicy } executeWorkflowParams struct { @@ -430,8 +431,9 @@ func newWorkflowContext(env workflowEnvironment, interceptors WorkflowIntercepto wInfo := env.WorkflowInfo() rootCtx = WithWorkflowNamespace(rootCtx, wInfo.Namespace) rootCtx = WithWorkflowTaskList(rootCtx, wInfo.TaskListName) - rootCtx = WithExecutionStartToCloseTimeout(rootCtx, time.Duration(wInfo.ExecutionStartToCloseTimeoutSeconds)*time.Second) - rootCtx = WithWorkflowTaskStartToCloseTimeout(rootCtx, time.Duration(wInfo.TaskStartToCloseTimeoutSeconds)*time.Second) + getWorkflowEnvOptions(rootCtx).workflowExecutionTimeoutSeconds = wInfo.WorkflowExecutionTimeoutSeconds + rootCtx = WithWorkflowRunTimeout(rootCtx, time.Duration(wInfo.WorkflowRunTimeoutSeconds)*time.Second) + rootCtx = WithWorkflowTaskTimeout(rootCtx, time.Duration(wInfo.WorkflowTaskTimeoutSeconds)*time.Second) rootCtx = WithTaskList(rootCtx, wInfo.TaskListName) rootCtx = WithDataConverter(rootCtx, env.GetDataConverter()) rootCtx = withContextPropagators(rootCtx, env.GetContextPropagators()) @@ -589,7 +591,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) { hasResult = false v, ok, m := c.receiveAsyncImpl(callback) - if !ok && !m { // channel closed and empty + if !ok && !m { //channel closed and empty return m } @@ -599,7 +601,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) { state.unblocked() return m } - continue // corrupt signal. Drop and reset process + continue //corrupt signal. Drop and reset process } for { if hasResult { @@ -608,7 +610,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) { state.unblocked() return more } - break // Corrupt signal. Drop and reset process. + break //Corrupt signal. Drop and reset process. } state.yield(fmt.Sprintf("blocked on %s.Receive", c.name)) } @@ -624,7 +626,7 @@ func (c *channelImpl) ReceiveAsync(valuePtr interface{}) (ok bool) { func (c *channelImpl) ReceiveAsyncWithMoreFlag(valuePtr interface{}) (ok bool, more bool) { for { v, ok, more := c.receiveAsyncImpl(nil) - if !ok && !more { // channel closed and empty + if !ok && !more { //channel closed and empty return ok, more } @@ -767,7 +769,7 @@ func (c *channelImpl) Close() { // Takes a value and assigns that 'to' value. logs a metric if it is unable to deserialize func (c *channelImpl) assignValue(from interface{}, to interface{}) error { err := decodeAndAssignValue(c.dataConverter, from, to) - // add to metrics + //add to metrics if err != nil { c.env.GetLogger().Error(fmt.Sprintf("Corrupt signal received on channel %s. Error deserializing", c.name), zap.Error(err)) c.env.GetMetricsScope().Counter(metrics.CorruptedSignalsCounter).Inc(1) diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index b02acec5f..d6d902208 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -166,8 +166,9 @@ func (wc *WorkflowClient) StartWorkflow( workflowID = uuid.NewRandom().String() } - executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) - decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) + executionTimeout := common.Int32Ceil(options.WorkflowExecutionTimeout.Seconds()) + runTimeout := common.Int32Ceil(options.WorkflowRunTimeout.Seconds()) + workflowTaskTimeout := common.Int32Ceil(options.WorkflowTaskTimeout.Seconds()) // Validate type and its arguments. workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, args, wc.dataConverter, wc.registry) @@ -200,21 +201,22 @@ func (wc *WorkflowClient) StartWorkflow( // run propagators to extract information about tracing and other stuff, store in headers field startRequest := &workflowservice.StartWorkflowExecutionRequest{ - Namespace: wc.namespace, - RequestId: uuid.New(), - WorkflowId: workflowID, - WorkflowType: &commonpb.WorkflowType{Name: workflowType.Name}, - TaskList: &tasklistpb.TaskList{Name: options.TaskList}, - Input: input, - ExecutionStartToCloseTimeoutSeconds: executionTimeout, - TaskStartToCloseTimeoutSeconds: decisionTaskTimeout, - Identity: wc.identity, - WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toProto(), - RetryPolicy: convertRetryPolicy(options.RetryPolicy), - CronSchedule: options.CronSchedule, - Memo: memo, - SearchAttributes: searchAttr, - Header: header, + Namespace: wc.namespace, + RequestId: uuid.New(), + WorkflowId: workflowID, + WorkflowType: &commonpb.WorkflowType{Name: workflowType.Name}, + TaskList: &tasklistpb.TaskList{Name: options.TaskList}, + Input: input, + WorkflowExecutionTimeoutSeconds: executionTimeout, + WorkflowRunTimeoutSeconds: runTimeout, + WorkflowTaskTimeoutSeconds: workflowTaskTimeout, + Identity: wc.identity, + WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toProto(), + RetryPolicy: convertRetryPolicy(options.RetryPolicy), + CronSchedule: options.CronSchedule, + Memo: memo, + SearchAttributes: searchAttr, + Header: header, } var response *workflowservice.StartWorkflowExecutionResponse @@ -349,8 +351,9 @@ func (wc *WorkflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI workflowID = uuid.NewRandom().String() } - executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) - decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) + executionTimeout := common.Int32Ceil(options.WorkflowExecutionTimeout.Seconds()) + runTimeout := common.Int32Ceil(options.WorkflowRunTimeout.Seconds()) + taskTimeout := common.Int32Ceil(options.WorkflowTaskTimeout.Seconds()) // Validate type and its arguments. workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, workflowArgs, wc.dataConverter, wc.registry) @@ -376,23 +379,24 @@ func (wc *WorkflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI header := wc.getWorkflowHeader(ctx) signalWithStartRequest := &workflowservice.SignalWithStartWorkflowExecutionRequest{ - Namespace: wc.namespace, - RequestId: uuid.New(), - WorkflowId: workflowID, - WorkflowType: &commonpb.WorkflowType{Name: workflowType.Name}, - TaskList: &tasklistpb.TaskList{Name: options.TaskList}, - Input: input, - ExecutionStartToCloseTimeoutSeconds: executionTimeout, - TaskStartToCloseTimeoutSeconds: decisionTaskTimeout, - SignalName: signalName, - SignalInput: signalInput, - Identity: wc.identity, - RetryPolicy: convertRetryPolicy(options.RetryPolicy), - CronSchedule: options.CronSchedule, - Memo: memo, - SearchAttributes: searchAttr, - WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toProto(), - Header: header, + Namespace: wc.namespace, + RequestId: uuid.New(), + WorkflowId: workflowID, + WorkflowType: &commonpb.WorkflowType{Name: workflowType.Name}, + TaskList: &tasklistpb.TaskList{Name: options.TaskList}, + Input: input, + WorkflowExecutionTimeoutSeconds: executionTimeout, + WorkflowRunTimeoutSeconds: runTimeout, + WorkflowTaskTimeoutSeconds: taskTimeout, + SignalName: signalName, + SignalInput: signalInput, + Identity: wc.identity, + RetryPolicy: convertRetryPolicy(options.RetryPolicy), + CronSchedule: options.CronSchedule, + Memo: memo, + SearchAttributes: searchAttr, + WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toProto(), + Header: header, } var response *workflowservice.SignalWithStartWorkflowExecutionResponse diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index b4886a0d6..8cb09615e 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -390,11 +390,11 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_Success() { workflowRun, err := s.workflowClient.ExecuteWorkflow( context.Background(), StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds * time.Second, - DecisionTaskStartToCloseTimeout: timeoutInSeconds * time.Second, - WorkflowIDReusePolicy: workflowIDReusePolicy, + ID: workflowID, + TaskList: tasklist, + WorkflowExecutionTimeout: timeoutInSeconds * time.Second, + WorkflowTaskTimeout: timeoutInSeconds * time.Second, + WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) s.Nil(err) @@ -438,11 +438,11 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_RawHistory_Success() { workflowRun, err := s.workflowClient.ExecuteWorkflow( context.Background(), StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds * time.Second, - DecisionTaskStartToCloseTimeout: timeoutInSeconds * time.Second, - WorkflowIDReusePolicy: workflowIDReusePolicy, + ID: workflowID, + TaskList: tasklist, + WorkflowRunTimeout: timeoutInSeconds * time.Second, + WorkflowTaskTimeout: timeoutInSeconds * time.Second, + WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) s.Nil(err) @@ -484,11 +484,11 @@ func (s *workflowRunSuite) TestExecuteWorkflowWorkflowExecutionAlreadyStartedErr workflowRun, err := s.workflowClient.ExecuteWorkflow( context.Background(), StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds * time.Second, - DecisionTaskStartToCloseTimeout: timeoutInSeconds * time.Second, - WorkflowIDReusePolicy: workflowIDReusePolicy, + ID: workflowID, + TaskList: tasklist, + WorkflowExecutionTimeout: timeoutInSeconds * time.Second, + WorkflowTaskTimeout: timeoutInSeconds * time.Second, + WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) s.Nil(err) @@ -535,11 +535,11 @@ func (s *workflowRunSuite) TestExecuteWorkflowWorkflowExecutionAlreadyStartedErr workflowRun, err := s.workflowClient.ExecuteWorkflow( context.Background(), StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds * time.Second, - DecisionTaskStartToCloseTimeout: timeoutInSeconds * time.Second, - WorkflowIDReusePolicy: workflowIDReusePolicy, + ID: workflowID, + TaskList: tasklist, + WorkflowRunTimeout: timeoutInSeconds * time.Second, + WorkflowTaskTimeout: timeoutInSeconds * time.Second, + WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) s.Nil(err) @@ -585,10 +585,10 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoIdInOptions() { workflowRun, err := s.workflowClient.ExecuteWorkflow( context.Background(), StartWorkflowOptions{ - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds * time.Second, - DecisionTaskStartToCloseTimeout: timeoutInSeconds * time.Second, - WorkflowIDReusePolicy: workflowIDReusePolicy, + TaskList: tasklist, + WorkflowExecutionTimeout: timeoutInSeconds * time.Second, + WorkflowTaskTimeout: timeoutInSeconds * time.Second, + WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) s.Nil(err) @@ -638,10 +638,10 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoIdInOptions_RawHistory() { workflowRun, err := s.workflowClient.ExecuteWorkflow( context.Background(), StartWorkflowOptions{ - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds * time.Second, - DecisionTaskStartToCloseTimeout: timeoutInSeconds * time.Second, - WorkflowIDReusePolicy: workflowIDReusePolicy, + TaskList: tasklist, + WorkflowRunTimeout: timeoutInSeconds * time.Second, + WorkflowTaskTimeout: timeoutInSeconds * time.Second, + WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) s.Nil(err) @@ -682,11 +682,11 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_Cancelled() { workflowRun, err := s.workflowClient.ExecuteWorkflow( context.Background(), StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds * time.Second, - DecisionTaskStartToCloseTimeout: timeoutInSeconds * time.Second, - WorkflowIDReusePolicy: workflowIDReusePolicy, + ID: workflowID, + TaskList: tasklist, + WorkflowExecutionTimeout: timeoutInSeconds * time.Second, + WorkflowTaskTimeout: timeoutInSeconds * time.Second, + WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) s.Nil(err) @@ -732,11 +732,11 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_Failed() { workflowRun, err := s.workflowClient.ExecuteWorkflow( context.Background(), StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds * time.Second, - DecisionTaskStartToCloseTimeout: timeoutInSeconds * time.Second, - WorkflowIDReusePolicy: workflowIDReusePolicy, + ID: workflowID, + TaskList: tasklist, + WorkflowExecutionTimeout: timeoutInSeconds * time.Second, + WorkflowTaskTimeout: timeoutInSeconds * time.Second, + WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) s.Nil(err) @@ -772,11 +772,11 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_Terminated() { workflowRun, err := s.workflowClient.ExecuteWorkflow( context.Background(), StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds * time.Second, - DecisionTaskStartToCloseTimeout: timeoutInSeconds * time.Second, - WorkflowIDReusePolicy: workflowIDReusePolicy, + ID: workflowID, + TaskList: tasklist, + WorkflowExecutionTimeout: timeoutInSeconds * time.Second, + WorkflowTaskTimeout: timeoutInSeconds * time.Second, + WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) s.Nil(err) @@ -816,11 +816,11 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_TimedOut() { workflowRun, err := s.workflowClient.ExecuteWorkflow( context.Background(), StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds * time.Second, - DecisionTaskStartToCloseTimeout: timeoutInSeconds * time.Second, - WorkflowIDReusePolicy: workflowIDReusePolicy, + ID: workflowID, + TaskList: tasklist, + WorkflowExecutionTimeout: timeoutInSeconds * time.Second, + WorkflowTaskTimeout: timeoutInSeconds * time.Second, + WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) s.Nil(err) @@ -883,11 +883,11 @@ func (s *workflowRunSuite) TestExecuteWorkflow_NoDup_ContinueAsNew() { workflowRun, err := s.workflowClient.ExecuteWorkflow( context.Background(), StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds * time.Second, - DecisionTaskStartToCloseTimeout: timeoutInSeconds * time.Second, - WorkflowIDReusePolicy: workflowIDReusePolicy, + ID: workflowID, + TaskList: tasklist, + WorkflowExecutionTimeout: timeoutInSeconds * time.Second, + WorkflowTaskTimeout: timeoutInSeconds * time.Second, + WorkflowIDReusePolicy: workflowIDReusePolicy, }, workflowType, ) s.Nil(err) @@ -986,10 +986,10 @@ func (s *workflowClientTestSuite) TestSignalWithStartWorkflow() { signalName := "my signal" signalInput := []byte("my signal input") options := StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds, - DecisionTaskStartToCloseTimeout: timeoutInSeconds, + ID: workflowID, + TaskList: tasklist, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, } createResponse := &workflowservice.SignalWithStartWorkflowExecutionResponse{ @@ -1012,10 +1012,10 @@ func (s *workflowClientTestSuite) TestStartWorkflow() { client, ok := s.client.(*WorkflowClient) s.True(ok) options := StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds, - DecisionTaskStartToCloseTimeout: timeoutInSeconds, + ID: workflowID, + TaskList: tasklist, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, } f1 := func(ctx Context, r []byte) string { return "result" @@ -1039,10 +1039,10 @@ func (s *workflowClientTestSuite) TestStartWorkflow_WithContext() { client, ok := s.client.(*WorkflowClient) s.True(ok) options := StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds, - DecisionTaskStartToCloseTimeout: timeoutInSeconds, + ID: workflowID, + TaskList: tasklist, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, } f1 := func(ctx Context, r []byte) error { value := ctx.Value(contextKey(testHeader)) @@ -1069,10 +1069,10 @@ func (s *workflowClientTestSuite) TestStartWorkflowWithDataConverter() { client, ok := s.client.(*WorkflowClient) s.True(ok) options := StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds, - DecisionTaskStartToCloseTimeout: timeoutInSeconds, + ID: workflowID, + TaskList: tasklist, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, } f1 := func(ctx Context, r []byte) string { return "result" @@ -1106,12 +1106,12 @@ func (s *workflowClientTestSuite) TestStartWorkflow_WithMemoAndSearchAttr() { "testAttr": "attr value", } options := StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds, - DecisionTaskStartToCloseTimeout: timeoutInSeconds, - Memo: memo, - SearchAttributes: searchAttributes, + ID: workflowID, + TaskList: tasklist, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, + Memo: memo, + SearchAttributes: searchAttributes, } wf := func(ctx Context) string { return "result" @@ -1140,12 +1140,12 @@ func (s *workflowClientTestSuite) SignalWithStartWorkflowWithMemoAndSearchAttr() "testAttr": "attr value", } options := StartWorkflowOptions{ - ID: workflowID, - TaskList: tasklist, - ExecutionStartToCloseTimeout: timeoutInSeconds, - DecisionTaskStartToCloseTimeout: timeoutInSeconds, - Memo: memo, - SearchAttributes: searchAttributes, + ID: workflowID, + TaskList: tasklist, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, + Memo: memo, + SearchAttributes: searchAttributes, } wf := func(ctx Context) string { return "result" diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index 5896a6b13..aba82e639 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -417,16 +417,18 @@ func continueAsNewWorkflowTest(ctx Context) error { func (s *WorkflowUnitTest) Test_ContinueAsNewWorkflow() { env := s.NewTestWorkflowEnvironment() env.SetStartWorkflowOptions(StartWorkflowOptions{ - ExecutionStartToCloseTimeout: 100 * time.Second, - DecisionTaskStartToCloseTimeout: 5 * time.Second, + WorkflowExecutionTimeout: 100 * time.Second, + WorkflowTaskTimeout: 5 * time.Second, + WorkflowRunTimeout: 50 * time.Second, }) env.ExecuteWorkflow(continueAsNewWorkflowTest) s.True(env.IsWorkflowCompleted()) s.NotNil(env.GetWorkflowError()) resultErr := env.GetWorkflowError().(*ContinueAsNewError) s.EqualValues("continueAsNewWorkflowTest", resultErr.params.workflowType.Name) - s.EqualValues(100, resultErr.params.executionStartToCloseTimeoutSeconds) - s.EqualValues(5, resultErr.params.taskStartToCloseTimeoutSeconds) + s.EqualValues(100, resultErr.params.workflowExecutionTimeoutSeconds) + s.EqualValues(50, resultErr.params.workflowRunTimeoutSeconds) + s.EqualValues(5, resultErr.params.workflowTaskTimeoutSeconds) s.EqualValues("default-test-tasklist", resultErr.params.taskListName) } @@ -966,7 +968,7 @@ func sleepWorkflow(ctx Context, input time.Duration) (int, error) { func waitGroupWorkflowTest(ctx Context, n int) (int, error) { ctx = WithChildWorkflowOptions(ctx, ChildWorkflowOptions{ - ExecutionStartToCloseTimeout: time.Second * 30, + WorkflowExecutionTimeout: time.Second * 30, }) var err error @@ -998,7 +1000,7 @@ func waitGroupWorkflowTest(ctx Context, n int) (int, error) { func waitGroupWaitForMWorkflowTest(ctx Context, n int, m int) (int, error) { ctx = WithChildWorkflowOptions(ctx, ChildWorkflowOptions{ - ExecutionStartToCloseTimeout: time.Second * 30, + WorkflowExecutionTimeout: time.Second * 30, }) var err error @@ -1030,7 +1032,7 @@ func waitGroupWaitForMWorkflowTest(ctx Context, n int, m int) (int, error) { func waitGroupMultipleWaitsWorkflowTest(ctx Context) (int, error) { ctx = WithChildWorkflowOptions(ctx, ChildWorkflowOptions{ - ExecutionStartToCloseTimeout: time.Second * 30, + WorkflowExecutionTimeout: time.Second * 30, }) n := 10 @@ -1069,7 +1071,7 @@ func waitGroupMultipleWaitsWorkflowTest(ctx Context) (int, error) { func waitGroupMultipleConcurrentWaitsPanicsWorkflowTest(ctx Context) (int, error) { ctx = WithChildWorkflowOptions(ctx, ChildWorkflowOptions{ - ExecutionStartToCloseTimeout: time.Second * 30, + WorkflowExecutionTimeout: time.Second * 30, }) var err error @@ -1099,7 +1101,7 @@ func waitGroupMultipleConcurrentWaitsPanicsWorkflowTest(ctx Context) (int, error func waitGroupNegativeCounterPanicsWorkflowTest(ctx Context) (int, error) { ctx = WithChildWorkflowOptions(ctx, ChildWorkflowOptions{ - ExecutionStartToCloseTimeout: time.Second * 30, + WorkflowExecutionTimeout: time.Second * 30, }) var err error diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index ca2053dfa..5cb2b9508 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -64,8 +64,9 @@ const ( workflowTypeNotSpecified = "workflow-type-not-specified" // These are copied from service implementation - reservedTaskListPrefix = "/__temporal_sys/" - maxIDLengthLimit = 1000 + reservedTaskListPrefix = "/__temporal_sys/" + maxIDLengthLimit = 1000 + maxWorkflowTimeoutSeconds = 60 * 24 * 365 * 10 ) type ( @@ -187,13 +188,13 @@ type ( queryHandler func(string, *commonpb.Payload) (*commonpb.Payload, error) startedHandler func(r WorkflowExecution, e error) - isTestCompleted bool - testResult Value - testError error - doneChannel chan struct{} - workerOptions WorkerOptions - dataConverter DataConverter - executionTimeout time.Duration + isTestCompleted bool + testResult Value + testError error + doneChannel chan struct{} + workerOptions WorkerOptions + dataConverter DataConverter + runTimeout time.Duration heartbeatDetails *commonpb.Payload @@ -220,18 +221,17 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist testSuite: s, taskListSpecificActivities: make(map[string]*taskListSpecificActivity), - logger: s.logger, - metricsScope: metrics.NewTaggedScope(s.scope), - tracer: opentracing.NoopTracer{}, - mockClock: clock.NewMock(), - wallClock: clock.New(), - timers: make(map[string]*testTimerHandle), - activities: make(map[string]*testActivityHandle), - localActivities: make(map[string]*localActivityTask), - runningWorkflows: make(map[string]*testWorkflowHandle), - callbackChannel: make(chan testCallbackHandle, 1000), - testTimeout: 3 * time.Second, - + logger: s.logger, + metricsScope: metrics.NewTaggedScope(s.scope), + tracer: opentracing.NoopTracer{}, + mockClock: clock.NewMock(), + wallClock: clock.New(), + timers: make(map[string]*testTimerHandle), + activities: make(map[string]*testActivityHandle), + localActivities: make(map[string]*localActivityTask), + runningWorkflows: make(map[string]*testWorkflowHandle), + callbackChannel: make(chan testCallbackHandle, 1000), + testTimeout: 3 * time.Second, expectedMockCalls: make(map[string]struct{}), }, @@ -244,8 +244,8 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist WorkflowType: WorkflowType{Name: workflowTypeNotSpecified}, TaskListName: defaultTestTaskList, - ExecutionStartToCloseTimeoutSeconds: 60 * 24 * 365 * 10, - TaskStartToCloseTimeoutSeconds: 1, + WorkflowExecutionTimeoutSeconds: maxWorkflowTimeoutSeconds, + WorkflowTaskTimeoutSeconds: 1, }, registry: r, @@ -255,6 +255,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist doneChannel: make(chan struct{}), workerStopChannel: make(chan struct{}), dataConverter: getDefaultDataConverter(), + runTimeout: maxWorkflowTimeoutSeconds * time.Second, } // move forward the mock clock to start time. @@ -355,13 +356,14 @@ func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(param childEnv.workflowInfo.WorkflowExecution.RunID = params.workflowID + "_RunID" childEnv.workflowInfo.Namespace = params.namespace childEnv.workflowInfo.TaskListName = params.taskListName - childEnv.workflowInfo.ExecutionStartToCloseTimeoutSeconds = params.executionStartToCloseTimeoutSeconds - childEnv.workflowInfo.TaskStartToCloseTimeoutSeconds = params.taskStartToCloseTimeoutSeconds + childEnv.workflowInfo.WorkflowExecutionTimeoutSeconds = params.workflowExecutionTimeoutSeconds + childEnv.workflowInfo.WorkflowRunTimeoutSeconds = params.workflowRunTimeoutSeconds + childEnv.workflowInfo.WorkflowTaskTimeoutSeconds = params.workflowTaskTimeoutSeconds childEnv.workflowInfo.lastCompletionResult = params.lastCompletionResult childEnv.workflowInfo.CronSchedule = cronSchedule childEnv.workflowInfo.ParentWorkflowNamespace = env.workflowInfo.Namespace childEnv.workflowInfo.ParentWorkflowExecution = &env.workflowInfo.WorkflowExecution - childEnv.executionTimeout = time.Duration(params.executionStartToCloseTimeoutSeconds) * time.Second + childEnv.runTimeout = time.Duration(params.workflowRunTimeoutSeconds) * time.Second if workflowHandler, ok := env.runningWorkflows[params.workflowID]; ok { // duplicate workflow ID if !workflowHandler.handled { @@ -431,16 +433,26 @@ func (env *testWorkflowEnvironmentImpl) executeWorkflow(workflowFn interface{}, func (env *testWorkflowEnvironmentImpl) executeWorkflowInternal(delayStart time.Duration, workflowType string, input *commonpb.Payload) { env.locker.Lock() - if env.workflowInfo.WorkflowType.Name != workflowTypeNotSpecified { + wInfo := env.workflowInfo + if wInfo.WorkflowType.Name != workflowTypeNotSpecified { // Current TestWorkflowEnvironment only support to run one workflow. // Created task to support testing multiple workflows with one env instance // https://github.com/temporalio/temporal-go-client/issues/50 - panic(fmt.Sprintf("Current TestWorkflowEnvironment is used to execute %v. Please create a new TestWorkflowEnvironment for %v.", env.workflowInfo.WorkflowType.Name, workflowType)) + panic(fmt.Sprintf("Current TestWorkflowEnvironment is used to execute %v. Please create a new TestWorkflowEnvironment for %v.", wInfo.WorkflowType.Name, workflowType)) + } + wInfo.WorkflowType.Name = workflowType + if wInfo.WorkflowRunTimeoutSeconds == 0 { + wInfo.WorkflowRunTimeoutSeconds = common.Int32Ceil(env.runTimeout.Seconds()) + } + if wInfo.WorkflowExecutionTimeoutSeconds == 0 { + wInfo.WorkflowExecutionTimeoutSeconds = maxWorkflowTimeoutSeconds + } + if wInfo.WorkflowTaskTimeoutSeconds == 0 { + wInfo.WorkflowTaskTimeoutSeconds = 1 } - env.workflowInfo.WorkflowType.Name = workflowType env.locker.Unlock() - workflowDefinition, err := env.getWorkflowDefinition(env.workflowInfo.WorkflowType) + workflowDefinition, err := env.getWorkflowDefinition(wInfo.WorkflowType) if err != nil { panic(err) } @@ -464,8 +476,8 @@ func (env *testWorkflowEnvironmentImpl) executeWorkflowInternal(delayStart time. } }, false) - if env.executionTimeout > 0 { - timeoutDuration := env.executionTimeout + delayStart + if env.runTimeout > 0 { + timeoutDuration := env.runTimeout + delayStart env.registerDelayedCallback(func() { if !env.isTestCompleted { env.Complete(nil, ErrDeadlineExceeded) @@ -857,8 +869,8 @@ func (h *testWorkflowHandle) rerunAsChild() bool { if params.retryPolicy != nil && env.testError != nil { errReason, _ := getErrorDetails(env.testError, env.GetDataConverter()) var expireTime time.Time - if params.retryPolicy.GetExpirationIntervalInSeconds() > 0 { - expireTime = params.scheduledTime.Add(time.Second * time.Duration(params.retryPolicy.GetExpirationIntervalInSeconds())) + if params.workflowOptions.workflowExecutionTimeoutSeconds > 0 { + expireTime = params.scheduledTime.Add(time.Second * time.Duration(params.workflowOptions.workflowExecutionTimeoutSeconds)) } backoff := getRetryBackoffFromProtoRetryPolicy(params.retryPolicy, env.workflowInfo.Attempt, errReason, env.Now(), expireTime) if backoff > 0 { @@ -953,7 +965,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteActivity(parameters executeActivi scheduleTaskAttr.HeartbeatTimeoutSeconds = parameters.HeartbeatTimeoutSeconds scheduleTaskAttr.RetryPolicy = parameters.RetryPolicy scheduleTaskAttr.Header = parameters.Header - err := env.validateActivityScheduleAttributes(scheduleTaskAttr, env.WorkflowInfo().ExecutionStartToCloseTimeoutSeconds) + err := env.validateActivityScheduleAttributes(scheduleTaskAttr, env.WorkflowInfo().WorkflowRunTimeoutSeconds) activityInfo := &activityInfo{activityID: activityID} if err != nil { callback(nil, err) @@ -1006,7 +1018,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteActivity(parameters executeActivi // Copy of the server function func (v *decisionAttrValidator) validateActivityScheduleAttributes func (env *testWorkflowEnvironmentImpl) validateActivityScheduleAttributes( attributes *decisionpb.ScheduleActivityTaskDecisionAttributes, - wfTimeout int32, + runTimeout int32, ) error { //if err := v.validateCrossNamespaceCall( @@ -1055,53 +1067,50 @@ func (env *testWorkflowEnvironmentImpl) validateActivityScheduleAttributes( return serviceerror.NewInvalidArgument("A valid timeout may not be negative.") } - // ensure activity timeout never larger than workflow timeout - if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout { - attributes.ScheduleToCloseTimeoutSeconds = wfTimeout - } - if attributes.GetScheduleToStartTimeoutSeconds() > wfTimeout { - attributes.ScheduleToStartTimeoutSeconds = wfTimeout - } - if attributes.GetStartToCloseTimeoutSeconds() > wfTimeout { - attributes.StartToCloseTimeoutSeconds = wfTimeout - } - if attributes.GetHeartbeatTimeoutSeconds() > wfTimeout { - attributes.HeartbeatTimeoutSeconds = wfTimeout - } - validScheduleToClose := attributes.GetScheduleToCloseTimeoutSeconds() > 0 validScheduleToStart := attributes.GetScheduleToStartTimeoutSeconds() > 0 validStartToClose := attributes.GetStartToCloseTimeoutSeconds() > 0 if validScheduleToClose { - if !validScheduleToStart { + if validScheduleToStart { + attributes.ScheduleToStartTimeoutSeconds = common.MinInt32(attributes.GetScheduleToStartTimeoutSeconds(), + attributes.GetScheduleToCloseTimeoutSeconds()) + } else { attributes.ScheduleToStartTimeoutSeconds = attributes.GetScheduleToCloseTimeoutSeconds() } - if !validStartToClose { + if validStartToClose { + attributes.StartToCloseTimeoutSeconds = common.MinInt32(attributes.GetStartToCloseTimeoutSeconds(), + attributes.GetScheduleToCloseTimeoutSeconds()) + } else { attributes.StartToCloseTimeoutSeconds = attributes.GetScheduleToCloseTimeoutSeconds() } - } else if validScheduleToStart && validStartToClose { - attributes.ScheduleToCloseTimeoutSeconds = attributes.GetScheduleToStartTimeoutSeconds() + attributes.GetStartToCloseTimeoutSeconds() - if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout { - attributes.ScheduleToCloseTimeoutSeconds = wfTimeout + } else if validStartToClose { + // We are in !validScheduleToClose due to the first if above + attributes.ScheduleToCloseTimeoutSeconds = runTimeout + if !validScheduleToStart { + attributes.ScheduleToStartTimeoutSeconds = runTimeout } } else { // Deduction failed as there's not enough information to fill in missing timeouts. - return serviceerror.NewInvalidArgument("A valid ScheduleToCloseTimeout is not set on decision.") - } - // ensure activity's SCHEDULE_TO_START and SCHEDULE_TO_CLOSE is as long as expiration on retry policy - p := attributes.RetryPolicy - if p != nil { - expiration := p.GetExpirationIntervalInSeconds() - if expiration == 0 { - expiration = wfTimeout + return serviceerror.NewInvalidArgument("A valid StartToClose or ScheduleToCloseTimeout is not set on decision.") + } + // ensure activity timeout never larger than workflow timeout + if runTimeout > 0 { + if attributes.GetScheduleToCloseTimeoutSeconds() > runTimeout { + attributes.ScheduleToCloseTimeoutSeconds = runTimeout } - if attributes.GetScheduleToStartTimeoutSeconds() < expiration { - attributes.ScheduleToStartTimeoutSeconds = expiration + if attributes.GetScheduleToStartTimeoutSeconds() > runTimeout { + attributes.ScheduleToStartTimeoutSeconds = runTimeout } - if attributes.GetScheduleToCloseTimeoutSeconds() < expiration { - attributes.ScheduleToCloseTimeoutSeconds = expiration + if attributes.GetStartToCloseTimeoutSeconds() > runTimeout { + attributes.StartToCloseTimeoutSeconds = runTimeout } + if attributes.GetHeartbeatTimeoutSeconds() > runTimeout { + attributes.HeartbeatTimeoutSeconds = runTimeout + } + } + if attributes.GetHeartbeatTimeoutSeconds() > attributes.GetScheduleToCloseTimeoutSeconds() { + attributes.HeartbeatTimeoutSeconds = attributes.GetScheduleToCloseTimeoutSeconds() } return nil } @@ -1142,8 +1151,8 @@ func (env *testWorkflowEnvironmentImpl) validateRetryPolicy(policy *commonpb.Ret // nil policy is valid which means no retry return nil } - if policy.GetInitialIntervalInSeconds() <= 0 { - return serviceerror.NewInvalidArgument("InitialIntervalInSeconds must be greater than 0 on retry policy.") + if policy.GetInitialIntervalInSeconds() < 0 { + return serviceerror.NewInvalidArgument("InitialIntervalInSeconds cannot be less than 0 on retry policy.") } if policy.GetBackoffCoefficient() < 1 { return serviceerror.NewInvalidArgument("BackoffCoefficient cannot be less than 1 on retry policy.") @@ -1157,12 +1166,6 @@ func (env *testWorkflowEnvironmentImpl) validateRetryPolicy(policy *commonpb.Ret if policy.GetMaximumAttempts() < 0 { return serviceerror.NewInvalidArgument("MaximumAttempts cannot be less than 0 on retry policy.") } - if policy.GetExpirationIntervalInSeconds() < 0 { - return serviceerror.NewInvalidArgument("ExpirationIntervalInSeconds cannot be less than 0 on retry policy.") - } - if policy.GetMaximumAttempts() == 0 && policy.GetExpirationIntervalInSeconds() == 0 { - return serviceerror.NewInvalidArgument("MaximumAttempts and ExpirationIntervalInSeconds are both 0. At least one of them must be specified.") - } return nil } @@ -1191,8 +1194,8 @@ func (env *testWorkflowEnvironmentImpl) executeActivityWithRetryForTest( task *workflowservice.PollForActivityTaskResponse, ) (result interface{}) { var expireTime time.Time - if parameters.RetryPolicy != nil && parameters.RetryPolicy.GetExpirationIntervalInSeconds() > 0 { - expireTime = env.Now().Add(time.Second * time.Duration(parameters.RetryPolicy.GetExpirationIntervalInSeconds())) + if parameters.ScheduleToCloseTimeoutSeconds > 0 { + expireTime = env.Now().Add(time.Second * time.Duration(parameters.ScheduleToCloseTimeoutSeconds)) } for { @@ -1243,7 +1246,6 @@ func fromProtoRetryPolicy(p *commonpb.RetryPolicy) *RetryPolicy { InitialInterval: time.Second * time.Duration(p.GetInitialIntervalInSeconds()), BackoffCoefficient: p.GetBackoffCoefficient(), MaximumInterval: time.Second * time.Duration(p.GetMaximumIntervalInSeconds()), - ExpirationInterval: time.Second * time.Duration(p.GetExpirationIntervalInSeconds()), MaximumAttempts: p.GetMaximumAttempts(), NonRetriableErrorReasons: p.NonRetriableErrorReasons, } @@ -2185,11 +2187,14 @@ func (env *testWorkflowEnvironmentImpl) GetRegistry() *registry { func (env *testWorkflowEnvironmentImpl) setStartWorkflowOptions(options StartWorkflowOptions) { wf := env.workflowInfo - if options.ExecutionStartToCloseTimeout > 0 { - wf.ExecutionStartToCloseTimeoutSeconds = common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds()) + if options.WorkflowExecutionTimeout > 0 { + wf.WorkflowExecutionTimeoutSeconds = common.Int32Ceil(options.WorkflowExecutionTimeout.Seconds()) + } + if options.WorkflowRunTimeout > 0 { + wf.WorkflowRunTimeoutSeconds = common.Int32Ceil(options.WorkflowRunTimeout.Seconds()) } - if options.DecisionTaskStartToCloseTimeout > 0 { - wf.TaskStartToCloseTimeoutSeconds = common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds()) + if options.WorkflowTaskTimeout > 0 { + wf.WorkflowTaskTimeoutSeconds = common.Int32Ceil(options.WorkflowTaskTimeout.Seconds()) } if len(options.TaskList) > 0 { wf.TaskListName = options.TaskList diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 59cd539f6..c2cbbf644 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -592,7 +592,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_Basic() { return "", err } - cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Minute} + cwo := ChildWorkflowOptions{WorkflowRunTimeout: time.Minute} ctx = WithChildWorkflowOptions(ctx, cwo) var helloWorkflowResult string err = ExecuteChildWorkflow(ctx, testWorkflowHello).Get(ctx, &helloWorkflowResult) @@ -625,7 +625,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_BasicWithDataConverter() return "", err } - cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Minute} + cwo := ChildWorkflowOptions{WorkflowRunTimeout: time.Minute} ctx = WithChildWorkflowOptions(ctx, cwo) var helloWorkflowResult string ctx = WithDataConverter(ctx, newTestDataConverter()) @@ -654,8 +654,8 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_BasicWithDataConverter() func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflowCancel() { workflowFn := func(ctx Context) error { cwo := ChildWorkflowOptions{ - ExecutionStartToCloseTimeout: time.Minute, - WaitForCancellation: true, + WorkflowRunTimeout: time.Minute, + WaitForCancellation: true, } ctx = WithChildWorkflowOptions(ctx, cwo) ctx1, cancel1 := WithCancel(ctx) @@ -692,7 +692,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_Mock() { return "", err } - cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Minute} + cwo := ChildWorkflowOptions{WorkflowRunTimeout: time.Minute} ctx = WithChildWorkflowOptions(ctx, cwo) var helloWorkflowResult string err = ExecuteChildWorkflow(ctx, testWorkflowHello).Get(ctx, &helloWorkflowResult) @@ -731,7 +731,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_Mock() { // ExecuteChildWorkflow(...).GetChildWorkflowExecution().Get() doesn't block forever when mock panics func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_Mock_Panic_GetChildWorkflowExecution() { workflowFn := func(ctx Context) (string, error) { - cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Minute} + cwo := ChildWorkflowOptions{WorkflowRunTimeout: time.Minute} ctx = WithChildWorkflowOptions(ctx, cwo) var helloWorkflowResult string childWorkflow := ExecuteChildWorkflow(ctx, testWorkflowHello) @@ -759,7 +759,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_Mock_Panic_GetChildWorkfl func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_StartFailed() { workflowFn := func(ctx Context) (string, error) { - cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Minute} + cwo := ChildWorkflowOptions{WorkflowRunTimeout: time.Minute} ctx = WithChildWorkflowOptions(ctx, cwo) err := ExecuteChildWorkflow(ctx, testWorkflowHello).GetChildWorkflowExecution().Get(ctx, nil) if err != nil { @@ -789,7 +789,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_Listener() { return "", err } - cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Minute} + cwo := ChildWorkflowOptions{WorkflowRunTimeout: time.Minute} ctx = WithChildWorkflowOptions(ctx, cwo) var helloWorkflowResult string err = ExecuteChildWorkflow(ctx, testWorkflowHello).Get(ctx, &helloWorkflowResult) @@ -865,7 +865,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_Clock() { t1 := NewTimer(ctx, time.Minute) t2 := NewTimer(ctx, time.Minute*10) - cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Hour * 2} + cwo := ChildWorkflowOptions{WorkflowRunTimeout: time.Hour * 2} ctx = WithChildWorkflowOptions(ctx, cwo) f1 := ExecuteChildWorkflow(ctx, childWorkflowFn) @@ -1027,7 +1027,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_MockActivityWaitFn() { func (s *WorkflowTestSuiteUnitTest) Test_MockWorkflowWait() { workflowFn := func(ctx Context) error { t1 := NewTimer(ctx, time.Hour) - cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Hour /* this is currently ignored by test suite */} + cwo := ChildWorkflowOptions{WorkflowRunTimeout: time.Hour /* this is currently ignored by test suite */} ctx = WithChildWorkflowOptions(ctx, cwo) f1 := ExecuteChildWorkflow(ctx, testWorkflowHello) @@ -1114,7 +1114,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWithChild() { env := s.NewTestWorkflowEnvironment() childWorkflowFn := func(ctx Context) error { t1 := NewTimer(ctx, time.Hour) - cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Hour /* this is currently ignored by test suite */} + cwo := ChildWorkflowOptions{WorkflowRunTimeout: time.Hour /* this is currently ignored by test suite */} ctx = WithChildWorkflowOptions(ctx, cwo) f1 := ExecuteChildWorkflow(ctx, testWorkflowHello) @@ -1135,7 +1135,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWithChild() { workflowFn := func(ctx Context) error { t1 := NewTimer(ctx, time.Hour) - cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Hour /* this is currently ignored by test suite */} + cwo := ChildWorkflowOptions{WorkflowRunTimeout: time.Hour /* this is currently ignored by test suite */} ctx = WithChildWorkflowOptions(ctx, cwo) f1 := ExecuteChildWorkflow(ctx, childWorkflowFn) // execute child workflow which in turn execute another child workflow @@ -1547,7 +1547,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityFriendlyName() { func (s *WorkflowTestSuiteUnitTest) Test_WorkflowFriendlyName() { workflowFn := func(ctx Context) error { - cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Hour /* this is currently ignored by test suite */} + cwo := ChildWorkflowOptions{WorkflowRunTimeout: time.Hour /* this is currently ignored by test suite */} ctx = WithChildWorkflowOptions(ctx, cwo) var result string err := ExecuteChildWorkflow(ctx, testWorkflowHello).Get(ctx, &result) @@ -1586,7 +1586,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowHeaderContext() { return fmt.Errorf("context did not propagate to workflow") } - cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Hour /* this is currently ignored by test suite */} + cwo := ChildWorkflowOptions{WorkflowRunTimeout: time.Hour /* this is currently ignored by test suite */} ctx = WithChildWorkflowOptions(ctx, cwo) var result string if err := ExecuteChildWorkflow(ctx, testWorkflowContext).Get(ctx, &result); err != nil { @@ -1820,8 +1820,8 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalChildWorkflow() { workflowFn := func(ctx Context) error { cwo := ChildWorkflowOptions{ - ExecutionStartToCloseTimeout: time.Minute, - Namespace: "test-namespace", + WorkflowRunTimeout: time.Minute, + Namespace: "test-namespace", } ctx = WithChildWorkflowOptions(ctx, cwo) childFuture := ExecuteChildWorkflow(ctx, childWorkflowFn, GetWorkflowInfo(ctx).WorkflowExecution) @@ -1938,8 +1938,8 @@ func (s *WorkflowTestSuiteUnitTest) Test_CancelChildWorkflow() { workflowFn := func(ctx Context) error { cwo := ChildWorkflowOptions{ - Namespace: "test-namespace", - ExecutionStartToCloseTimeout: time.Minute, + Namespace: "test-namespace", + WorkflowRunTimeout: time.Minute, } childCtx := WithChildWorkflowOptions(ctx, cwo) @@ -2020,8 +2020,8 @@ func (s *WorkflowTestSuiteUnitTest) Test_DisconnectedContext() { workflowFn := func(ctx Context) (string, error) { cwo := ChildWorkflowOptions{ - ExecutionStartToCloseTimeout: time.Hour, - WaitForCancellation: true, + WorkflowRunTimeout: time.Hour, + WaitForCancellation: true, } ctx = WithChildWorkflowOptions(ctx, cwo) childCtx, cancelChild := WithCancel(ctx) @@ -2055,9 +2055,9 @@ func (s *WorkflowTestSuiteUnitTest) Test_DisconnectedContext() { func (s *WorkflowTestSuiteUnitTest) Test_WorkflowIDReusePolicy() { workflowFn := func(ctx Context) (string, error) { cwo := ChildWorkflowOptions{ - ExecutionStartToCloseTimeout: time.Minute, - WorkflowID: "test-child-workflow-id", - WorkflowIDReusePolicy: WorkflowIDReusePolicyRejectDuplicate, + WorkflowRunTimeout: time.Minute, + WorkflowID: "test-child-workflow-id", + WorkflowIDReusePolicy: WorkflowIDReusePolicyRejectDuplicate, } ctx = WithChildWorkflowOptions(ctx, cwo) var helloWorkflowResult string @@ -2243,7 +2243,6 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityRetry() { MaximumInterval: time.Second * 10, BackoffCoefficient: 2, NonRetriableErrorReasons: []string{"bad-bug"}, - ExpirationInterval: time.Minute, }, } ctx = WithActivityOptions(ctx, ao) @@ -2268,7 +2267,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityRetry() { // set a workflow timeout timer to test // if the timer will fire during activity retry - env.SetWorkflowTimeout(10 * time.Second) + env.SetWorkflowRunTimeout(10 * time.Second) env.ExecuteWorkflow(workflowFn) s.True(env.IsWorkflowCompleted()) @@ -2314,7 +2313,6 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityHeartbeatRetry() { MaximumInterval: time.Second * 10, BackoffCoefficient: 2, NonRetriableErrorReasons: []string{"bad-bug"}, - ExpirationInterval: time.Minute, }, } ctx = WithActivityOptions(ctx, ao) @@ -2357,7 +2355,6 @@ func (s *WorkflowTestSuiteUnitTest) Test_LocalActivityRetry() { MaximumInterval: time.Second * 10, BackoffCoefficient: 2, NonRetriableErrorReasons: []string{"bad-bug"}, - ExpirationInterval: time.Minute, }, } ctx = WithLocalActivityOptions(ctx, lao) @@ -2401,7 +2398,6 @@ func (s *WorkflowTestSuiteUnitTest) Test_LocalActivityRetryOnCancel() { MaximumInterval: time.Second * 10, BackoffCoefficient: 2, NonRetriableErrorReasons: []string{"bad-bug"}, - ExpirationInterval: time.Minute, }, } ctx = WithLocalActivityOptions(ctx, lao) @@ -2435,7 +2431,6 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityRetryOnCancel() { MaximumInterval: time.Second * 10, BackoffCoefficient: 2, NonRetriableErrorReasons: []string{"bad-bug"}, - ExpirationInterval: time.Minute, }, } ctx = WithActivityOptions(ctx, ao) @@ -2471,14 +2466,13 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflowRetry() { workflowFn := func(ctx Context) (string, error) { cwo := ChildWorkflowOptions{ - ExecutionStartToCloseTimeout: time.Minute, + WorkflowRunTimeout: time.Minute, RetryPolicy: &RetryPolicy{ MaximumAttempts: 3, InitialInterval: time.Second, MaximumInterval: time.Second * 10, BackoffCoefficient: 2, NonRetriableErrorReasons: []string{"bad-bug"}, - ExpirationInterval: time.Minute, }, } ctx = WithChildWorkflowOptions(ctx, cwo) @@ -2525,15 +2519,14 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalChildWorkflowRetry() { workflowFn := func(ctx Context) (string, error) { cwo := ChildWorkflowOptions{ - WorkflowID: "test-retry-signal-child-workflow", - ExecutionStartToCloseTimeout: time.Minute, + WorkflowID: "test-retry-signal-child-workflow", + WorkflowRunTimeout: time.Minute, RetryPolicy: &RetryPolicy{ MaximumAttempts: 3, InitialInterval: time.Second * 3, MaximumInterval: time.Second * 3, BackoffCoefficient: 1, NonRetriableErrorReasons: []string{"bad-bug"}, - ExpirationInterval: time.Minute, }, } ctx = WithChildWorkflowOptions(ctx, cwo) @@ -2571,7 +2564,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_TestWorkflowTimeoutInBusyLoop() { } env := s.NewTestWorkflowEnvironment() - env.SetWorkflowTimeout((time.Hour * 10) + time.Minute) + env.SetWorkflowRunTimeout((time.Hour * 10) + time.Minute) timerFiredCount := 0 env.SetOnTimerFiredListener(func(timerID string) { timerFiredCount++ @@ -2593,7 +2586,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_TestChildWorkflowTimeout() { workflowFn := func(ctx Context) error { cwo := ChildWorkflowOptions{ - ExecutionStartToCloseTimeout: time.Hour * 3, // less than 5h that child workflow would take. + WorkflowRunTimeout: time.Hour * 3, // less than 5h that child workflow would take. } ctx = WithChildWorkflowOptions(ctx, cwo) err := ExecuteChildWorkflow(ctx, childWorkflowFn).Get(ctx, nil) @@ -2606,7 +2599,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_TestChildWorkflowTimeout() { } env := s.NewTestWorkflowEnvironment() - env.SetWorkflowTimeout(time.Hour * 10) + env.SetWorkflowRunTimeout(time.Hour * 10) timerFiredCount := 0 env.SetOnTimerFiredListener(func(timerID string) { timerFiredCount++ @@ -2640,14 +2633,14 @@ func (s *WorkflowTestSuiteUnitTest) Test_SameActivityIDFromDifferentChildWorkflo workflowFn := func(ctx Context) (string, error) { ctx1 := WithChildWorkflowOptions(ctx, ChildWorkflowOptions{ - WorkflowID: "child_1", - ExecutionStartToCloseTimeout: time.Minute, + WorkflowID: "child_1", + WorkflowRunTimeout: time.Minute, }) f1 := ExecuteChildWorkflow(ctx1, childWorkflowFn) ctx2 := WithChildWorkflowOptions(ctx, ChildWorkflowOptions{ - WorkflowID: "child_2", - ExecutionStartToCloseTimeout: time.Minute, + WorkflowID: "child_2", + WorkflowRunTimeout: time.Minute, }) f2 := ExecuteChildWorkflow(ctx2, childWorkflowFn) @@ -2679,9 +2672,9 @@ func (s *WorkflowTestSuiteUnitTest) Test_SameActivityIDFromDifferentChildWorkflo func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflowAlreadyRunning() { workflowFn := func(ctx Context) (string, error) { ctx1 := WithChildWorkflowOptions(ctx, ChildWorkflowOptions{ - WorkflowID: "Test_ChildWorkflowAlreadyRunning", - ExecutionStartToCloseTimeout: time.Minute, - WorkflowIDReusePolicy: WorkflowIDReusePolicyAllowDuplicateFailedOnly, + WorkflowID: "Test_ChildWorkflowAlreadyRunning", + WorkflowRunTimeout: time.Minute, + WorkflowIDReusePolicy: WorkflowIDReusePolicyAllowDuplicateFailedOnly, }) var result1, result2 string @@ -2734,14 +2727,13 @@ func (s *WorkflowTestSuiteUnitTest) Test_CronWorkflow() { testWorkflow := func(ctx Context) error { ctx1 := WithChildWorkflowOptions(ctx, ChildWorkflowOptions{ - ExecutionStartToCloseTimeout: time.Minute * 10, + WorkflowRunTimeout: time.Minute * 10, RetryPolicy: &RetryPolicy{ MaximumAttempts: 5, InitialInterval: time.Second, MaximumInterval: time.Second * 10, BackoffCoefficient: 2, NonRetriableErrorReasons: []string{"bad-bug"}, - ExpirationInterval: time.Hour, }, CronSchedule: "0 * * * *", // hourly }) diff --git a/internal/session.go b/internal/session.go index ced5b6883..ea3a1ca7c 100644 --- a/internal/session.go +++ b/internal/session.go @@ -293,7 +293,6 @@ func createSession(ctx Context, creationTasklist string, options *SessionOptions InitialInterval: time.Second, BackoffCoefficient: 1.1, MaximumInterval: time.Second * 10, - ExpirationInterval: options.CreationTimeout, NonRetriableErrorReasons: []string{ "temporalInternal:Panic", "temporalInternal:Generic", diff --git a/internal/session_test.go b/internal/session_test.go index 3d0b44fac..0f55a4560 100644 --- a/internal/session_test.go +++ b/internal/session_test.go @@ -617,7 +617,6 @@ func (s *SessionTestSuite) TestActivityRetryWithinSession() { InitialInterval: time.Second, BackoffCoefficient: 2.0, MaximumInterval: time.Minute, - ExpirationInterval: time.Minute * 10, NonRetriableErrorReasons: []string{"bad-error"}, }, } diff --git a/internal/testdata/parentWF.json b/internal/testdata/parentWF.json index c9c0cc376..bc6719c0b 100644 --- a/internal/testdata/parentWF.json +++ b/internal/testdata/parentWF.json @@ -13,8 +13,8 @@ "taskList": { "name": "childWorkflowGroup" }, - "executionStartToCloseTimeoutSeconds": 60, - "taskStartToCloseTimeoutSeconds": 60, + "workflowRunTimeoutSeconds": 60, + "workflowTaskTimeoutSeconds": 60, "originalExecutionRunId": "0ea65eda-a0db-4a59-bef3-dce48e8484f8", "identity": "10773@longer-C02V60N3HTDG@", "attempt": 0, @@ -78,8 +78,8 @@ "name": "childWorkflowGroup" }, "input": null, - "executionStartToCloseTimeoutSeconds": 60, - "taskStartToCloseTimeoutSeconds": 10, + "workflowRunTimeoutSeconds": 60, + "workflowTaskTimeoutSeconds": 10, "decisionTaskCompletedEventId": 4, "workflowIdReusePolicy": "AllowDuplicateFailedOnly", "header": {} diff --git a/internal/testdata/sampleHistory.json b/internal/testdata/sampleHistory.json index feddc0336..41f3394b3 100644 --- a/internal/testdata/sampleHistory.json +++ b/internal/testdata/sampleHistory.json @@ -11,8 +11,8 @@ "taskList": { "name": "taskList1" }, - "executionStartToCloseTimeoutSeconds": 60, - "taskStartToCloseTimeoutSeconds": 60, + "workflowRunTimeoutSeconds": 60, + "workflowTaskTimeoutSeconds": 60, "identity": "temporal-cli@user-C02WC08UHTDG" } }, diff --git a/internal/workflow.go b/internal/workflow.go index 623446ec5..60a52a536 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -182,13 +182,20 @@ type ( // Optional: the parent workflow task list will be used if this is not provided. TaskList string - // ExecutionStartToCloseTimeout - The end to end timeout for the child workflow execution. - // Mandatory: no default - ExecutionStartToCloseTimeout time.Duration - - // TaskStartToCloseTimeout - The decision task timeout for the child workflow. + // WorkflowExecutionTimeout - The end to end timeout for the child workflow execution including retries + // and continue as new. + // Optional: defaults to 10 years + WorkflowExecutionTimeout time.Duration + + // WorkflowRunTimeout - The timeout for a single run of the child workflow execution. Each retry or + // continue as new should obey this timeout. Use WorkflowExecutionTimeout to specify how long the parent + // is willing to wait for the child completion. + // Optional: defaults to WorkflowExecutionTimeout + WorkflowRunTimeout time.Duration + + // WorkflowTaskTimeout - The workflow task timeout for the child workflow. // Optional: default is 10s if this is not provided (or if 0 is provided). - TaskStartToCloseTimeout time.Duration + WorkflowTaskTimeout time.Duration // WaitForCancellation - Whether to wait for cancelled child workflow to be ended (child workflow can be ended // as: completed/failed/timedout/terminated/canceled) @@ -482,7 +489,7 @@ func (wc *workflowEnvironmentInterceptor) ExecuteActivity(ctx Context, typeName // ScheduleToCloseTimeout: 5 * time.Second, // } // ctx := WithLocalActivityOptions(ctx, lao) -// The timeout here should be relative shorter than the DecisionTaskStartToCloseTimeout of the workflow. If you need a +// The timeout here should be relative shorter than the WorkflowTaskTimeout of the workflow. If you need a // longer timeout, you probably should not use local activity and instead should use regular activity. Local activity is // designed to be used for short living activities (usually finishes within seconds). // @@ -604,8 +611,8 @@ func (wc *workflowEnvironmentInterceptor) scheduleLocalActivity(ctx Context, par // For example: task list that this child workflow should be routed, timeouts that need to be configured. // Use ChildWorkflowOptions to pass down the options. // cwo := ChildWorkflowOptions{ -// ExecutionStartToCloseTimeout: 10 * time.Minute, -// TaskStartToCloseTimeout: time.Minute, +// WorkflowExecutionTimeout: 10 * time.Minute, +// WorkflowTaskTimeout: time.Minute, // } // ctx := WithChildWorkflowOptions(ctx, cwo) // Input childWorkflow is either a workflow name or a workflow function that is getting scheduled. @@ -706,21 +713,22 @@ func getWorkflowHeader(ctx Context, ctxProps []ContextPropagator) *commonpb.Head // WorkflowInfo information about currently executing workflow type WorkflowInfo struct { - WorkflowExecution WorkflowExecution - WorkflowType WorkflowType - TaskListName string - ExecutionStartToCloseTimeoutSeconds int32 - TaskStartToCloseTimeoutSeconds int32 - Namespace string - Attempt int32 // Attempt starts from 0 and increased by 1 for every retry if retry policy is specified. - lastCompletionResult *commonpb.Payload - CronSchedule string - ContinuedExecutionRunID string - ParentWorkflowNamespace string - ParentWorkflowExecution *WorkflowExecution - Memo *commonpb.Memo // Value can be decoded using data converter (DefaultDataConverter, or custom one if set). - SearchAttributes *commonpb.SearchAttributes // Value can be decoded using DefaultDataConverter. - BinaryChecksum string + WorkflowExecution WorkflowExecution + WorkflowType WorkflowType + TaskListName string + WorkflowExecutionTimeoutSeconds int32 + WorkflowRunTimeoutSeconds int32 + WorkflowTaskTimeoutSeconds int32 + Namespace string + Attempt int32 // Attempt starts from 0 and increased by 1 for every retry if retry policy is specified. + lastCompletionResult *commonpb.Payload + CronSchedule string + ContinuedExecutionRunID string + ParentWorkflowNamespace string + ParentWorkflowExecution *WorkflowExecution + Memo *commonpb.Memo // Value can be decoded using data converter (DefaultDataConverter, or custom one if set). + SearchAttributes *commonpb.SearchAttributes // Value can be decoded using DefaultDataConverter. + BinaryChecksum string } // GetWorkflowInfo extracts info of a current workflow from a context. @@ -966,8 +974,9 @@ func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context { wfOptions.taskListName = cwo.TaskList } wfOptions.workflowID = cwo.WorkflowID - wfOptions.executionStartToCloseTimeoutSeconds = common.Int32Ceil(cwo.ExecutionStartToCloseTimeout.Seconds()) - wfOptions.taskStartToCloseTimeoutSeconds = common.Int32Ceil(cwo.TaskStartToCloseTimeout.Seconds()) + wfOptions.workflowExecutionTimeoutSeconds = common.Int32Ceil(cwo.WorkflowExecutionTimeout.Seconds()) + wfOptions.workflowRunTimeoutSeconds = common.Int32Ceil(cwo.WorkflowRunTimeout.Seconds()) + wfOptions.workflowTaskTimeoutSeconds = common.Int32Ceil(cwo.WorkflowTaskTimeout.Seconds()) wfOptions.waitForCancellation = cwo.WaitForCancellation wfOptions.workflowIDReusePolicy = cwo.WorkflowIDReusePolicy wfOptions.retryPolicy = convertRetryPolicy(cwo.RetryPolicy) @@ -1003,21 +1012,21 @@ func WithWorkflowID(ctx Context, workflowID string) Context { return ctx1 } -// WithExecutionStartToCloseTimeout adds a workflow execution timeout to the context. +// WithWorkflowRunTimeout adds a run timeout to the context. // The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is // subjected to change in the future. -func WithExecutionStartToCloseTimeout(ctx Context, d time.Duration) Context { +func WithWorkflowRunTimeout(ctx Context, d time.Duration) Context { ctx1 := setWorkflowEnvOptionsIfNotExist(ctx) - getWorkflowEnvOptions(ctx1).executionStartToCloseTimeoutSeconds = common.Int32Ceil(d.Seconds()) + getWorkflowEnvOptions(ctx1).workflowRunTimeoutSeconds = common.Int32Ceil(d.Seconds()) return ctx1 } -// WithWorkflowTaskStartToCloseTimeout adds a decision timeout to the context. +// WithWorkflowTaskTimeout adds a workflow task timeout to the context. // The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is // subjected to change in the future. -func WithWorkflowTaskStartToCloseTimeout(ctx Context, d time.Duration) Context { +func WithWorkflowTaskTimeout(ctx Context, d time.Duration) Context { ctx1 := setWorkflowEnvOptionsIfNotExist(ctx) - getWorkflowEnvOptions(ctx1).taskStartToCloseTimeoutSeconds = common.Int32Ceil(d.Seconds()) + getWorkflowEnvOptions(ctx1).workflowTaskTimeoutSeconds = common.Int32Ceil(d.Seconds()) return ctx1 } @@ -1361,6 +1370,7 @@ func WithLocalActivityOptions(ctx Context, options LocalActivityOptions) Context opts := getLocalActivityOptions(ctx1) opts.ScheduleToCloseTimeoutSeconds = common.Int32Ceil(options.ScheduleToCloseTimeout.Seconds()) + opts.StartToCloseTimeoutSeconds = common.Int32Ceil(options.StartToCloseTimeout.Seconds()) opts.RetryPolicy = options.RetryPolicy return ctx1 } @@ -1430,11 +1440,10 @@ func convertRetryPolicy(retryPolicy *RetryPolicy) *commonpb.RetryPolicy { retryPolicy.BackoffCoefficient = backoff.DefaultBackoffCoefficient } return &commonpb.RetryPolicy{ - MaximumIntervalInSeconds: common.Int32Ceil(retryPolicy.MaximumInterval.Seconds()), - InitialIntervalInSeconds: common.Int32Ceil(retryPolicy.InitialInterval.Seconds()), - BackoffCoefficient: retryPolicy.BackoffCoefficient, - MaximumAttempts: retryPolicy.MaximumAttempts, - NonRetriableErrorReasons: retryPolicy.NonRetriableErrorReasons, - ExpirationIntervalInSeconds: common.Int32Ceil(retryPolicy.ExpirationInterval.Seconds()), + MaximumIntervalInSeconds: common.Int32Ceil(retryPolicy.MaximumInterval.Seconds()), + InitialIntervalInSeconds: common.Int32Ceil(retryPolicy.InitialInterval.Seconds()), + BackoffCoefficient: retryPolicy.BackoffCoefficient, + MaximumAttempts: retryPolicy.MaximumAttempts, + NonRetriableErrorReasons: retryPolicy.NonRetriableErrorReasons, } } diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 591237507..3d32ee891 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -529,12 +529,12 @@ func (t *TestWorkflowEnvironment) SetTestTimeout(idleTimeout time.Duration) *Tes return t } -// SetWorkflowTimeout sets the execution timeout for this tested workflow. This test framework uses mock clock internally -// and when workflow is blocked on timer, it will auto forward the mock clock. Use SetWorkflowTimeout() to enforce a -// workflow execution timeout to return timeout error when the workflow mock clock is moved head of the timeout. +// SetWorkflowRunTimeout sets the run timeout for this tested workflow. This test framework uses mock clock internally +// and when workflow is blocked on timer, it will auto forward the mock clock. Use SetWorkflowRunTimeout() to enforce a +// workflow run timeout to return timeout error when the workflow mock clock is moved head of the timeout. // This is based on the workflow time (a.k.a workflow.Now() time). -func (t *TestWorkflowEnvironment) SetWorkflowTimeout(executionTimeout time.Duration) *TestWorkflowEnvironment { - t.impl.executionTimeout = executionTimeout +func (t *TestWorkflowEnvironment) SetWorkflowRunTimeout(runTimeout time.Duration) *TestWorkflowEnvironment { + t.impl.runTimeout = runTimeout return t } diff --git a/test/fixtures/activity.cancel.sm.repro.json b/test/fixtures/activity.cancel.sm.repro.json index da09b37ed..e76d505c6 100755 --- a/test/fixtures/activity.cancel.sm.repro.json +++ b/test/fixtures/activity.cancel.sm.repro.json @@ -12,8 +12,8 @@ "taskList": { "name": "tl-1" }, - "executionStartToCloseTimeoutSeconds": 10, - "taskStartToCloseTimeoutSeconds": 1, + "workflowRunTimeoutSeconds": 10, + "workflowTaskTimeoutSeconds": 1, "identity": "97228@samar-C02XG22GJGH6@" } }, diff --git a/test/integration_test.go b/test/integration_test.go index 52d389927..e2af94beb 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -263,7 +263,7 @@ func (ts *IntegrationTestSuite) TestConsistentQuery() { // this workflow will start a local activity which blocks for long enough // to ensure that consistent query must wait in order to satisfy consistency wfOpts := ts.startWorkflowOptions("test-consistent-query") - wfOpts.DecisionTaskStartToCloseTimeout = 5 * time.Second + wfOpts.WorkflowTaskTimeout = 5 * time.Second run, err := ts.client.ExecuteWorkflow(ctx, wfOpts, ts.workflows.ConsistentQueryWorkflow, 3*time.Second) ts.Nil(err) // Wait for a second to ensure that first decision task gets started and completed before we send signal. @@ -511,11 +511,11 @@ func (ts *IntegrationTestSuite) executeWorkflowWithOption( func (ts *IntegrationTestSuite) startWorkflowOptions(wfID string) client.StartWorkflowOptions { return client.StartWorkflowOptions{ - ID: wfID, - TaskList: ts.taskListName, - ExecutionStartToCloseTimeout: 15 * time.Second, - DecisionTaskStartToCloseTimeout: time.Second, - WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicate, + ID: wfID, + TaskList: ts.taskListName, + WorkflowExecutionTimeout: 15 * time.Second, + WorkflowTaskTimeout: time.Second, + WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicate, } } diff --git a/test/replaytests/basic.json b/test/replaytests/basic.json index cc3cc8dc3..fbe149183 100644 --- a/test/replaytests/basic.json +++ b/test/replaytests/basic.json @@ -14,8 +14,8 @@ "name": "helloWorldGroup" }, "input": null, - "executionStartToCloseTimeoutSeconds": 60, - "taskStartToCloseTimeoutSeconds": 60, + "workflowRunTimeoutSeconds": 60, + "workflowTaskTimeoutSeconds": 60, "originalExecutionRunId": "b08ce5f0-920b-43f8-a90d-44d12476297e", "identity": "87832@boweixu-C02V61JZHTDG@", "firstExecutionRunId": "b08ce5f0-920b-43f8-a90d-44d12476297e", diff --git a/test/replaytests/basic_new.json b/test/replaytests/basic_new.json index 9e2d8727b..8932b10a4 100644 --- a/test/replaytests/basic_new.json +++ b/test/replaytests/basic_new.json @@ -14,8 +14,8 @@ "name": "helloWorldGroup" }, "input": null, - "executionStartToCloseTimeoutSeconds": 60, - "taskStartToCloseTimeoutSeconds": 60, + "workflowRunTimeoutSeconds": 60, + "workflowTaskTimeoutSeconds": 60, "originalExecutionRunId": "4ee79b48-c947-4cf2-9902-e9ab01596c6d", "identity": "2314@boweixu-C02V61JZHTDG@", "firstExecutionRunId": "4ee79b48-c947-4cf2-9902-e9ab01596c6d", diff --git a/test/replaytests/version.json b/test/replaytests/version.json index 4066f3acb..de5d0bb6b 100644 --- a/test/replaytests/version.json +++ b/test/replaytests/version.json @@ -14,8 +14,8 @@ "name": "helloWorldGroup" }, "input": null, - "executionStartToCloseTimeoutSeconds": 60, - "taskStartToCloseTimeoutSeconds": 60, + "workflowRunTimeoutSeconds": 60, + "workflowTaskTimeoutSeconds": 60, "originalExecutionRunId": "d27163e6-9d88-4fbf-8c68-dc5b0b9dceda", "identity": "53892@boweixu-C02V61JZHTDG@", "firstExecutionRunId": "d27163e6-9d88-4fbf-8c68-dc5b0b9dceda", diff --git a/test/replaytests/version_new.json b/test/replaytests/version_new.json index 0853bf511..baaa67b48 100644 --- a/test/replaytests/version_new.json +++ b/test/replaytests/version_new.json @@ -14,8 +14,8 @@ "name": "helloWorldGroup" }, "input": null, - "executionStartToCloseTimeoutSeconds": 60, - "taskStartToCloseTimeoutSeconds": 60, + "workflowRunTimeoutSeconds": 60, + "workflowTaskTimeoutSeconds": 60, "originalExecutionRunId": "49ca7b7a-4eb7-434a-bc59-7c71b5eea75e", "identity": "69593@boweixu-C02V61JZHTDG@", "firstExecutionRunId": "49ca7b7a-4eb7-434a-bc59-7c71b5eea75e", diff --git a/test/workflow_test.go b/test/workflow_test.go index e909a819b..036dc6e26 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -226,10 +226,10 @@ func (w *Workflows) IDReusePolicy( failFirstChild bool) (string, error) { ctx = workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{ - WorkflowID: childWFID, - ExecutionStartToCloseTimeout: 9 * time.Second, - TaskStartToCloseTimeout: 5 * time.Second, - WorkflowIDReusePolicy: policy, + WorkflowID: childWFID, + WorkflowExecutionTimeout: 9 * time.Second, + WorkflowTaskTimeout: 5 * time.Second, + WorkflowIDReusePolicy: policy, }) var ans1 string @@ -264,13 +264,12 @@ func (w *Workflows) IDReusePolicy( func (w *Workflows) ChildWorkflowRetryOnError(ctx workflow.Context) error { opts := workflow.ChildWorkflowOptions{ - TaskStartToCloseTimeout: 5 * time.Second, - ExecutionStartToCloseTimeout: 9 * time.Second, + WorkflowTaskTimeout: 5 * time.Second, + WorkflowExecutionTimeout: 9 * time.Second, RetryPolicy: &temporal.RetryPolicy{ InitialInterval: time.Second, BackoffCoefficient: 2.0, MaximumInterval: time.Second, - ExpirationInterval: 100 * time.Second, MaximumAttempts: 3, }, } @@ -281,13 +280,12 @@ func (w *Workflows) ChildWorkflowRetryOnError(ctx workflow.Context) error { func (w *Workflows) ChildWorkflowRetryOnTimeout(ctx workflow.Context) error { opts := workflow.ChildWorkflowOptions{ - TaskStartToCloseTimeout: time.Second, - ExecutionStartToCloseTimeout: time.Second, + WorkflowTaskTimeout: time.Second, + WorkflowExecutionTimeout: time.Second, RetryPolicy: &temporal.RetryPolicy{ InitialInterval: time.Second, BackoffCoefficient: 2.0, MaximumInterval: time.Second, - ExpirationInterval: 100 * time.Second, MaximumAttempts: 3, }, } @@ -297,10 +295,10 @@ func (w *Workflows) ChildWorkflowRetryOnTimeout(ctx workflow.Context) error { func (w *Workflows) ChildWorkflowSuccess(ctx workflow.Context) (result string, err error) { opts := workflow.ChildWorkflowOptions{ - TaskStartToCloseTimeout: 5 * time.Second, - ExecutionStartToCloseTimeout: 10 * time.Second, - Memo: map[string]interface{}{"memoKey": "memoVal"}, - SearchAttributes: map[string]interface{}{"CustomKeywordField": "searchAttrVal"}, + WorkflowTaskTimeout: 5 * time.Second, + WorkflowExecutionTimeout: 10 * time.Second, + Memo: map[string]interface{}{"memoKey": "memoVal"}, + SearchAttributes: map[string]interface{}{"CustomKeywordField": "searchAttrVal"}, } ctx = workflow.WithChildOptions(ctx, opts) err = workflow.ExecuteChildWorkflow(ctx, w.childForMemoAndSearchAttr).Get(ctx, &result) @@ -309,8 +307,8 @@ func (w *Workflows) ChildWorkflowSuccess(ctx workflow.Context) (result string, e func (w *Workflows) ChildWorkflowSuccessWithParentClosePolicyTerminate(ctx workflow.Context) (result string, err error) { opts := workflow.ChildWorkflowOptions{ - TaskStartToCloseTimeout: 5 * time.Second, - ExecutionStartToCloseTimeout: 30 * time.Second, + WorkflowTaskTimeout: 5 * time.Second, + WorkflowExecutionTimeout: 30 * time.Second, } ctx = workflow.WithChildOptions(ctx, opts) ft := workflow.ExecuteChildWorkflow(ctx, w.sleep, 20*time.Second) @@ -321,9 +319,9 @@ func (w *Workflows) ChildWorkflowSuccessWithParentClosePolicyTerminate(ctx workf func (w *Workflows) ChildWorkflowSuccessWithParentClosePolicyAbandon(ctx workflow.Context) (result string, err error) { opts := workflow.ChildWorkflowOptions{ - TaskStartToCloseTimeout: 5 * time.Second, - ExecutionStartToCloseTimeout: 10 * time.Second, - ParentClosePolicy: client.ParentClosePolicyAbandon, + WorkflowTaskTimeout: 5 * time.Second, + WorkflowExecutionTimeout: 10 * time.Second, + ParentClosePolicy: client.ParentClosePolicyAbandon, } ctx = workflow.WithChildOptions(ctx, opts) ft := workflow.ExecuteChildWorkflow(ctx, w.sleep, 5*time.Second) @@ -441,7 +439,6 @@ func (w *Workflows) RetryTimeoutStableErrorWorkflow(ctx workflow.Context) ([]str InitialInterval: time.Second, BackoffCoefficient: 1.0, MaximumInterval: time.Second, - ExpirationInterval: time.Second * 5, }, } ctx = workflow.WithActivityOptions(ctx, ao) @@ -565,7 +562,6 @@ func (w *Workflows) defaultActivityOptionsWithRetry() workflow.ActivityOptions { InitialInterval: time.Second, BackoffCoefficient: 2.0, MaximumInterval: time.Second, - ExpirationInterval: 100 * time.Second, MaximumAttempts: 3, }, } diff --git a/workflow/doc.go b/workflow/doc.go index 8fcf1229f..ddb24af7e 100644 --- a/workflow/doc.go +++ b/workflow/doc.go @@ -226,7 +226,7 @@ for an activity it invoked. cwo := workflow.ChildWorkflowOptions{ // Do not specify WorkflowID if you want temporal to generate a unique ID for child execution WorkflowID: "BID-SIMPLE-CHILD-WORKFLOW", - ExecutionStartToCloseTimeout: time.Minute * 30, + WorkflowExecutionTimeout: time.Minute * 30, } ctx = workflow.WithChildOptions(ctx, cwo) diff --git a/workflow/error.go b/workflow/error.go index 7ba4c9223..bf5955fb9 100644 --- a/workflow/error.go +++ b/workflow/error.go @@ -116,8 +116,8 @@ type ( // provided to this function. // ctx - use context to override any options for the new workflow like execution timeout, decision task timeout, task list. // if not mentioned it would use the defaults that the current workflow is using. -// ctx := WithExecutionStartToCloseTimeout(ctx, 30 * time.Minute) -// ctx := WithWorkflowTaskStartToCloseTimeout(ctx, time.Minute) +// ctx := WithWorkflowExecutionTimeout(ctx, 30 * time.Minute) +// ctx := WithWorkflowTaskTimeout(ctx, time.Minute) // ctx := WithWorkflowTaskList(ctx, "example-group") // wfn - workflow function. for new execution it can be different from the currently running. // args - arguments for the new workflow. diff --git a/workflow/workflow.go b/workflow/workflow.go index b2f58958e..951ba86fd 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -104,7 +104,7 @@ func ExecuteActivity(ctx Context, activity interface{}, args ...interface{}) Fut // ScheduleToCloseTimeout: 5 * time.Second, // } // ctx := WithLocalActivityOptions(ctx, lao) -// The timeout here should be relative shorter than the DecisionTaskStartToCloseTimeout of the workflow. If you need a +// The timeout here should be relative shorter than the WorkflowTaskTimeout of the workflow. If you need a // longer timeout, you probably should not use local activity and instead should use regular activity. Local activity is // designed to be used for short living activities (usually finishes within seconds). // @@ -129,8 +129,8 @@ func ExecuteLocalActivity(ctx Context, activity interface{}, args ...interface{} // For example: task list that this child workflow should be routed, timeouts that need to be configured. // Use ChildWorkflowOptions to pass down the options. // cwo := ChildWorkflowOptions{ -// ExecutionStartToCloseTimeout: 10 * time.Minute, -// TaskStartToCloseTimeout: time.Minute, +// WorkflowExecutionTimeout: 10 * time.Minute, +// WorkflowTaskTimeout: time.Minute, // } // ctx := WithChildOptions(ctx, cwo) // Input childWorkflow is either a workflow name or a workflow function that is getting scheduled. diff --git a/workflow/workflow_options.go b/workflow/workflow_options.go index 0cbe4c392..a866449ff 100644 --- a/workflow/workflow_options.go +++ b/workflow/workflow_options.go @@ -51,14 +51,18 @@ func WithWorkflowID(ctx Context, workflowID string) Context { return internal.WithWorkflowID(ctx, workflowID) } -// WithExecutionStartToCloseTimeout adds a workflow execution timeout to the context. -func WithExecutionStartToCloseTimeout(ctx Context, d time.Duration) Context { - return internal.WithExecutionStartToCloseTimeout(ctx, d) +// WithWorkflowRunTimeout adds a run timeout to the context. +// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is +// subjected to change in the future. +func WithWorkflowRunTimeout(ctx Context, d time.Duration) Context { + return internal.WithWorkflowRunTimeout(ctx, d) } -// WithWorkflowTaskStartToCloseTimeout adds a decision timeout to the context. -func WithWorkflowTaskStartToCloseTimeout(ctx Context, d time.Duration) Context { - return internal.WithWorkflowTaskStartToCloseTimeout(ctx, d) +// WithWorkflowTaskTimeout adds a workflow task timeout to the context. +// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is +// subjected to change in the future. +func WithWorkflowTaskTimeout(ctx Context, d time.Duration) Context { + return internal.WithWorkflowTaskTimeout(ctx, d) } // WithDataConverter adds DataConverter to the context.