Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 gracefully shutdown reconcilers and catalogd FBC server #1688

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ linters-settings:
alias: ctrl
- pkg: github.com/blang/semver/v4
alias: bsemver
- pkg: "^github.com/operator-framework/operator-controller/internal/util/([^/]+)$"
alias: "${1}util"

output:
formats:
Expand Down
2 changes: 1 addition & 1 deletion catalogd/config/base/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ spec:
imagePullPolicy: IfNotPresent
terminationMessagePolicy: FallbackToLogsOnError
serviceAccountName: controller-manager
terminationGracePeriodSeconds: 10
terminationGracePeriodSeconds: 60
volumes:
- name: cache
emptyDir: {}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
catalogdv1 "github.com/operator-framework/operator-controller/catalogd/api/v1"
"github.com/operator-framework/operator-controller/catalogd/internal/source"
"github.com/operator-framework/operator-controller/catalogd/internal/storage"
contextutil "github.com/operator-framework/operator-controller/internal/util/context"
)

const (
Expand Down Expand Up @@ -91,8 +92,11 @@ func (r *ClusterCatalogReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{}, client.IgnoreNotFound(err)
}

delayedCtx, delayedCancel := contextutil.WithDelay(ctx, time.Minute)
defer delayedCancel()

reconciledCatsrc := existingCatsrc.DeepCopy()
res, reconcileErr := r.reconcile(ctx, reconciledCatsrc)
res, reconcileErr := r.reconcile(delayedCtx, reconciledCatsrc)

// If we encounter an error, we should delete the stored catalog metadata
// which represents the state of a successfully unpacked catalog. Deleting
Expand All @@ -118,15 +122,15 @@ func (r *ClusterCatalogReconciler) Reconcile(ctx context.Context, req ctrl.Reque
finalizers := reconciledCatsrc.Finalizers

if updateStatus {
if err := r.Client.Status().Update(ctx, reconciledCatsrc); err != nil {
if err := r.Client.Status().Update(delayedCtx, reconciledCatsrc); err != nil {
reconcileErr = errors.Join(reconcileErr, fmt.Errorf("error updating status: %v", err))
}
}

reconciledCatsrc.Finalizers = finalizers

if updateFinalizers {
if err := r.Client.Update(ctx, reconciledCatsrc); err != nil {
if err := r.Client.Update(delayedCtx, reconciledCatsrc); err != nil {
reconcileErr = errors.Join(reconcileErr, fmt.Errorf("error updating finalizers: %v", err))
}
}
Expand Down
2 changes: 1 addition & 1 deletion catalogd/internal/serverutil/serverutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil
listener = tls.NewListener(listener, config)
}

shutdownTimeout := 30 * time.Second
shutdownTimeout := 60 * time.Second

l := mgr.GetLogger().WithName("catalogd-http-server")
handler := catalogdmetrics.AddMetricsToHandler(cfg.LocalStorage.StorageServerHandler())
Expand Down
2 changes: 1 addition & 1 deletion config/base/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ spec:
memory: 64Mi
terminationMessagePolicy: FallbackToLogsOnError
serviceAccountName: operator-controller-controller-manager
terminationGracePeriodSeconds: 10
terminationGracePeriodSeconds: 60
volumes:
- name: cache
emptyDir: {}
10 changes: 7 additions & 3 deletions internal/controllers/clusterextension_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/operator-framework/operator-controller/internal/labels"
"github.com/operator-framework/operator-controller/internal/resolve"
rukpaksource "github.com/operator-framework/operator-controller/internal/rukpak/source"
contextutil "github.com/operator-framework/operator-controller/internal/util/context"
)

const (
Expand Down Expand Up @@ -110,8 +111,11 @@ func (r *ClusterExtensionReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, client.IgnoreNotFound(err)
}

delayedCtx, delayedCancel := contextutil.WithDelay(ctx, time.Minute)
defer delayedCancel()

reconciledExt := existingExt.DeepCopy()
res, reconcileErr := r.reconcile(ctx, reconciledExt)
res, reconcileErr := r.reconcile(delayedCtx, reconciledExt)

// Do checks before any Update()s, as Update() may modify the resource structure!
updateStatus := !equality.Semantic.DeepEqual(existingExt.Status, reconciledExt.Status)
Expand All @@ -129,14 +133,14 @@ func (r *ClusterExtensionReconciler) Reconcile(ctx context.Context, req ctrl.Req
// reconciledExt before updating the object.
finalizers := reconciledExt.Finalizers
if updateStatus {
if err := r.Client.Status().Update(ctx, reconciledExt); err != nil {
if err := r.Client.Status().Update(delayedCtx, reconciledExt); err != nil {
reconcileErr = errors.Join(reconcileErr, fmt.Errorf("error updating status: %v", err))
}
}
reconciledExt.Finalizers = finalizers

if updateFinalizers {
if err := r.Client.Update(ctx, reconciledExt); err != nil {
if err := r.Client.Update(delayedCtx, reconciledExt); err != nil {
reconcileErr = errors.Join(reconcileErr, fmt.Errorf("error updating finalizers: %v", err))
}
}
Expand Down
113 changes: 113 additions & 0 deletions internal/util/context/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package context

import (
"context"
"time"
)

// delayContext is a context.Context that remains active for a fixed delay
// after its parent context is done, unless it is cancelled directly first.
//
// Values are always looked up in the parent context.
type delayContext struct {
	// parentCtx is the context whose completion starts the delay timer.
	parentCtx context.Context
	// delay is how long this context stays active after parentCtx is done.
	delay time.Duration

	// done is closed exactly once, when this context is finished: either the
	// internal timeout context finished, or the context was cancelled
	// directly before the parent was done.
	done chan struct{}
	// setupDone is closed exactly once, after timeoutCtx and timeoutCancel
	// have been populated. Readers must receive from setupDone before
	// touching those two fields.
	setupDone chan struct{}

	// timeoutCtx carries the deadline and error for this context once the
	// parent is done (or once this context has been cancelled directly).
	timeoutCtx context.Context
	// timeoutCancel cancels timeoutCtx.
	timeoutCancel context.CancelFunc
}

// Compile-time check that delayContext implements context.Context.
var _ context.Context = (*delayContext)(nil)

// Deadline reports when this context will be done, if that is known.
//
// While the parent is still active, the deadline is the parent's deadline
// plus the delay, or unknown if the parent has no deadline. Once the parent
// is done, the deadline is whatever the internal timeout context reports.
func (d *delayContext) Deadline() (time.Time, bool) {
	select {
	case <-d.parentCtx.Done():
		// The parent is done, so the timeout context is being (or has been)
		// installed. Wait for that to complete, then defer to it.
		<-d.setupDone
		return d.timeoutCtx.Deadline()
	default:
	}

	// If the parent context has a deadline, simply add our delay.
	if parentDeadline, ok := d.parentCtx.Deadline(); ok {
		return parentDeadline.Add(d.delay), true
	}

	// Without a parent deadline ours is unknown, because it depends on when
	// the parent is done.
	return time.Time{}, false
}

// Done returns a channel that is closed when this context is finished.
func (d *delayContext) Done() <-chan struct{} {
	return d.done
}

// Err returns nil while the context is active, and otherwise the reason it
// finished: context.DeadlineExceeded if the delay elapsed after the parent
// was done, or context.Canceled if the context was cancelled.
func (d *delayContext) Err() error {
	// If the parent context is done, wait until the timeout context is
	// installed, then return its error.
	select {
	case <-d.parentCtx.Done():
		<-d.setupDone
		return d.timeoutCtx.Err()
	default:
	}

	// The parent is still active. If done is closed we were cancelled
	// directly; otherwise the context is still active, hence no error.
	select {
	case <-d.done:
		return context.Canceled
	default:
		return nil
	}
}

// Value returns the value associated with key in the parent context.
func (d *delayContext) Value(key any) any {
	return d.parentCtx.Value(key)
}

// WithDelay returns a context that is done delay after parentCtx is done,
// together with a CancelFunc that finishes the returned context immediately.
//
// The CancelFunc may be called any number of times and from multiple
// goroutines; calls after the first have no effect. Calling it releases the
// resources associated with the returned context, so callers should invoke
// it once the work using the context completes.
func WithDelay(parentCtx context.Context, delay time.Duration) (context.Context, context.CancelFunc) {
	delayedCtx := &delayContext{
		parentCtx: parentCtx,
		delay:     delay,
		done:      make(chan struct{}),
		setupDone: make(chan struct{}),
	}

	// setupDelay runs (in its own goroutine, via context.AfterFunc) once the
	// parent is done. It starts the delay timer and arranges for done to be
	// closed when that timer expires or is cancelled.
	setupDelay := func() {
		timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), delay)
		delayedCtx.timeoutCtx = timeoutCtx
		delayedCtx.timeoutCancel = timeoutCancel
		context.AfterFunc(timeoutCtx, func() { close(delayedCtx.done) })
		close(delayedCtx.setupDone)
	}

	unregisterDelay := context.AfterFunc(parentCtx, setupDelay)

	cancelFunc := func() {
		// unregisterDelay reports true for at most one caller, and only if
		// setupDelay has not been started.
		if unregisterDelay() {
			// Setup will never run, so this context is being cancelled
			// before the parent was done. Install an already-cancelled
			// timeout context and close setupDone so that Err, Deadline and
			// any later cancel call never block waiting for a setup that
			// will not happen.
			cancelledCtx, cancel := context.WithCancel(context.Background())
			cancel()
			delayedCtx.timeoutCtx = cancelledCtx
			delayedCtx.timeoutCancel = cancel
			close(delayedCtx.setupDone)
			close(delayedCtx.done)
			return
		}

		// Either setupDelay was started, or an earlier cancel call took the
		// branch above. In both cases setupDone is (or will shortly be)
		// closed, after which timeoutCancel is safe to call.
		<-delayedCtx.setupDone

		// Cancelling the timeout context triggers the AfterFunc that closes
		// done (or is a no-op if the context is already finished). Wait for
		// done so the context is observably finished when we return.
		delayedCtx.timeoutCancel()
		<-delayedCtx.done
	}

	return delayedCtx, cancelFunc
}
101 changes: 101 additions & 0 deletions internal/util/context/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package context_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"

contextutil "github.com/operator-framework/operator-controller/internal/util/context"
)

// TestWithDelay_Delays verifies, for several delays, that after the parent is
// cancelled the delay context reports a deadline of roughly now+delay, becomes
// done shortly after that deadline, and then reports
// context.DeadlineExceeded.
func TestWithDelay_Delays(t *testing.T) {
	for _, delay := range []time.Duration{
		0,
		time.Millisecond * 10,
		time.Millisecond * 100,
		time.Millisecond * 200,
	} {
		t.Run(delay.String(), func(t *testing.T) {
			parentCtx, parentCancel := context.WithCancel(context.Background())
			delayCtx, delayCancel := contextutil.WithDelay(parentCtx, delay)
			// Release the delay context's resources even if the test fails early.
			defer delayCancel()

			parentCancel()

			// verify deadline is within 1 ms of what we expect
			expectDeadline := time.Now().Add(delay)
			actualDeadline, ok := delayCtx.Deadline()
			assert.True(t, ok, "expected delay context to have a deadline after parent was cancelled")
			// The message arguments follow the format string: the context's
			// (actual) deadline first, then the expected one.
			assert.WithinDurationf(t, expectDeadline, actualDeadline, time.Millisecond, "expected the context's deadline (%v) to be within 1 ms of %v; diff was %v", actualDeadline, expectDeadline, actualDeadline.Sub(expectDeadline))

			// verify context is done due to deadline exceeded and that it happens
			// within 3ms of our expectation
			select {
			case <-delayCtx.Done():
			case <-time.After(time.Until(expectDeadline.Add(3 * time.Millisecond))):
				diff := time.Since(expectDeadline)
				t.Fatalf("delay context should have been canceled quickly after %s, but it took %s", delay, diff)
			}
			assert.ErrorIs(t, delayCtx.Err(), context.DeadlineExceeded)
		})
	}
}

// TestWithDelay_Deadline verifies what Deadline reports while the parent
// context is still active: the parent's deadline plus the delay when the
// parent has a deadline, and no deadline otherwise.
func TestWithDelay_Deadline(t *testing.T) {
	t.Run("parent has deadline", func(t *testing.T) {
		parentDeadline := time.Now().Add(200 * time.Millisecond)
		parentCtx, parentCancel := context.WithDeadline(context.Background(), parentDeadline)
		defer parentCancel()

		delay := 250 * time.Millisecond
		delayCtx, delayCancel := contextutil.WithDelay(parentCtx, delay)
		// Cancel the delay context so it does not linger for the full delay
		// after the parent is cancelled at the end of the subtest.
		defer delayCancel()

		expectDeadline := parentDeadline.Add(delay)
		actualDeadline, ok := delayCtx.Deadline()

		assert.True(t, ok, "expected delay context to have a deadline before parent was cancelled")
		assert.Equal(t, expectDeadline, actualDeadline)
	})
	t.Run("parent has no deadline", func(t *testing.T) {
		parentCtx, parentCancel := context.WithCancel(context.Background())
		defer parentCancel()

		delayCtx, delayCancel := contextutil.WithDelay(parentCtx, 200*time.Millisecond)
		// Cancel the delay context so it does not linger for the full delay
		// after the parent is cancelled at the end of the subtest.
		defer delayCancel()

		actualDeadline, ok := delayCtx.Deadline()
		assert.False(t, ok, "expected delay context to have an unknown deadline before parent was cancelled")
		assert.Equal(t, time.Time{}, actualDeadline, "expected delay context deadline to be unset")
	})
}

// TestWithDelay_Err checks the error reported by a delay context in each of
// its states: still active, cancelled directly (before or after the parent
// is done), and expired after the parent is done.
func TestWithDelay_Err(t *testing.T) {
	scenarios := []struct {
		name string
		// build returns a delay context already driven into the state under test.
		build func() context.Context
		// wantErr is the expected error; nil means no error is expected.
		wantErr error
	}{
		{
			name: "nil",
			build: func() context.Context {
				ctx, _ := contextutil.WithDelay(context.Background(), 0)
				return ctx
			},
			wantErr: nil,
		},
		{
			name: "canceled before parent done",
			build: func() context.Context {
				ctx, cancel := contextutil.WithDelay(context.Background(), 0)
				cancel()
				return ctx
			},
			wantErr: context.Canceled,
		},
		{
			name: "canceled after parent done",
			build: func() context.Context {
				parent, stopParent := context.WithCancel(context.Background())
				ctx, cancel := contextutil.WithDelay(parent, 200*time.Millisecond)
				stopParent()
				cancel()
				return ctx
			},
			wantErr: context.Canceled,
		},
		{
			name: "deadline exceeded",
			build: func() context.Context {
				parent, stopParent := context.WithCancel(context.Background())
				ctx, _ := contextutil.WithDelay(parent, 0)
				stopParent()
				return ctx
			},
			wantErr: context.DeadlineExceeded,
		},
	}

	for _, sc := range scenarios {
		sc := sc
		t.Run(sc.name, func(t *testing.T) {
			err := sc.build().Err()
			if sc.wantErr == nil {
				assert.NoError(t, err)
				return
			}
			assert.ErrorIs(t, err, sc.wantErr)
		})
	}
}

// TestWithDelay_Value checks that a value stored in the parent context can be
// read back through the delay context.
func TestWithDelay_Value(t *testing.T) {
	type valueKey string
	const key = valueKey("foo")

	base := context.WithValue(context.Background(), key, "bar")
	wrapped, _ := contextutil.WithDelay(base, 0)

	got := wrapped.Value(key)
	assert.Equal(t, "bar", got)
}
Loading