Skip to content

Commit

Permalink
Improve worker shutdown logic (#77)
Browse files Browse the repository at this point in the history
Signed-off-by: Fabian Martinez <[email protected]>
  • Loading branch information
famarting authored Nov 13, 2024
1 parent 0c4afbc commit febd2db
Show file tree
Hide file tree
Showing 5 changed files with 299 additions and 10 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- Make WaitForOrchestrationXXX gRPC APIs resilient ([#80](https://github.com/microsoft/durabletask-go/pull/81)) - by [@famarting](https://github.com/famarting)
- Make WaitForOrchestrationXXX gRPC APIs resilient ([#80](https://github.com/microsoft/durabletask-go/pull/80)) - by [@famarting](https://github.com/famarting)
- Improve worker shutdown logic ([#77](https://github.com/microsoft/durabletask-go/pull/77)) - by [@famarting](https://github.com/famarting)

## [v0.5.0] - 2024-06-28

Expand Down
20 changes: 18 additions & 2 deletions backend/taskhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"context"
"sync"
)

type TaskHubWorker interface {
Expand Down Expand Up @@ -50,7 +51,22 @@ func (w *taskHubWorker) Shutdown(ctx context.Context) error {
}

w.logger.Info("workers stopping and draining...")
w.orchestrationWorker.StopAndDrain()
w.activityWorker.StopAndDrain()
defer w.logger.Info("finished stopping and draining workers!")

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
w.orchestrationWorker.StopAndDrain()
}()

wg.Add(1)
go func() {
defer wg.Done()
w.activityWorker.StopAndDrain()
}()

wg.Wait()

return nil
}
28 changes: 27 additions & 1 deletion backend/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -45,6 +46,7 @@ type worker struct {
cancel context.CancelFunc
processor TaskProcessor
waiting bool
stop atomic.Bool
}

type NewTaskWorkerOptions func(*WorkerOptions)
Expand Down Expand Up @@ -89,6 +91,8 @@ func (w *worker) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
w.cancel = cancel

w.stop.Store(false)

go func() {
var b backoff.BackOff = &backoff.ExponentialBackOff{
InitialInterval: 50 * time.Millisecond,
Expand Down Expand Up @@ -190,6 +194,11 @@ func (w *worker) ProcessNext(ctx context.Context) (bool, error) {
}

func (w *worker) StopAndDrain() {
w.logger.Debugf("%v: stop and drain...", w.Name())
defer w.logger.Debugf("%v: finished stop and drain...", w.Name())

w.stop.Store(true)

// Cancel the background poller and dispatcher(s)
if w.cancel != nil {
w.cancel()
Expand All @@ -206,20 +215,37 @@ func (w *worker) processWorkItem(ctx context.Context, wi WorkItem) {

w.logger.Debugf("%v: processing work item: %s", w.Name(), wi)

if w.stop.Load() {
if err := w.processor.AbandonWorkItem(context.Background(), wi); err != nil {
w.logger.Errorf("%v: failed to abandon work item: %v", w.Name(), err)
}
return
}

if err := w.processor.ProcessWorkItem(ctx, wi); err != nil {
if errors.Is(err, ctx.Err()) {
w.logger.Warnf("%v: abandoning work item due to cancellation", w.Name())
} else {
w.logger.Errorf("%v: failed to process work item: %v", w.Name(), err)
}
if w.stop.Load() {
ctx = context.Background()
}
if err := w.processor.AbandonWorkItem(ctx, wi); err != nil {
w.logger.Errorf("%v: failed to abandon work item: %v", w.Name(), err)
}
return
}

if err := w.processor.CompleteWorkItem(ctx, wi); err != nil {
w.logger.Errorf("%v: failed to complete work item: %v", w.Name(), err)
if errors.Is(err, ctx.Err()) {
w.logger.Warnf("%v: failed to complete work item due to cancellation", w.Name())
} else {
w.logger.Errorf("%v: failed to complete work item: %v", w.Name(), err)
}
if w.stop.Load() {
ctx = context.Background()
}
if err := w.processor.AbandonWorkItem(ctx, wi); err != nil {
w.logger.Errorf("%v: failed to abandon work item: %v", w.Name(), err)
}
Expand Down
128 changes: 128 additions & 0 deletions tests/mocks/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package mocks

import (
context "context"
"errors"
"sync"
"sync/atomic"
"time"

backend "github.com/microsoft/durabletask-go/backend"
)

// Compile-time check that *TestTaskProcessor satisfies backend.TaskProcessor.
var _ backend.TaskProcessor = &TestTaskProcessor{}

// TestTaskProcessor implements a dummy task processor useful for testing.
//
// Work items are queued in memory, handed out in insertion order by
// FetchWorkItem, and recorded in separate lists when AbandonWorkItem or
// CompleteWorkItem is called, so tests can inspect what happened to each item.
type TestTaskProcessor struct {
// name is the value reported by Name.
name string

// processingBlocked, while true, makes ProcessWorkItem wait instead of
// returning immediately.
processingBlocked atomic.Bool

// workItemMu guards workItems, the queue of items not yet fetched.
workItemMu sync.Mutex
workItems []backend.WorkItem

// abandonedWorkItemMu guards abandonedWorkItems, the items passed to
// AbandonWorkItem.
abandonedWorkItemMu sync.Mutex
abandonedWorkItems []backend.WorkItem

// completedWorkItemMu guards completedWorkItems, the items passed to
// CompleteWorkItem.
completedWorkItemMu sync.Mutex
completedWorkItems []backend.WorkItem
}

// NewTestTaskProcessor returns a TestTaskProcessor that reports the given
// name. The returned processor starts with no queued work items and with
// processing unblocked.
func NewTestTaskProcessor(name string) *TestTaskProcessor {
	return &TestTaskProcessor{
		name: name,
	}
}

// NewTestTaskPocessor is the original, misspelled name of
// NewTestTaskProcessor. It is kept so existing callers continue to compile;
// prefer NewTestTaskProcessor in new code.
func NewTestTaskPocessor(name string) *TestTaskProcessor {
	return NewTestTaskProcessor(name)
}

// BlockProcessing sets the blocked flag. While it is set, ProcessWorkItem
// waits until the flag is cleared or its context is done.
func (t *TestTaskProcessor) BlockProcessing() {
t.processingBlocked.Store(true)
}

// UnblockProcessing clears the blocked flag. ProcessWorkItem calls that are
// currently waiting return nil the next time they poll the flag.
func (t *TestTaskProcessor) UnblockProcessing() {
t.processingBlocked.Store(false)
}

// PendingWorkItems returns a snapshot of the work items that have been added
// but not yet fetched. The result is a fresh slice, so callers may keep or
// modify it without affecting the processor.
func (t *TestTaskProcessor) PendingWorkItems() []backend.WorkItem {
	t.workItemMu.Lock()
	defer t.workItemMu.Unlock()

	snapshot := make([]backend.WorkItem, len(t.workItems))
	copy(snapshot, t.workItems)
	return snapshot
}

// AbandonedWorkItems returns a snapshot of every work item passed to
// AbandonWorkItem so far, in call order. The result is a fresh slice that the
// caller owns.
func (t *TestTaskProcessor) AbandonedWorkItems() []backend.WorkItem {
	t.abandonedWorkItemMu.Lock()
	defer t.abandonedWorkItemMu.Unlock()

	snapshot := make([]backend.WorkItem, len(t.abandonedWorkItems))
	copy(snapshot, t.abandonedWorkItems)
	return snapshot
}

// CompletedWorkItems returns a snapshot of every work item passed to
// CompleteWorkItem so far, in call order. The result is a fresh slice that the
// caller owns.
func (t *TestTaskProcessor) CompletedWorkItems() []backend.WorkItem {
	t.completedWorkItemMu.Lock()
	defer t.completedWorkItemMu.Unlock()

	snapshot := make([]backend.WorkItem, len(t.completedWorkItems))
	copy(snapshot, t.completedWorkItems)
	return snapshot
}

// AddWorkItems appends the given work items to the end of the pending queue,
// from which FetchWorkItem hands them out in insertion order.
func (t *TestTaskProcessor) AddWorkItems(wis ...backend.WorkItem) {
	t.workItemMu.Lock()
	t.workItems = append(t.workItems, wis...)
	t.workItemMu.Unlock()
}

// Name returns the processor's name field, as set by the constructor.
func (t *TestTaskProcessor) Name() string {
return t.name
}

// FetchWorkItem removes and returns the oldest pending work item. The context
// argument is ignored. When the queue is empty it returns
// backend.ErrNoWorkItems.
func (t *TestTaskProcessor) FetchWorkItem(context.Context) (backend.WorkItem, error) {
	t.workItemMu.Lock()
	defer t.workItemMu.Unlock()

	pending := len(t.workItems)
	if pending == 0 {
		return nil, backend.ErrNoWorkItems
	}

	head := t.workItems[0]

	// Shift the remaining items down one slot within the same backing array,
	// then drop the now-duplicated last slot.
	copy(t.workItems, t.workItems[1:])
	t.workItems = t.workItems[:pending-1]

	return head, nil
}

// ProcessWorkItem simulates processing a work item; wi itself is not
// inspected. If processing is not blocked it returns nil immediately.
// Otherwise it polls roughly once per millisecond and returns nil once
// processing is unblocked, or a dummy error once ctx is done.
func (t *TestTaskProcessor) ProcessWorkItem(ctx context.Context, wi backend.WorkItem) error {
	// Fast path: nothing is holding processing back.
	if !t.processingBlocked.Load() {
		return nil
	}

	for {
		// Cancellation is checked before the flag, so a done context wins
		// when both conditions became true during the previous sleep.
		if ctx.Err() != nil {
			return errors.New("dummy error processing work item")
		}
		if !t.processingBlocked.Load() {
			return nil
		}
		time.Sleep(time.Millisecond)
	}
}

// AbandonWorkItem records wi in the abandoned list, which tests can read via
// AbandonedWorkItems. The context is ignored and the returned error is always
// nil.
func (t *TestTaskProcessor) AbandonWorkItem(ctx context.Context, wi backend.WorkItem) error {
	t.abandonedWorkItemMu.Lock()
	t.abandonedWorkItems = append(t.abandonedWorkItems, wi)
	t.abandonedWorkItemMu.Unlock()

	return nil
}

// CompleteWorkItem records wi in the completed list, which tests can read via
// CompletedWorkItems. The context is ignored and the returned error is always
// nil.
func (t *TestTaskProcessor) CompleteWorkItem(ctx context.Context, wi backend.WorkItem) error {
	t.completedWorkItemMu.Lock()
	t.completedWorkItems = append(t.completedWorkItems, wi)
	t.completedWorkItemMu.Unlock()

	return nil
}
Loading

0 comments on commit febd2db

Please sign in to comment.