From fc942ded8de68f88dddd3cc07a47f0363fe54b29 Mon Sep 17 00:00:00 2001 From: James Thomas Date: Tue, 17 Jun 2025 11:18:23 -0400 Subject: [PATCH] chore: go-require: require must only be used in the goroutine running the test function (testifylint) - Move require.Error outside of goroutine - Upgrade github.com/cenkalti/backoff from v4 to v5 (MaxElapsedTime is no longer set on the exponential backoff) - Change RunGarbageCollection to take a context.Context instead of a timeout - Rewrite the GC backoff tests on top of testing/synctest --- go.mod | 3 +- go.sum | 2 + internal/datastore/common/gc.go | 21 +++-- internal/datastore/common/gc_test.go | 123 +++++++++++++++------------ internal/services/health/health.go | 3 +- internal/telemetry/reporter.go | 3 +- pkg/cmd/datastore.go | 5 +- pkg/datastore/test/revisions.go | 5 +- 8 files changed, 93 insertions(+), 72 deletions(-) diff --git a/go.mod b/go.mod index ab23399f1..481d181bf 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/bits-and-blooms/bloom/v3 v3.7.1 github.com/caio/go-tdigest/v4 v4.1.0 github.com/ccoveille/go-safecast v1.8.1 - github.com/cenkalti/backoff/v4 v4.3.0 + github.com/cenkalti/backoff/v5 v5.0.3 github.com/cespare/xxhash/v2 v2.3.0 github.com/cloudspannerecosystem/spanner-change-streams-tail v0.3.1 github.com/creasty/defaults v1.8.0 @@ -211,6 +211,7 @@ require ( github.com/butuzov/mirror v1.3.0 // indirect github.com/catenacyber/perfsprint v0.9.1 // indirect github.com/ccojocar/zxcvbn-go v1.0.4 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d // indirect github.com/charithe/durationcheck v0.0.10 // indirect github.com/charmbracelet/colorprofile v0.2.3-0.20250311203215-f60798e515dc // indirect diff --git a/go.sum b/go.sum index 1e15aa307..6507c691c 100644 --- a/go.sum +++ b/go.sum @@ -1534,6 +1534,8 @@ github.com/ccoveille/go-safecast v1.8.1 h1:RoucjfYKKcx2lFmIjRjuo8AeX9k/GaZn5SUMH github.com/ccoveille/go-safecast v1.8.1/go.mod h1:QqwNjxQ7DAqY0C721OIO9InMk9zCwcsO7tnRuHytad8= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= 
+github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= diff --git a/internal/datastore/common/gc.go b/internal/datastore/common/gc.go index 7aba0c045..efade531a 100644 --- a/internal/datastore/common/gc.go +++ b/internal/datastore/common/gc.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/cenkalti/backoff/v4" + "github.com/cenkalti/backoff/v5" "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" @@ -148,14 +148,20 @@ var MaxGCInterval = 60 * time.Minute // StartGarbageCollector loops forever until the context is canceled and // performs garbage collection on the provided interval. func StartGarbageCollector(ctx context.Context, collectable GarbageCollectableDatastore, interval, window, timeout time.Duration) error { - return startGarbageCollectorWithMaxElapsedTime(ctx, collectable, interval, window, 0, timeout, gcFailureCounter) + return runPeriodicallyWithBackoff(ctx, func() error { + // NOTE: we're okay using the parent context here because the + // callers of this function create a dedicated garbage collection + // context anyway, which is only cancelled when the ds is closed. 
+ gcCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + return RunGarbageCollection(gcCtx, collectable, window) + }, interval, window, timeout, gcFailureCounter) } -func startGarbageCollectorWithMaxElapsedTime(ctx context.Context, collectable GarbageCollectableDatastore, interval, window, maxElapsedTime, timeout time.Duration, failureCounter prometheus.Counter) error { +func runPeriodicallyWithBackoff(ctx context.Context, taskFn func() error, interval, window, timeout time.Duration, failureCounter prometheus.Counter) error { backoffInterval := backoff.NewExponentialBackOff() backoffInterval.InitialInterval = interval backoffInterval.MaxInterval = max(MaxGCInterval, interval) - backoffInterval.MaxElapsedTime = maxElapsedTime backoffInterval.Reset() nextInterval := interval @@ -178,7 +184,7 @@ func startGarbageCollectorWithMaxElapsedTime(ctx context.Context, collectable Ga Dur("timeout", timeout). Msg("running garbage collection worker") - err := RunGarbageCollection(collectable, window, timeout) + err := taskFn() if err != nil { failureCounter.Inc() nextInterval = backoffInterval.NextBackOff() @@ -199,10 +205,7 @@ func startGarbageCollectorWithMaxElapsedTime(ctx context.Context, collectable Ga } // RunGarbageCollection runs garbage collection for the datastore. 
-func RunGarbageCollection(collectable GarbageCollectableDatastore, window, timeout time.Duration) error { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - +func RunGarbageCollection(ctx context.Context, collectable GarbageCollectableDatastore, window time.Duration) error { ctx, span := tracer.Start(ctx, "RunGarbageCollection") defer span.End() diff --git a/internal/datastore/common/gc_test.go b/internal/datastore/common/gc_test.go index e52811515..22774c4af 100644 --- a/internal/datastore/common/gc_test.go +++ b/internal/datastore/common/gc_test.go @@ -2,15 +2,18 @@ package common import ( "context" + "errors" "fmt" "slices" "sync" "testing" + "testing/synctest" "time" "github.com/prometheus/client_golang/prometheus" promclient "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" + "go.uber.org/goleak" "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" @@ -181,19 +184,32 @@ func (d revisionErrorDeleter) DeleteExpiredRels() (int64, error) { return 0, nil } +func alwaysErr() error { + return errors.New("aaagh") +} + func TestGCFailureBackoff(t *testing.T) { + t.Cleanup(func() { + goleak.VerifyNone(t) + }) localCounter := prometheus.NewCounter(gcFailureCounterConfig) reg := prometheus.NewRegistry() require.NoError(t, reg.Register(localCounter)) - ctx, cancel := context.WithCancel(t.Context()) - defer cancel() - go func() { - gc := newFakeGCStore(alwaysErrorDeleter{}) - require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, gc, 100*time.Millisecond, 1*time.Second, 1*time.Nanosecond, 1*time.Minute, localCounter)) - }() - time.Sleep(200 * time.Millisecond) - cancel() + errCh := make(chan error, 1) + synctest.Test(t, func(t *testing.T) { + duration := 1000 * time.Second + ctx, cancel := context.WithTimeout(t.Context(), duration) + t.Cleanup(func() { + cancel() + }) + go func() { + errCh <- runPeriodicallyWithBackoff(ctx, alwaysErr, 100*time.Second, 
1*time.Second, 1*time.Minute, localCounter) + }() + time.Sleep(duration) + synctest.Wait() + }) + require.Error(t, <-errCh) metrics, err := reg.Gather() require.NoError(t, err) @@ -201,30 +217,12 @@ func TestGCFailureBackoff(t *testing.T) { for _, metric := range metrics { if metric.GetName() == "spicedb_datastore_gc_failure_total" { mf = metric + break } } - require.Greater(t, *(mf.GetMetric()[0].Counter.Value), 100.0, "MaxElapsedTime=1ns did not cause backoff to get ignored") - - localCounter = prometheus.NewCounter(gcFailureCounterConfig) - reg = prometheus.NewRegistry() - require.NoError(t, reg.Register(localCounter)) - ctx, cancel = context.WithCancel(t.Context()) - defer cancel() - go func() { - gc := newFakeGCStore(alwaysErrorDeleter{}) - require.Error(t, startGarbageCollectorWithMaxElapsedTime(ctx, gc, 100*time.Millisecond, 0, 1*time.Second, 1*time.Minute, localCounter)) - }() - time.Sleep(200 * time.Millisecond) - cancel() - - metrics, err = reg.Gather() - require.NoError(t, err) - for _, metric := range metrics { - if metric.GetName() == "spicedb_datastore_gc_failure_total" { - mf = metric - } - } - require.Less(t, *(mf.GetMetric()[0].Counter.Value), 3.0, "MaxElapsedTime=0 should have not caused backoff to get ignored") + // We expect about 5 failures; the behavior of the library means that there's some wiggle room here. 
+ // That wiggle room is owing to the jitter in the backoff intervals. + require.Greater(t, *(mf.GetMetric()[0].Counter.Value), 3.0, "did not see expected number of backoffs") } // Ensure the garbage collector interval is reset after recovering from an @@ -238,19 +236,25 @@ func TestGCFailureBackoffReset(t *testing.T) { errorOnRevisions: []uint64{1, 2, 3, 4, 5}, }) - ctx, cancel := context.WithCancel(t.Context()) - defer cancel() - - go func() { - interval := 10 * time.Millisecond - window := 10 * time.Second - timeout := 1 * time.Minute - - require.Error(t, StartGarbageCollector(ctx, gc, interval, window, timeout)) - }() + errCh := make(chan error, 1) + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(func() { + cancel() + }) + go func() { + interval := 10 * time.Millisecond + window := 10 * time.Second + timeout := 1 * time.Minute + + errCh <- StartGarbageCollector(ctx, gc, interval, window, timeout) + }() + time.Sleep(500 * time.Millisecond) + cancel() + synctest.Wait() + }) - time.Sleep(500 * time.Millisecond) - cancel() + require.Error(t, <-errCh) // The next interval should have been reset after recovering from the error. 
// If it is not reset, the last exponential backoff interval will not give @@ -264,20 +268,29 @@ func TestGCFailureBackoffReset(t *testing.T) { func TestGCUnlockOnTimeout(t *testing.T) { gc := newFakeGCStore(alwaysErrorDeleter{}) - ctx, cancel := context.WithCancel(t.Context()) - defer cancel() - - go func() { - interval := 10 * time.Millisecond - window := 10 * time.Second - timeout := 1 * time.Millisecond - - require.Error(t, StartGarbageCollector(ctx, gc, interval, window, timeout)) - }() - - time.Sleep(30 * time.Millisecond) - require.False(t, gc.HasGCRun(), "GC should not have run") + errCh := make(chan error, 1) + hasRunChan := make(chan bool, 1) + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(func() { + cancel() + }) + go func() { + interval := 10 * time.Millisecond + window := 10 * time.Second + timeout := 1 * time.Minute + + errCh <- StartGarbageCollector(ctx, gc, interval, window, timeout) + }() + time.Sleep(30 * time.Millisecond) + hasRunChan <- gc.HasGCRun() + cancel() + synctest.Wait() + }) + require.Error(t, <-errCh) + require.False(t, <-hasRunChan, "GC should not have run") + // TODO: should this be inside the goroutine as well? 
gc.fakeGC.lock.Lock() defer gc.fakeGC.lock.Unlock() diff --git a/internal/services/health/health.go b/internal/services/health/health.go index 088de7e3e..75602bf03 100644 --- a/internal/services/health/health.go +++ b/internal/services/health/health.go @@ -4,7 +4,7 @@ import ( "context" "time" - "github.com/cenkalti/backoff/v4" + "github.com/cenkalti/backoff/v5" healthpb "google.golang.org/grpc/health/grpc_health_v1" "github.com/authzed/grpcutil" @@ -64,7 +64,6 @@ func (hm *healthManager) Checker(ctx context.Context) func() error { return func() error { // Run immediately for the initial check backoffInterval := backoff.NewExponentialBackOff() - backoffInterval.MaxElapsedTime = 0 ticker := time.After(0) diff --git a/internal/telemetry/reporter.go b/internal/telemetry/reporter.go index c464020bc..348d14f99 100644 --- a/internal/telemetry/reporter.go +++ b/internal/telemetry/reporter.go @@ -14,7 +14,7 @@ import ( "time" prompb "buf.build/gen/go/prometheus/prometheus/protocolbuffers/go" - "github.com/cenkalti/backoff/v4" + "github.com/cenkalti/backoff/v5" "github.com/golang/snappy" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/expfmt" @@ -176,7 +176,6 @@ func RemoteReporter( backoffInterval := backoff.NewExponentialBackOff() backoffInterval.InitialInterval = interval backoffInterval.MaxInterval = MaxElapsedTimeBetweenReports - backoffInterval.MaxElapsedTime = 0 // Must reset the backoff object after changing parameters backoffInterval.Reset() diff --git a/pkg/cmd/datastore.go b/pkg/cmd/datastore.go index 74970add3..ae6919a35 100644 --- a/pkg/cmd/datastore.go +++ b/pkg/cmd/datastore.go @@ -60,7 +60,8 @@ func NewGCDatastoreCommand(programName string, cfg *datastore.Config) *cobra.Com Long: "Executes garbage collection against the datastore. 
Deletes stale relationships, expired relationships, and stale transactions.", PreRunE: server.DefaultPreRunE(programName), RunE: termination.PublishError(func(cmd *cobra.Command, args []string) error { - ctx := context.Background() + ctx, cancel := context.WithTimeout(cmd.Context(), cfg.GCMaxOperationTime) + defer cancel() // Disable background GC and hedging. cfg.GCInterval = -1 * time.Hour @@ -81,7 +82,7 @@ func NewGCDatastoreCommand(programName string, cfg *datastore.Config) *cobra.Com Float64("gc_max_operation_time_seconds", cfg.GCMaxOperationTime.Seconds()). Msg("Running garbage collection...") - err = common.RunGarbageCollection(gcds, cfg.GCWindow, cfg.GCMaxOperationTime) + err = common.RunGarbageCollection(ctx, gcds, cfg.GCWindow) if err != nil { return err } diff --git a/pkg/datastore/test/revisions.go b/pkg/datastore/test/revisions.go index b1923aa43..8bc999948 100644 --- a/pkg/datastore/test/revisions.go +++ b/pkg/datastore/test/revisions.go @@ -20,6 +20,7 @@ import ( // RevisionQuantizationTest tests whether or not the requirements for revisions hold // for a particular datastore. +// TODO: rewrite using synctest func RevisionQuantizationTest(t *testing.T, tester DatastoreTester) { testCases := []struct { quantizationRange time.Duration @@ -95,6 +96,7 @@ func RevisionSerializationTest(t *testing.T, tester DatastoreTester) { // GCProcessRunTest tests whether the custom GC process runs for the datastore. // For datastores that do not have custom GC processes, will no-op. +// TODO: rewrite using synctest func GCProcessRunTest(t *testing.T, tester DatastoreTester) { require := require.New(t) gcWindow := 300 * time.Millisecond @@ -139,6 +141,7 @@ func GCProcessRunTest(t *testing.T, tester DatastoreTester) { // RevisionGCTest makes sure revision GC takes place, revisions out-side of the GC window // are invalid, and revisions inside the GC window are valid. 
+// TODO: rewrite using synctest if possible func RevisionGCTest(t *testing.T, tester DatastoreTester) { require := require.New(t) gcWindow := 300 * time.Millisecond @@ -179,7 +182,7 @@ func RevisionGCTest(t *testing.T, tester DatastoreTester) { if ok { // Run garbage collection. gcable.ResetGCCompleted() - err := common.RunGarbageCollection(gcable, gcWindow, 10*time.Second) + err := common.RunGarbageCollection(ctx, gcable, gcWindow) require.NoError(err) require.True(gcable.HasGCRun(), "GC was never run as expected") }