Skip to content

Commit

Permalink
Retry policy and workflow timeouts refactoring (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfateev committed Apr 29, 2020
1 parent e6d4c27 commit 3284084
Show file tree
Hide file tree
Showing 38 changed files with 540 additions and 471 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/uber-go/tally v3.3.15+incompatible
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
go.temporal.io/temporal-proto v0.20.28
go.temporal.io/temporal-proto v0.20.29
go.uber.org/atomic v1.6.0
go.uber.org/goleak v1.0.0
go.uber.org/zap v1.14.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.temporal.io/temporal-proto v0.20.28 h1:syCkMj1bBEqPCj4dKmPQksBKH3qXeZ+PayfsBMdQEOY=
go.temporal.io/temporal-proto v0.20.28/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s=
go.temporal.io/temporal-proto v0.20.29 h1:YDKcU0qxThs9ihny93Pf2/gSdGvybKjKuCIosy54nQ8=
go.temporal.io/temporal-proto v0.20.29/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
Expand Down
8 changes: 6 additions & 2 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,16 @@ type (

// LocalActivityOptions stores local activity specific parameters that will be stored inside of a context.
LocalActivityOptions struct {
// ScheduleToCloseTimeout - The end to end timeout for the local activity.
// ScheduleToCloseTimeout - The end to end timeout for the local activity including retries.
// This field is required.
ScheduleToCloseTimeout time.Duration

// StartToCloseTimeout - The timeout for a single execution of the local activity.
// Optional: defaults to ScheduleToClose
StartToCloseTimeout time.Duration

// RetryPolicy specify how to retry activity if error happens.
// Optional: default is no retry
// Optional: default is to retry according to the default retry policy up to ScheduleToCloseTimeout
RetryPolicy *RetryPolicy
}
)
Expand Down
24 changes: 13 additions & 11 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,16 +404,23 @@ type (
// Mandatory: No default.
TaskList string

// ExecutionStartToCloseTimeout - The timeout for duration of workflow execution.
// WorkflowExecutionTimeout - The timeout for duration of workflow execution.
// It includes retries and continue as new. Use WorkflowRunTimeout to limit execution time
// of a single workflow run.
// The resolution is seconds.
// Mandatory: No default.
ExecutionStartToCloseTimeout time.Duration
// Optional: defaulted to 10 years.
WorkflowExecutionTimeout time.Duration

// WorkflowRunTimeout - The timeout for duration of a single workflow run.
// The resolution is seconds.
// Optional: defaulted to WorkflowExecutionTimeout.
WorkflowRunTimeout time.Duration

// DecisionTaskStartToCloseTimeout - The timeout for processing decision task from the time the worker
// WorkflowTaskTimeout - The timeout for processing workflow task from the time the worker
// pulled this task. If a decision task is lost, it is retried after this timeout.
// The resolution is seconds.
// Optional: defaulted to 10 secs.
DecisionTaskStartToCloseTimeout time.Duration
WorkflowTaskTimeout time.Duration

// WorkflowIDReusePolicy - Whether server allow reuse of workflow ID, can be useful
// for dedup logic if set to WorkflowIdReusePolicyRejectDuplicate.
Expand Down Expand Up @@ -470,13 +477,8 @@ type (
// This value is the cap of the interval. Default is 100x of initial interval.
MaximumInterval time.Duration

// Maximum time to retry. Either ExpirationInterval or MaximumAttempts is required.
// When exceeded the retries stop even if maximum retries is not reached yet.
ExpirationInterval time.Duration

// Maximum number of attempts. When exceeded the retries stop even if not expired yet.
// If not set or set to 0, it means unlimited, and rely on ExpirationInterval to stop.
// Either MaximumAttempts or ExpirationInterval is required.
// If not set or set to 0, it means unlimited, and rely on activity ScheduleToCloseTimeout to stop.
MaximumAttempts int32

// Non-Retriable errors. This is optional. Temporal server will stop retry if error reason matches this list.
Expand Down
40 changes: 40 additions & 0 deletions internal/common/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,43 @@ func Int32Ceil(v float64) int32 {
func Int64Ceil(v float64) int64 {
return int64(math.Ceil(v))
}

// MinInt64 returns the smaller of two given int64
func MinInt64(a, b int64) int64 {
if a < b {
return a
}
return b
}

// MaxInt64 returns the greater of two given int64
func MaxInt64(a, b int64) int64 {
if a > b {
return a
}
return b
}

// MinInt32 return smaller one of two inputs int32
func MinInt32(a, b int32) int32 {
if a < b {
return a
}
return b
}

// MinInt returns the smaller of two given integers
func MinInt(a, b int) int {
if a < b {
return a
}
return b
}

// MaxInt returns the greater one of two given integers
func MaxInt(a, b int) int {
if a > b {
return a
}
return b
}
15 changes: 3 additions & 12 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ func IsCanceledError(err error) bool {
// If the workflow main function returns this error then the current execution is ended and
// the new execution with same workflow ID is started automatically with options
// provided to this function.
// ctx - use context to override any options for the new workflow like execution timeout, decision task timeout, task list.
// ctx - use context to override any options for the new workflow like run timeout, task timeout, task list.
// if not mentioned it would use the defaults that the current workflow is using.
// ctx := WithExecutionStartToCloseTimeout(ctx, 30 * time.Minute)
// ctx := WithWorkflowTaskStartToCloseTimeout(ctx, time.Minute)
// ctx := WithWorkflowRunTimeout(ctx, 30 * time.Minute)
// ctx := WithWorkflowTaskTimeout(ctx, 5 * time.Second)
// ctx := WithWorkflowTaskList(ctx, "example-group")
// wfn - workflow function. for new execution it can be different from the currently running.
// args - arguments for the new workflow.
Expand All @@ -228,15 +228,6 @@ func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) *C
if err != nil {
panic(err)
}
if options.taskListName == "" {
panic("invalid task list provided")
}
if options.executionStartToCloseTimeoutSeconds <= 0 {
panic("invalid executionStartToCloseTimeoutSeconds provided")
}
if options.taskStartToCloseTimeoutSeconds <= 0 {
panic("invalid taskStartToCloseTimeoutSeconds provided")
}

params := &executeWorkflowParams{
workflowOptions: *options,
Expand Down
17 changes: 14 additions & 3 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type (

localActivityOptions struct {
ScheduleToCloseTimeoutSeconds int32
StartToCloseTimeoutSeconds int32
RetryPolicy *RetryPolicy
}

Expand Down Expand Up @@ -175,10 +176,20 @@ func getValidatedLocalActivityOptions(ctx Context) (*localActivityOptions, error
if p == nil {
return nil, errLocalActivityParamsBadRequest
}
if p.ScheduleToCloseTimeoutSeconds <= 0 {
return nil, errors.New("missing or negative ScheduleToCloseTimeoutSeconds")
if p.ScheduleToCloseTimeoutSeconds < 0 {
return nil, errors.New("negative ScheduleToCloseTimeoutSeconds")
}
if p.StartToCloseTimeoutSeconds < 0 {
return nil, errors.New("negative StartToCloseTimeoutSeconds")
}
if p.ScheduleToCloseTimeoutSeconds == 0 && p.StartToCloseTimeoutSeconds == 0 {
return nil, errors.New("at least one of ScheduleToCloseTimeoutSeconds and StartToCloseTimeoutSeconds is required")
}
if p.ScheduleToCloseTimeoutSeconds == 0 {
p.ScheduleToCloseTimeoutSeconds = p.StartToCloseTimeoutSeconds
} else {
p.StartToCloseTimeoutSeconds = p.ScheduleToCloseTimeoutSeconds
}

return p, nil
}

Expand Down
9 changes: 5 additions & 4 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,9 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
attributes.Namespace = params.namespace
attributes.TaskList = &tasklistpb.TaskList{Name: params.taskListName}
attributes.WorkflowId = params.workflowID
attributes.ExecutionStartToCloseTimeoutSeconds = params.executionStartToCloseTimeoutSeconds
attributes.TaskStartToCloseTimeoutSeconds = params.taskStartToCloseTimeoutSeconds
attributes.WorkflowExecutionTimeoutSeconds = params.workflowExecutionTimeoutSeconds
attributes.WorkflowRunTimeoutSeconds = params.workflowRunTimeoutSeconds
attributes.WorkflowTaskTimeoutSeconds = params.workflowTaskTimeoutSeconds
attributes.Input = params.input
attributes.WorkflowType = &commonpb.WorkflowType{Name: params.workflowType.Name}
attributes.WorkflowIdReusePolicy = params.workflowIDReusePolicy.toProto()
Expand Down Expand Up @@ -506,8 +507,8 @@ func newLocalActivityTask(params executeLocalActivityParams, callback laResultHa
attempt: params.Attempt,
}

if params.RetryPolicy != nil && params.RetryPolicy.ExpirationInterval > 0 {
task.expireTime = params.ScheduledTime.Add(params.RetryPolicy.ExpirationInterval)
if params.ScheduleToCloseTimeoutSeconds > 0 {
task.expireTime = params.ScheduledTime.Add(time.Second * time.Duration(params.ScheduleToCloseTimeoutSeconds))
}
return task
}
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type (
// - RespondQueryTaskCompletedRequest
// If waitLocalActivity is true, and there is outstanding local activities, this call will return nil.
CompleteDecisionTask(workflowTask *workflowTask, waitLocalActivity bool) interface{}
// GetDecisionTimeout returns the TaskStartToCloseTimeout
// GetWorkflowTaskTimeout returns the WorkflowTaskTimeout
GetDecisionTimeout() time.Duration
GetCurrentDecisionTask() *workflowservice.PollForDecisionTaskResponse
IsDestroyed() bool
Expand Down
56 changes: 27 additions & 29 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,19 +617,20 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice.
ID: workflowID,
RunID: runID,
},
WorkflowType: WorkflowType{Name: task.WorkflowType.GetName()},
TaskListName: taskList.GetName(),
ExecutionStartToCloseTimeoutSeconds: attributes.GetExecutionStartToCloseTimeoutSeconds(),
TaskStartToCloseTimeoutSeconds: attributes.GetTaskStartToCloseTimeoutSeconds(),
Namespace: wth.namespace,
Attempt: attributes.GetAttempt(),
lastCompletionResult: attributes.LastCompletionResult,
CronSchedule: attributes.CronSchedule,
ContinuedExecutionRunID: attributes.ContinuedExecutionRunId,
ParentWorkflowNamespace: attributes.ParentWorkflowNamespace,
ParentWorkflowExecution: parentWorkflowExecution,
Memo: attributes.Memo,
SearchAttributes: attributes.SearchAttributes,
WorkflowType: WorkflowType{Name: task.WorkflowType.GetName()},
TaskListName: taskList.GetName(),
WorkflowExecutionTimeoutSeconds: attributes.GetWorkflowExecutionTimeoutSeconds(),
WorkflowRunTimeoutSeconds: attributes.GetWorkflowRunTimeoutSeconds(),
WorkflowTaskTimeoutSeconds: attributes.GetWorkflowTaskTimeoutSeconds(),
Namespace: wth.namespace,
Attempt: attributes.GetAttempt(),
lastCompletionResult: attributes.LastCompletionResult,
CronSchedule: attributes.CronSchedule,
ContinuedExecutionRunID: attributes.ContinuedExecutionRunId,
ParentWorkflowNamespace: attributes.ParentWorkflowNamespace,
ParentWorkflowExecution: parentWorkflowExecution,
Memo: attributes.Memo,
SearchAttributes: attributes.SearchAttributes,
}

wfStartTime := time.Unix(0, h.Events[0].GetTimestamp())
Expand Down Expand Up @@ -770,7 +771,8 @@ processWorkflowLoop:
if err == nil && response == nil {
waitLocalActivityLoop:
for {
deadlineToTrigger := time.Duration(float32(ratioToForceCompleteDecisionTaskComplete) * float32(workflowContext.GetDecisionTimeout()))
deadlineToTrigger := time.Duration(float32(ratioToForceCompleteDecisionTaskComplete) *
float32(workflowContext.GetWorkflowTaskTimeout()))
delayDuration := time.Until(startTime.Add(deadlineToTrigger))
select {
case <-time.After(delayDuration):
Expand Down Expand Up @@ -963,7 +965,7 @@ func (w *workflowExecutionContextImpl) retryLocalActivity(lar *localActivityResu
}

retryBackoff := getRetryBackoff(lar, time.Now(), w.wth.dataConverter)
if retryBackoff > 0 && retryBackoff <= w.GetDecisionTimeout() {
if retryBackoff > 0 && retryBackoff <= w.GetWorkflowTaskTimeout() {
// we need a local retry
time.AfterFunc(retryBackoff, func() {
// TODO: this should not be a separate goroutine as it introduces race condition when accessing eventHandler.
Expand Down Expand Up @@ -1014,10 +1016,6 @@ func getRetryBackoff(lar *localActivityResult, now time.Time, dataConverter Data
}

func getRetryBackoffWithNowTime(p *RetryPolicy, attempt int32, errReason string, now, expireTime time.Time) time.Duration {
if p.MaximumAttempts == 0 && p.ExpirationInterval == 0 {
return noRetryBackoff
}

if p.MaximumAttempts > 0 && attempt > p.MaximumAttempts-1 {
return noRetryBackoff // max attempt reached
}
Expand Down Expand Up @@ -1140,8 +1138,8 @@ func (w *workflowExecutionContextImpl) ResetIfStale(task *workflowservice.PollFo
return nil
}

func (w *workflowExecutionContextImpl) GetDecisionTimeout() time.Duration {
return time.Second * time.Duration(w.workflowInfo.TaskStartToCloseTimeoutSeconds)
func (w *workflowExecutionContextImpl) GetWorkflowTaskTimeout() time.Duration {
return time.Second * time.Duration(w.workflowInfo.WorkflowTaskTimeoutSeconds)
}

func skipDeterministicCheckForDecision(d *decisionpb.Decision) bool {
Expand Down Expand Up @@ -1490,14 +1488,14 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
metricsScope.Counter(metrics.WorkflowContinueAsNewCounter).Inc(1)
closeDecision = createNewDecision(decisionpb.DecisionType_ContinueAsNewWorkflowExecution)
closeDecision.Attributes = &decisionpb.Decision_ContinueAsNewWorkflowExecutionDecisionAttributes{ContinueAsNewWorkflowExecutionDecisionAttributes: &decisionpb.ContinueAsNewWorkflowExecutionDecisionAttributes{
WorkflowType: &commonpb.WorkflowType{Name: contErr.params.workflowType.Name},
Input: contErr.params.input,
TaskList: &tasklistpb.TaskList{Name: contErr.params.taskListName},
ExecutionStartToCloseTimeoutSeconds: contErr.params.executionStartToCloseTimeoutSeconds,
TaskStartToCloseTimeoutSeconds: contErr.params.taskStartToCloseTimeoutSeconds,
Header: contErr.params.header,
Memo: workflowContext.workflowInfo.Memo,
SearchAttributes: workflowContext.workflowInfo.SearchAttributes,
WorkflowType: &commonpb.WorkflowType{Name: contErr.params.workflowType.Name},
Input: contErr.params.input,
TaskList: &tasklistpb.TaskList{Name: contErr.params.taskListName},
WorkflowRunTimeoutSeconds: contErr.params.workflowRunTimeoutSeconds,
WorkflowTaskTimeoutSeconds: contErr.params.workflowTaskTimeoutSeconds,
Header: contErr.params.header,
Memo: workflowContext.workflowInfo.Memo,
SearchAttributes: workflowContext.workflowInfo.SearchAttributes,
}}
} else if workflowContext.err != nil {
// Workflow failures
Expand Down
32 changes: 17 additions & 15 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,21 +880,23 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() {
parentNamespace := "parentNamespace"
var attempt int32 = 123
var executionTimeout int32 = 213456
var runTimeout int32 = 21098
var taskTimeout int32 = 21
workflowType := "GetWorkflowInfoWorkflow"
lastCompletionResult, err := getDefaultDataConverter().ToData("lastCompletionData")
t.NoError(err)
startedEventAttributes := &eventpb.WorkflowExecutionStartedEventAttributes{
Input: lastCompletionResult,
TaskList: &tasklistpb.TaskList{Name: taskList},
ParentWorkflowExecution: parentExecution,
CronSchedule: cronSchedule,
ContinuedExecutionRunId: continuedRunID,
ParentWorkflowNamespace: parentNamespace,
Attempt: attempt,
ExecutionStartToCloseTimeoutSeconds: executionTimeout,
TaskStartToCloseTimeoutSeconds: taskTimeout,
LastCompletionResult: lastCompletionResult,
Input: lastCompletionResult,
TaskList: &tasklistpb.TaskList{Name: taskList},
ParentWorkflowExecution: parentExecution,
CronSchedule: cronSchedule,
ContinuedExecutionRunId: continuedRunID,
ParentWorkflowNamespace: parentNamespace,
Attempt: attempt,
WorkflowExecutionTimeoutSeconds: executionTimeout,
WorkflowRunTimeoutSeconds: runTimeout,
WorkflowTaskTimeoutSeconds: taskTimeout,
LastCompletionResult: lastCompletionResult,
}
testEvents := []*eventpb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, startedEventAttributes),
Expand Down Expand Up @@ -927,8 +929,9 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() {
t.EqualValues(continuedRunID, result.ContinuedExecutionRunID)
t.EqualValues(parentNamespace, result.ParentWorkflowNamespace)
t.EqualValues(attempt, result.Attempt)
t.EqualValues(executionTimeout, result.ExecutionStartToCloseTimeoutSeconds)
t.EqualValues(taskTimeout, result.TaskStartToCloseTimeoutSeconds)
t.EqualValues(executionTimeout, result.WorkflowExecutionTimeoutSeconds)
t.EqualValues(runTimeout, result.WorkflowRunTimeoutSeconds)
t.EqualValues(taskTimeout, result.WorkflowTaskTimeoutSeconds)
t.EqualValues(workflowType, result.WorkflowType.Name)
t.EqualValues(testNamespace, result.Namespace)
}
Expand Down Expand Up @@ -1115,7 +1118,6 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_DecisionHeartbeatFail() {
InitialInterval: backoffDuration,
BackoffCoefficient: 1.1,
MaximumInterval: time.Minute,
ExpirationInterval: time.Minute,
},
}
ctx = WithLocalActivityOptions(ctx, ao)
Expand All @@ -1136,8 +1138,8 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_DecisionHeartbeatFail() {
testEvents := []*eventpb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &eventpb.WorkflowExecutionStartedEventAttributes{
// make sure the timeout is same as the backoff interval
TaskStartToCloseTimeoutSeconds: backoffIntervalInSeconds,
TaskList: &tasklistpb.TaskList{Name: testWorkflowTaskTasklist}},
WorkflowTaskTimeoutSeconds: backoffIntervalInSeconds,
TaskList: &tasklistpb.TaskList{Name: testWorkflowTaskTasklist}},
),
createTestEventDecisionTaskScheduled(2, &eventpb.DecisionTaskScheduledEventAttributes{}),
decisionTaskStartedEvent,
Expand Down
6 changes: 5 additions & 1 deletion internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,11 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
}
}()

timeoutDuration := time.Duration(task.params.ScheduleToCloseTimeoutSeconds) * time.Second
timeout := task.params.ScheduleToCloseTimeoutSeconds
if task.params.StartToCloseTimeoutSeconds != 0 && task.params.StartToCloseTimeoutSeconds < timeout {
timeout = task.params.StartToCloseTimeoutSeconds
}
timeoutDuration := time.Duration(timeout) * time.Second
deadline := time.Now().Add(timeoutDuration)
if task.attempt > 0 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) {
// this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout
Expand Down
Loading

0 comments on commit 3284084

Please sign in to comment.