Sub-orchestration retries (#84)
Signed-off-by: Fabian Martinez <[email protected]>
famarting authored Nov 27, 2024
1 parent febd2db commit 6ab94e1
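
This commit extends the activity retry support from #83 to sub-orchestrations: ActivityRetryPolicy becomes the shared RetryPolicy type, its validation moves into an exported Validate method, the activity-specific retry driver is generalized into internalScheduleTaskWithRetries, and CallSubOrchestrator gains a WithSubOrchestrationRetryPolicy option. The sqlite backend also passes an orchestration-ID-reuse policy so a retried sub-orchestration can re-create a previously failed instance under the same ID.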
Showing 8 changed files with 175 additions and 61 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add API to set custom status ([#81](https://github.com/microsoft/durabletask-go/pull/81)) - by [@famarting](https://github.com/famarting)
- Add missing purge orchestration options ([#82](https://github.com/microsoft/durabletask-go/pull/82)) - by [@famarting](https://github.com/famarting)
- Add support for activity retry policies ([#83](https://github.com/microsoft/durabletask-go/pull/83)) - by [@famarting](https://github.com/famarting)
- Add support for sub-orchestration retry policies ([#84](https://github.com/microsoft/durabletask-go/pull/84)) - by [@famarting](https://github.com/famarting)

### Changed

5 changes: 4 additions & 1 deletion backend/sqlite/sqlite.go
@@ -336,7 +336,10 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *
for _, msg := range wi.State.PendingMessages() {
if es := msg.HistoryEvent.GetExecutionStarted(); es != nil {
// Need to insert a new row into the DB
if _, err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx); err != nil {
if _, err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx, backend.WithOrchestrationIdReusePolicy(&protos.OrchestrationIdReusePolicy{
OperationStatus: []protos.OrchestrationStatus{protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED},
Action: api.REUSE_ID_ACTION_TERMINATE,
})); err != nil {
if err == backend.ErrDuplicateEvent {
be.logger.Warnf(
"%v: dropping sub-orchestration creation event because an instance with the target ID (%v) already exists.",
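Why the new arguments matter: each retry of a sub-orchestration emits a fresh ExecutionStarted event that reuses the child's instance ID, and without a reuse policy the backend would drop that event as a duplicate (the warning path above). The policy tells the backend to terminate a prior FAILED instance with the same ID and let the retry take its place. Restated on its own, with the same fields as the hunk above:

```go
// Accept a new ExecutionStarted event for an existing instance ID only
// when the previous execution FAILED, terminating that failed instance
// so the retried sub-orchestration can be created in its place.
reusePolicy := &protos.OrchestrationIdReusePolicy{
	OperationStatus: []protos.OrchestrationStatus{
		protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED,
	},
	Action: api.REUSE_ID_ACTION_TERMINATE,
}
```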
2 changes: 1 addition & 1 deletion samples/retries/retries.go
@@ -74,7 +74,7 @@ func Init(ctx context.Context, r *task.TaskRegistry) (backend.TaskHubClient, bac
}

func RetryActivityOrchestrator(ctx *task.OrchestrationContext) (any, error) {
if err := ctx.CallActivity(RandomFailActivity, task.WithRetryPolicy(&task.ActivityRetryPolicy{
if err := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
MaxAttempts: 10,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
54 changes: 31 additions & 23 deletions task/activity.go
@@ -14,10 +14,10 @@ type callActivityOption func(*callActivityOptions) error

type callActivityOptions struct {
rawInput *wrapperspb.StringValue
retryPolicy *ActivityRetryPolicy
retryPolicy *RetryPolicy
}

type ActivityRetryPolicy struct {
type RetryPolicy struct {
// Max number of attempts to try the activity call, first execution inclusive
MaxAttempts int
// Timespan to wait for the first retry
@@ -32,6 +32,31 @@ type ActivityRetryPolicy struct {
Handle func(error) bool
}

func (policy *RetryPolicy) Validate() error {
if policy.InitialRetryInterval <= 0 {
return fmt.Errorf("InitialRetryInterval must be greater than 0")
}
if policy.MaxAttempts <= 0 {
// setting 1 max attempt is equivalent to not retrying
policy.MaxAttempts = 1
}
if policy.BackoffCoefficient <= 0 {
policy.BackoffCoefficient = 1
}
if policy.MaxRetryInterval <= 0 {
policy.MaxRetryInterval = math.MaxInt64
}
if policy.RetryTimeout <= 0 {
policy.RetryTimeout = math.MaxInt64
}
if policy.Handle == nil {
policy.Handle = func(err error) bool {
return true
}
}
return nil
}

// WithActivityInput configures an input for an activity invocation.
// The specified input must be JSON serializable.
func WithActivityInput(input any) callActivityOption {
@@ -53,31 +78,14 @@ func WithRawActivityInput(input string) callActivityOption {
}
}

func WithRetryPolicy(policy *ActivityRetryPolicy) callActivityOption {
func WithActivityRetryPolicy(policy *RetryPolicy) callActivityOption {
return func(opt *callActivityOptions) error {
if policy == nil {
return nil
}
if policy.InitialRetryInterval <= 0 {
return fmt.Errorf("InitialRetryInterval must be greater than 0")
}
if policy.MaxAttempts <= 0 {
// setting 1 max attempt is equivalent to not retrying
policy.MaxAttempts = 1
}
if policy.BackoffCoefficient <= 0 {
policy.BackoffCoefficient = 1
}
if policy.MaxRetryInterval <= 0 {
policy.MaxRetryInterval = math.MaxInt64
}
if policy.RetryTimeout <= 0 {
policy.RetryTimeout = math.MaxInt64
}
if policy.Handle == nil {
policy.Handle = func(err error) bool {
return true
}
err := policy.Validate()
if err != nil {
return err
}
opt.retryPolicy = policy
return nil
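Net effect of this file's changes: WithRetryPolicy is renamed to WithActivityRetryPolicy, ActivityRetryPolicy becomes the shared RetryPolicy, and the inline checks move into the exported Validate method so the new sub-orchestration option can reuse them. Note that Validate mutates the policy in place to fill in defaults. A minimal usage sketch (the values are illustrative, and the options call Validate for you in normal use):

```go
package main

import (
	"log"
	"time"

	"github.com/microsoft/durabletask-go/task"
)

func main() {
	policy := &task.RetryPolicy{
		MaxAttempts:          5,
		InitialRetryInterval: 200 * time.Millisecond,
	}
	// Validate normalizes zero-valued fields instead of rejecting them:
	// BackoffCoefficient becomes 1 (constant delay), MaxRetryInterval and
	// RetryTimeout become effectively unlimited (math.MaxInt64), and Handle
	// defaults to retrying on every error. InitialRetryInterval <= 0 is the
	// only input that returns an error.
	if err := policy.Validate(); err != nil {
		log.Fatal(err)
	}
	log.Printf("normalized policy: %+v", policy)
}
```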
80 changes: 53 additions & 27 deletions task/orchestrator.go
@@ -49,6 +49,8 @@ type OrchestrationContext struct {
type callSubOrchestratorOptions struct {
instanceID string
rawInput *wrapperspb.StringValue

retryPolicy *RetryPolicy
}

// subOrchestratorOption is a functional option type for the CallSubOrchestrator orchestrator method.
@@ -96,6 +98,20 @@ func WithSubOrchestrationInstanceID(instanceID string) subOrchestratorOption {
}
}

func WithSubOrchestrationRetryPolicy(policy *RetryPolicy) subOrchestratorOption {
return func(opt *callSubOrchestratorOptions) error {
if policy == nil {
return nil
}
err := policy.Validate()
if err != nil {
return err
}
opt.retryPolicy = policy
return nil
}
}
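
With this option in place, a single RetryPolicy value can drive both activity and sub-orchestration retries. A sketch of an orchestrator using both (the "ChargeCard" and "ShipOrder" names are hypothetical; assumes the usual time and task imports):

```go
func ParentOrchestrator(ctx *task.OrchestrationContext) (any, error) {
	policy := &task.RetryPolicy{
		MaxAttempts:          4,
		InitialRetryInterval: 500 * time.Millisecond,
		BackoffCoefficient:   2,
		MaxRetryInterval:     10 * time.Second,
	}
	// Retry a flaky activity and a flaky child orchestration with the same
	// policy; each failed attempt waits on a durable timer before retrying.
	if err := ctx.CallActivity("ChargeCard",
		task.WithActivityRetryPolicy(policy)).Await(nil); err != nil {
		return nil, err
	}
	if err := ctx.CallSubOrchestrator("ShipOrder",
		task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_ship"),
		task.WithSubOrchestrationRetryPolicy(policy)).Await(nil); err != nil {
		return nil, err
	}
	return nil, nil
}
```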

// NewOrchestrationContext returns a new [OrchestrationContext] struct with the specified parameters.
func NewOrchestrationContext(registry *TaskRegistry, id api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) *OrchestrationContext {
return &OrchestrationContext{
@@ -238,7 +254,7 @@ func (ctx *OrchestrationContext) CallActivity(activity interface{}, opts ...call
}

if options.retryPolicy != nil {
return ctx.internalCallActivityWithRetries(ctx.CurrentTimeUtc, func() Task {
return ctx.internalScheduleTaskWithRetries(ctx.CurrentTimeUtc, func() Task {
return ctx.internalScheduleActivity(activity, options)
}, *options.retryPolicy, 0)
}
Expand All @@ -259,7 +275,40 @@ func (ctx *OrchestrationContext) internalScheduleActivity(activity interface{},
return task
}

func (ctx *OrchestrationContext) internalCallActivityWithRetries(initialAttempt time.Time, schedule func() Task, policy ActivityRetryPolicy, retryCount int) Task {
func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...subOrchestratorOption) Task {
options := new(callSubOrchestratorOptions)
for _, configure := range opts {
if err := configure(options); err != nil {
failedTask := newTask(ctx)
failedTask.fail(helpers.NewTaskFailureDetails(err))
return failedTask
}
}

if options.retryPolicy != nil {
return ctx.internalScheduleTaskWithRetries(ctx.CurrentTimeUtc, func() Task {
return ctx.internalCallSubOrchestrator(orchestrator, options)
}, *options.retryPolicy, 0)
}

return ctx.internalCallSubOrchestrator(orchestrator, options)
}

func (ctx *OrchestrationContext) internalCallSubOrchestrator(orchestrator interface{}, options *callSubOrchestratorOptions) Task {
createSubOrchestrationAction := helpers.NewCreateSubOrchestrationAction(
ctx.getNextSequenceNumber(),
helpers.GetTaskFunctionName(orchestrator),
options.instanceID,
options.rawInput,
)
ctx.pendingActions[createSubOrchestrationAction.Id] = createSubOrchestrationAction

task := newTask(ctx)
ctx.pendingTasks[createSubOrchestrationAction.Id] = task
return task
}

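The generalized driver below is the piece both call sites share: it wraps the scheduled task, and when Await surfaces a failure it asks computeNextDelay for the next wait, sleeps on a durable timer (the timerErr handling in the collapsed region comes from that timer task, which keeps the delay replay-safe), then recursively schedules a fresh attempt with retryCount+1. The initialAttempt timestamp is threaded through the recursion so RetryTimeout is measured from the first attempt rather than the most recent one.
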
func (ctx *OrchestrationContext) internalScheduleTaskWithRetries(initialAttempt time.Time, schedule func() Task, policy RetryPolicy, retryCount int) Task {
return &taskWrapper{
delegate: schedule(),
onAwaitResult: func(v any, err error) error {
@@ -283,7 +332,7 @@ func (ctx *OrchestrationContext) internalCallActivityWithRetries(initialAttempt
return fmt.Errorf("%v %w", timerErr, err)
}

err = ctx.internalCallActivityWithRetries(initialAttempt, schedule, policy, retryCount+1).Await(v)
err = ctx.internalScheduleTaskWithRetries(initialAttempt, schedule, policy, retryCount+1).Await(v)
if err == nil {
return nil
}
Expand All @@ -292,7 +341,7 @@ func (ctx *OrchestrationContext) internalCallActivityWithRetries(initialAttempt
}
}

func computeNextDelay(currentTimeUtc time.Time, policy ActivityRetryPolicy, attempt int, firstAttempt time.Time, err error) time.Duration {
func computeNextDelay(currentTimeUtc time.Time, policy RetryPolicy, attempt int, firstAttempt time.Time, err error) time.Duration {
if policy.Handle(err) {
isExpired := false
if policy.RetryTimeout != math.MaxInt64 {
Expand All @@ -309,29 +358,6 @@ func computeNextDelay(currentTimeUtc time.Time, policy ActivityRetryPolicy, atte
return 0
}
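The body of computeNextDelay is collapsed in this diff, but the table in task/orchestrator_test.go below pins down its arithmetic; a sketch of the expected behavior (an approximation, not the verbatim body):

```go
// Retry n (0-based) waits InitialRetryInterval * BackoffCoefficient^n,
// capped at MaxRetryInterval. A zero return means "stop retrying": it is
// produced when Handle(err) returns false or RetryTimeout has elapsed
// since the first attempt. (MaxAttempts itself is enforced by the retry
// driver above, not here.)
delay := time.Duration(float64(policy.InitialRetryInterval) *
	math.Pow(policy.BackoffCoefficient, float64(attempt)))
if delay > policy.MaxRetryInterval {
	delay = policy.MaxRetryInterval
}
```

With the test policy (InitialRetryInterval of 2s, BackoffCoefficient of 2) this yields 2s, 4s, and 8s for the first three retries, and a constant 2s when the coefficient is 1.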

func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...subOrchestratorOption) Task {
options := new(callSubOrchestratorOptions)
for _, configure := range opts {
if err := configure(options); err != nil {
failedTask := newTask(ctx)
failedTask.fail(helpers.NewTaskFailureDetails(err))
return failedTask
}
}

createSubOrchestrationAction := helpers.NewCreateSubOrchestrationAction(
ctx.getNextSequenceNumber(),
helpers.GetTaskFunctionName(orchestrator),
options.instanceID,
options.rawInput,
)
ctx.pendingActions[createSubOrchestrationAction.Id] = createSubOrchestrationAction

task := newTask(ctx)
ctx.pendingTasks[createSubOrchestrationAction.Id] = task
return task
}

// CreateTimer schedules a durable timer that expires after the specified delay.
func (ctx *OrchestrationContext) CreateTimer(delay time.Duration) Task {
return ctx.createTimerInternal(delay)
14 changes: 7 additions & 7 deletions task/orchestrator_test.go
@@ -10,7 +10,7 @@ func Test_computeNextDelay(t *testing.T) {
time2 := time.Now().Add(1 * time.Minute)
type args struct {
currentTimeUtc time.Time
policy ActivityRetryPolicy
policy RetryPolicy
attempt int
firstAttempt time.Time
err error
@@ -24,7 +24,7 @@
name: "first attempt",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
policy: RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
@@ -41,7 +41,7 @@
name: "second attempt",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
policy: RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
@@ -58,7 +58,7 @@
name: "third attempt",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
policy: RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
@@ -75,7 +75,7 @@
name: "fourth attempt",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
policy: RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
@@ -92,7 +92,7 @@
name: "expired",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
policy: RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
@@ -109,7 +109,7 @@
name: "fourth attempt backoff 1",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
policy: RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 1,
35 changes: 34 additions & 1 deletion tests/grpc/grpc_test.go
@@ -426,7 +426,7 @@ func Test_Grpc_ReuseInstanceIDError(t *testing.T) {
func Test_Grpc_ActivityRetries(t *testing.T) {
r := task.NewTaskRegistry()
r.AddOrchestratorN("ActivityRetries", func(ctx *task.OrchestrationContext) (any, error) {
if err := ctx.CallActivity("FailActivity", task.WithRetryPolicy(&task.ActivityRetryPolicy{
if err := ctx.CallActivity("FailActivity", task.WithActivityRetryPolicy(&task.RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 10 * time.Millisecond,
})).Await(nil); err != nil {
@@ -453,3 +453,36 @@ func Test_Grpc_ActivityRetries(t *testing.T) {
// With 3 max attempts there will be two retries, each preceded by a 10ms delay
require.GreaterOrEqual(t, metadata.LastUpdatedAt, metadata.CreatedAt.Add(2*10*time.Millisecond))
}

func Test_Grpc_SubOrchestratorRetries(t *testing.T) {
r := task.NewTaskRegistry()
r.AddOrchestratorN("Parent", func(ctx *task.OrchestrationContext) (any, error) {
err := ctx.CallSubOrchestrator(
"Child",
task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_child"),
task.WithSubOrchestrationRetryPolicy(&task.RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 10 * time.Millisecond,
BackoffCoefficient: 2,
})).Await(nil)
return nil, err
})
r.AddOrchestratorN("Child", func(ctx *task.OrchestrationContext) (any, error) {
return nil, errors.New("Child failed")
})

cancelListener := startGrpcListener(t, r)
defer cancelListener()
instanceID := api.InstanceID("orchestrator_retries")

id, err := grpcClient.ScheduleNewOrchestration(ctx, "Parent", api.WithInstanceID(instanceID))
require.NoError(t, err)
timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second)
defer cancelTimeout()
metadata, err := grpcClient.WaitForOrchestrationCompletion(timeoutCtx, id, api.WithFetchPayloads(true))
require.NoError(t, err)
assert.Equal(t, true, metadata.IsComplete())
assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED, metadata.RuntimeStatus)
// With 3 max attempts there will be two retries; with backoff coefficient 2 the delays are 10ms and then 20ms, so 2*10ms is a safe lower bound
require.GreaterOrEqual(t, metadata.LastUpdatedAt, metadata.CreatedAt.Add(2*10*time.Millisecond))
}