Skip to content

Commit

Permalink
gracefully shutdown reconcilers and catalogd FBC
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Lanford <[email protected]>
  • Loading branch information
joelanford committed Feb 2, 2025
1 parent 9b08aea commit 0b8e7f5
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 9 deletions.
2 changes: 2 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ linters-settings:
alias: ctrl
- pkg: github.com/blang/semver/v4
alias: bsemver
- pkg: "^github.com/operator-framework/operator-controller/internal/util/([^/]+)$"
alias: "${1}util"

output:
formats:
Expand Down
2 changes: 1 addition & 1 deletion catalogd/config/base/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ spec:
imagePullPolicy: IfNotPresent
terminationMessagePolicy: FallbackToLogsOnError
serviceAccountName: controller-manager
terminationGracePeriodSeconds: 10
terminationGracePeriodSeconds: 60
volumes:
- name: cache
emptyDir: {}
10 changes: 7 additions & 3 deletions catalogd/internal/controllers/core/clustercatalog_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
catalogdv1 "github.com/operator-framework/operator-controller/catalogd/api/v1"
"github.com/operator-framework/operator-controller/catalogd/internal/source"
"github.com/operator-framework/operator-controller/catalogd/internal/storage"
contextutil "github.com/operator-framework/operator-controller/internal/util/context"
)

const (
Expand Down Expand Up @@ -91,8 +92,11 @@ func (r *ClusterCatalogReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{}, client.IgnoreNotFound(err)
}

delayedCtx, delayedCancel := contextutil.WithDelay(ctx, time.Minute)
defer delayedCancel()

reconciledCatsrc := existingCatsrc.DeepCopy()
res, reconcileErr := r.reconcile(ctx, reconciledCatsrc)
res, reconcileErr := r.reconcile(delayedCtx, reconciledCatsrc)

// If we encounter an error, we should delete the stored catalog metadata
// which represents the state of a successfully unpacked catalog. Deleting
Expand All @@ -118,15 +122,15 @@ func (r *ClusterCatalogReconciler) Reconcile(ctx context.Context, req ctrl.Reque
finalizers := reconciledCatsrc.Finalizers

if updateStatus {
if err := r.Client.Status().Update(ctx, reconciledCatsrc); err != nil {
if err := r.Client.Status().Update(delayedCtx, reconciledCatsrc); err != nil {
reconcileErr = errors.Join(reconcileErr, fmt.Errorf("error updating status: %v", err))
}
}

reconciledCatsrc.Finalizers = finalizers

if updateFinalizers {
if err := r.Client.Update(ctx, reconciledCatsrc); err != nil {
if err := r.Client.Update(delayedCtx, reconciledCatsrc); err != nil {
reconcileErr = errors.Join(reconcileErr, fmt.Errorf("error updating finalizers: %v", err))
}
}
Expand Down
2 changes: 1 addition & 1 deletion catalogd/internal/serverutil/serverutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil
listener = tls.NewListener(listener, config)
}

shutdownTimeout := 30 * time.Second
shutdownTimeout := 60 * time.Second

l := mgr.GetLogger().WithName("catalogd-http-server")
handler := catalogdmetrics.AddMetricsToHandler(cfg.LocalStorage.StorageServerHandler())
Expand Down
2 changes: 1 addition & 1 deletion config/base/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ spec:
memory: 64Mi
terminationMessagePolicy: FallbackToLogsOnError
serviceAccountName: operator-controller-controller-manager
terminationGracePeriodSeconds: 10
terminationGracePeriodSeconds: 60
volumes:
- name: cache
emptyDir: {}
10 changes: 7 additions & 3 deletions internal/controllers/clusterextension_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/operator-framework/operator-controller/internal/labels"
"github.com/operator-framework/operator-controller/internal/resolve"
rukpaksource "github.com/operator-framework/operator-controller/internal/rukpak/source"
contextutil "github.com/operator-framework/operator-controller/internal/util/context"
)

const (
Expand Down Expand Up @@ -110,8 +111,11 @@ func (r *ClusterExtensionReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, client.IgnoreNotFound(err)
}

delayedCtx, delayedCancel := contextutil.WithDelay(ctx, time.Minute)
defer delayedCancel()

reconciledExt := existingExt.DeepCopy()
res, reconcileErr := r.reconcile(ctx, reconciledExt)
res, reconcileErr := r.reconcile(delayedCtx, reconciledExt)

// Do checks before any Update()s, as Update() may modify the resource structure!
updateStatus := !equality.Semantic.DeepEqual(existingExt.Status, reconciledExt.Status)
Expand All @@ -129,14 +133,14 @@ func (r *ClusterExtensionReconciler) Reconcile(ctx context.Context, req ctrl.Req
// reconciledExt before updating the object.
finalizers := reconciledExt.Finalizers
if updateStatus {
if err := r.Client.Status().Update(ctx, reconciledExt); err != nil {
if err := r.Client.Status().Update(delayedCtx, reconciledExt); err != nil {
reconcileErr = errors.Join(reconcileErr, fmt.Errorf("error updating status: %v", err))
}
}
reconciledExt.Finalizers = finalizers

if updateFinalizers {
if err := r.Client.Update(ctx, reconciledExt); err != nil {
if err := r.Client.Update(delayedCtx, reconciledExt); err != nil {
reconcileErr = errors.Join(reconcileErr, fmt.Errorf("error updating finalizers: %v", err))
}
}
Expand Down
113 changes: 113 additions & 0 deletions internal/util/context/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package context

import (
	"context"
	"sync"
	"time"
)

// Compile-time check that delayContext satisfies context.Context.
var _ context.Context = (*delayContext)(nil)

// Deadline reports when the delayed context will be done, if that is known.
//
//   - Once the parent is done, it returns the deadline of the internal
//     timeout context (the moment the delay timer was armed plus the delay).
//     If the delayed context was cancelled directly before the parent
//     finished, no timer exists and ok is false.
//   - While the parent is still active and has a deadline, it returns the
//     parent's deadline plus the delay.
//   - Otherwise the deadline is unknown, because it depends on when the
//     parent will be done.
func (d *delayContext) Deadline() (time.Time, bool) {
	select {
	case <-d.parentCtx.Done():
		// The parent is done: wait until timeoutCtx has been published,
		// then defer to it. setupDone is always closed eventually, either
		// by the parent's AfterFunc or by the cancel function.
		<-d.setupDone
		return d.timeoutCtx.Deadline()
	default:
		if parentDeadline, ok := d.parentCtx.Deadline(); ok {
			return parentDeadline.Add(d.delay), true
		}
		return time.Time{}, false
	}
}

// Done returns a channel that is closed when the delayed context is done,
// i.e. when the delay after the parent's completion has elapsed or when the
// cancel function returned by WithDelay has been called.
func (d *delayContext) Done() <-chan struct{} {
	return d.done
}

// Err returns nil while the context is active, context.Canceled if it was
// cancelled via its cancel function, and context.DeadlineExceeded if the
// delay elapsed after the parent was done.
func (d *delayContext) Err() error {
	// If the parent is done, the internal timeout context is the source of
	// truth; wait for it to be published before consulting it.
	select {
	case <-d.parentCtx.Done():
		<-d.setupDone
		return d.timeoutCtx.Err()
	default:
	}

	// The parent is still active. If done is closed we were cancelled
	// directly; otherwise the context is still live.
	select {
	case <-d.done:
		return context.Canceled
	default:
		return nil
	}
}

// Value returns the parent's value for key; the delayed context stores no
// values of its own.
func (d *delayContext) Value(key any) any {
	return d.parentCtx.Value(key)
}

// delayContext is a context.Context that stays alive for a fixed delay after
// its parent is done.
type delayContext struct {
	parentCtx context.Context
	delay     time.Duration

	// done is closed exactly once, when this context is finished.
	done chan struct{}
	// setupDone is closed exactly once, after timeoutCtx and timeoutCancel
	// have been assigned. Readers must receive from it before touching
	// those fields.
	setupDone chan struct{}

	timeoutCtx    context.Context
	timeoutCancel context.CancelFunc
}

// WithDelay returns a context that is done only after delay has elapsed
// following parentCtx being done, together with a cancel function that ends
// it immediately. Values are looked up in parentCtx.
//
// The cancel function is safe to call multiple times and from multiple
// goroutines; only the first call has an effect, and it returns only after
// the context's Done channel is closed.
func WithDelay(parentCtx context.Context, delay time.Duration) (context.Context, context.CancelFunc) {
	delayedCtx := &delayContext{
		parentCtx: parentCtx,
		delay:     delay,
		done:      make(chan struct{}),
		setupDone: make(chan struct{}),
	}

	// setupDelay runs (in its own goroutine) once the parent is done and
	// arms the timer that will eventually close done.
	setupDelay := func() {
		timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), delay)
		context.AfterFunc(timeoutCtx, func() { close(delayedCtx.done) })
		delayedCtx.timeoutCtx = timeoutCtx
		delayedCtx.timeoutCancel = timeoutCancel
		close(delayedCtx.setupDone)
	}

	unregisterDelay := context.AfterFunc(parentCtx, setupDelay)

	var cancelOnce sync.Once
	cancelFunc := func() {
		// sync.Once makes repeated and concurrent calls no-ops; without it
		// a second call would block forever on setupDone or close done
		// twice.
		cancelOnce.Do(func() {
			if unregisterDelay() {
				// Setup never ran and never will: we were cancelled
				// before the parent was done. Publish an already-cancelled
				// timeout context so that Err and Deadline cannot block on
				// setupDone if the parent finishes later.
				cancelledCtx, cancel := context.WithCancel(context.Background())
				cancel()
				delayedCtx.timeoutCtx = cancelledCtx
				delayedCtx.timeoutCancel = cancel
				close(delayedCtx.setupDone)
				close(delayedCtx.done)
				return
			}

			// Setup has started; wait until timeoutCancel is available.
			<-delayedCtx.setupDone

			// Cancelling the timeout context triggers its AfterFunc, which
			// closes done. Wait for that so the context is observably done
			// when we return.
			delayedCtx.timeoutCancel()
			<-delayedCtx.done
		})
	}

	return delayedCtx, cancelFunc
}
99 changes: 99 additions & 0 deletions internal/util/context/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package context_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"

contextutil "github.com/operator-framework/operator-controller/internal/util/context"
)

func TestWithDelay_Delays(t *testing.T) {
for _, delay := range []time.Duration{
0,
time.Millisecond * 10,
time.Millisecond * 100,
time.Millisecond * 200,
} {
t.Run(delay.String(), func(t *testing.T) {
parentCtx, parentCancel := context.WithCancel(context.Background())
delayCtx, _ := contextutil.WithDelay(parentCtx, delay)

parentCancel()

// verify deadline is within 1m ms of what we expect
expectDeadline := time.Now().Add(delay)
actualDeadline, ok := delayCtx.Deadline()
assert.True(t, ok, "expected delay context to have a deadline after parent was cancelled")
assert.WithinDurationf(t, expectDeadline, actualDeadline, time.Millisecond, "expected the context's deadline (%v) to be within 1 ms of %v; diff was %v", expectDeadline, actualDeadline, expectDeadline.Sub(actualDeadline))

// verify context is done due to deadline exceeded and that it happens
// within 3ms of our expectation
select {
case <-delayCtx.Done():
assert.ErrorIs(t, delayCtx.Err(), context.DeadlineExceeded)

Check failure on line 36 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / lint

require-error: for error assertions use require (testifylint)
case <-time.After(expectDeadline.Sub(time.Now()) + 3*time.Millisecond):

Check failure on line 37 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / lint

S1024: should use time.Until instead of t.Sub(time.Now()) (gosimple)
diff := time.Now().Sub(expectDeadline)

Check failure on line 38 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / lint

S1012: should use `time.Since` instead of `time.Now().Sub` (gosimple)
t.Fatalf("delay context should have been canceled quickly after %s, but it took %s", delay, diff)
}
})
}
}

func TestWithDelay_Deadline(t *testing.T) {
t.Run("parent has deadline", func(t *testing.T) {
parentDeadline := time.Now().Add(200 * time.Millisecond)
parentCtx, _ := context.WithDeadline(context.Background(), parentDeadline)

Check failure on line 48 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / e2e-kind

the cancel function returned by context.WithDeadline should be called, not discarded, to avoid a context leak

Check failure on line 48 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / lint

lostcancel: the cancel function returned by context.WithDeadline should be called, not discarded, to avoid a context leak (govet)

Check failure on line 48 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / verify

the cancel function returned by context.WithDeadline should be called, not discarded, to avoid a context leak

Check failure on line 48 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / extension-developer-e2e

the cancel function returned by context.WithDeadline should be called, not discarded, to avoid a context leak

Check failure on line 48 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / upgrade-e2e

the cancel function returned by context.WithDeadline should be called, not discarded, to avoid a context leak

delay := 250 * time.Millisecond
delayCtx, _ := contextutil.WithDelay(parentCtx, delay)

expectDeadline := parentDeadline.Add(delay)
actualDeadline, ok := delayCtx.Deadline()

assert.True(t, ok, "expected delay context to have a deadline before parent was cancelled")
assert.Equal(t, expectDeadline, actualDeadline)
})
t.Run("parent has no deadline", func(t *testing.T) {
parentCtx, _ := context.WithCancel(context.Background())

Check failure on line 60 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / e2e-kind

the cancel function returned by context.WithCancel should be called, not discarded, to avoid a context leak

Check failure on line 60 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / lint

lostcancel: the cancel function returned by context.WithCancel should be called, not discarded, to avoid a context leak (govet)

Check failure on line 60 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / verify

the cancel function returned by context.WithCancel should be called, not discarded, to avoid a context leak

Check failure on line 60 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / extension-developer-e2e

the cancel function returned by context.WithCancel should be called, not discarded, to avoid a context leak

Check failure on line 60 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / upgrade-e2e

the cancel function returned by context.WithCancel should be called, not discarded, to avoid a context leak

delayCtx, _ := contextutil.WithDelay(parentCtx, 200*time.Millisecond)
actualDeadline, ok := delayCtx.Deadline()
assert.False(t, ok, "expected delay context to have an unknown deadline before parent was cancelled")
assert.Equal(t, time.Time{}, actualDeadline, "expected delay context deadline to be unset")

})

Check failure on line 67 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)
}

// TestWithDelay_Err checks the error reported by a delayed context in each
// of its states: still active, cancelled directly (before and after the
// parent is done), and expired after the parent was done.
func TestWithDelay_Err(t *testing.T) {
	t.Run("nil", func(t *testing.T) {
		delayCtx, delayCancel := contextutil.WithDelay(context.Background(), 0)
		// Cancel only after the assertion so the context is active when checked.
		defer delayCancel()
		assert.NoError(t, delayCtx.Err())
	})
	t.Run("canceled before parent done", func(t *testing.T) {
		delayCtx, delayCancel := contextutil.WithDelay(context.Background(), 0)
		delayCancel()
		assert.ErrorIs(t, delayCtx.Err(), context.Canceled)
	})
	t.Run("canceled after parent done", func(t *testing.T) {
		parentCtx, parentCancel := context.WithCancel(context.Background())
		delayCtx, delayCancel := contextutil.WithDelay(parentCtx, 200*time.Millisecond)
		parentCancel()
		delayCancel()
		assert.ErrorIs(t, delayCtx.Err(), context.Canceled)
	})
	t.Run("deadline exceeded", func(t *testing.T) {
		parentCtx, parentCancel := context.WithCancel(context.Background())
		delayCtx, delayCancel := contextutil.WithDelay(parentCtx, 0)
		// Deferred so it runs after the assertion; with a zero delay the
		// context has already expired by then.
		defer delayCancel()
		parentCancel()
		assert.ErrorIs(t, delayCtx.Err(), context.DeadlineExceeded)
	})
}

func TestWithDelay_Value(t *testing.T) {
parentCtx := context.WithValue(context.Background(), "foo", "bar")

Check failure on line 96 in internal/util/context/context_test.go

View workflow job for this annotation

GitHub Actions / lint

SA1029: should not use built-in type string as key for value; define your own type to avoid collisions (staticcheck)
delayCtx, _ := contextutil.WithDelay(parentCtx, 0)
assert.Equal(t, "bar", delayCtx.Value("foo"))
}

0 comments on commit 0b8e7f5

Please sign in to comment.