diff --git a/internal/datastore/common/changes.go b/internal/datastore/common/changes.go index 3077f2ecc..aad5cd4e5 100644 --- a/internal/datastore/common/changes.go +++ b/internal/datastore/common/changes.go @@ -2,6 +2,7 @@ package common import ( "context" + "iter" "maps" "reflect" "slices" @@ -290,71 +291,80 @@ func (ch *Changes[R, K]) AddChangedDefinition( // AsRevisionChanges returns the list of changes processed so far as a datastore watch // compatible, ordered, changelist. -func (ch *Changes[R, K]) AsRevisionChanges(lessThanFunc func(lhs, rhs K) bool) ([]datastore.RevisionChanges, error) { +func (ch *Changes[R, K]) AsRevisionChanges(lessThanFunc func(lhs, rhs K) bool) iter.Seq2[datastore.RevisionChanges, error] { return ch.revisionChanges(lessThanFunc, *new(R), false) } // FilterAndRemoveRevisionChanges filters a list of changes processed up to the bound revision from the changes list, removing them // and returning the filtered changes. -func (ch *Changes[R, K]) FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) ([]datastore.RevisionChanges, error) { - changes, err := ch.revisionChanges(lessThanFunc, boundRev, true) - if err != nil { - return nil, err - } +func (ch *Changes[R, K]) FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) iter.Seq2[datastore.RevisionChanges, error] { + return func(yield func(datastore.RevisionChanges, error) bool) { + for change, err := range ch.revisionChanges(lessThanFunc, boundRev, true) { + if !yield(change, err) { + break + } + } - ch.removeAllChangesBefore(boundRev) - return changes, nil + ch.removeAllChangesBefore(boundRev) + } } -func (ch *Changes[R, K]) revisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R, withBound bool) ([]datastore.RevisionChanges, error) { - if ch.IsEmpty() { - return nil, nil - } +func (ch *Changes[R, K]) revisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R, withBound bool) iter.Seq2[datastore.RevisionChanges, error] { + return func(yield func(datastore.RevisionChanges, error) bool) { + if ch.IsEmpty() { + return + } - revisionsWithChanges := make([]K, 0, len(ch.records)) - for rk, cr := range ch.records { - if !withBound || boundRev.GreaterThan(cr.rev) { - revisionsWithChanges = append(revisionsWithChanges, rk) + revisionsWithChanges := make([]K, 0, len(ch.records)) + for rk, cr := range ch.records { + if !withBound || boundRev.GreaterThan(cr.rev) { + revisionsWithChanges = append(revisionsWithChanges, rk) + } } - } - if len(revisionsWithChanges) == 0 { - return nil, nil - } + if len(revisionsWithChanges) == 0 { + return + } - sort.Slice(revisionsWithChanges, func(i int, j int) bool { - return lessThanFunc(revisionsWithChanges[i], revisionsWithChanges[j]) - }) + sort.Slice(revisionsWithChanges, func(i int, j int) bool { + return lessThanFunc(revisionsWithChanges[i], revisionsWithChanges[j]) + }) - changes := make([]datastore.RevisionChanges, len(revisionsWithChanges)) - for i, k := range revisionsWithChanges { - revisionChangeRecord := ch.records[k] - changes[i].Revision = revisionChangeRecord.rev - for _, rel := range revisionChangeRecord.relTouches { - changes[i].RelationshipChanges = append(changes[i].RelationshipChanges, tuple.Touch(rel)) - } - for _, rel := range revisionChangeRecord.relDeletes { - changes[i].RelationshipChanges = append(changes[i].RelationshipChanges, tuple.Delete(rel)) - } - changes[i].ChangedDefinitions = slices.Collect(maps.Values(revisionChangeRecord.definitionsChanged)) - changes[i].DeletedNamespaces = slices.Collect(maps.Keys(revisionChangeRecord.namespacesDeleted)) - changes[i].DeletedCaveats = slices.Collect(maps.Keys(revisionChangeRecord.caveatsDeleted)) - - if len(revisionChangeRecord.metadatas) > 0 { - metadatas := make([]*structpb.Struct, 0, len(revisionChangeRecord.metadatas)) - for _, metadata := range revisionChangeRecord.metadatas { - structpbMetadata, err := structpb.NewStruct(metadata) - if err != nil { - return nil, spiceerrors.MustBugf("failed to convert metadata to structpb: %v", err) + for _, k := range revisionsWithChanges { + revisionChangeRecord := ch.records[k] + change := datastore.RevisionChanges{ + Revision: revisionChangeRecord.rev, + } + + for _, rel := range revisionChangeRecord.relTouches { + change.RelationshipChanges = append(change.RelationshipChanges, tuple.Touch(rel)) + } + for _, rel := range revisionChangeRecord.relDeletes { + change.RelationshipChanges = append(change.RelationshipChanges, tuple.Delete(rel)) + } + change.ChangedDefinitions = slices.Collect(maps.Values(revisionChangeRecord.definitionsChanged)) + change.DeletedNamespaces = slices.Collect(maps.Keys(revisionChangeRecord.namespacesDeleted)) + change.DeletedCaveats = slices.Collect(maps.Keys(revisionChangeRecord.caveatsDeleted)) + + if len(revisionChangeRecord.metadatas) > 0 { + metadatas := make([]*structpb.Struct, 0, len(revisionChangeRecord.metadatas)) + for _, metadata := range revisionChangeRecord.metadatas { + structpbMetadata, err := structpb.NewStruct(metadata) + if err != nil { + _ = yield(datastore.RevisionChanges{}, spiceerrors.MustBugf("failed to convert metadata to structpb: %v", err)) + return + } + metadatas = append(metadatas, structpbMetadata) } - metadatas = append(metadatas, structpbMetadata) + + change.Metadatas = metadatas } - changes[i].Metadatas = metadatas + if !yield(change, nil) { + break + } } } - - return changes, nil } func (ch *Changes[R, K]) removeAllChangesBefore(boundRev R) { diff --git a/internal/datastore/common/changes_test.go b/internal/datastore/common/changes_test.go index a4e5d7935..6d3c03bf2 100644 --- a/internal/datastore/common/changes_test.go +++ b/internal/datastore/common/changes_test.go @@ -2,6 +2,7 @@ package common import ( "fmt" + "iter" "slices" "sort" "strings" @@ -334,7 +335,7 @@ func TestChanges(t *testing.T) { } } - actual, err := ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc) + actual, err := collectChanges(ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)) require.NoError(err) require.Equal( @@ -370,7 +371,7 @@ func TestChanges(t *testing.T) { } } - actual, err := ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc) + actual, err := collectChanges(ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)) require.NoError(err) require.Equal( @@ -399,7 +400,7 @@ func TestAddMetadata(t *testing.T) { require.NoError(t, err) require.False(t, ch.IsEmpty()) - results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2) + results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)) require.NoError(t, err) require.Equal(t, 1, len(results)) require.True(t, ch.IsEmpty()) @@ -439,7 +440,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) { err = ch.AddRevisionMetadata(ctx, rev1, map[string]any{"operation": "create", "user_id": "123"}) require.NoError(t, err) - results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2) + results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)) require.NoError(t, err) require.Equal(t, 1, len(results)) require.Equal(t, 1, len(results[0].Metadatas)) @@ -462,7 +463,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) { err = ch.AddRevisionMetadata(ctx, rev1, metadata3) require.NoError(t, err) - results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2) + results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)) require.NoError(t, err) require.Equal(t, 1, len(results)) require.Equal(t, 3, len(results[0].Metadatas)) @@ -502,7 +503,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) { }) require.NoError(t, err) - results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2) + results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)) require.NoError(t, err) require.Equal(t, 1, len(results)) require.Equal(t, 1, len(results[0].Metadatas)) @@ -523,7 +524,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) { err = ch.AddRevisionMetadata(ctx, rev2, metadata2) require.NoError(t, err) - results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3) + results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3)) require.NoError(t, err) require.Equal(t, 2, len(results)) @@ -546,7 +547,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) { err = ch.AddRevisionMetadata(ctx, rev1, metadata2) require.NoError(t, err) - results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2) + results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)) require.NoError(t, err) require.Equal(t, 1, len(results)) require.Equal(t, 2, len(results[0].Metadatas)) @@ -575,7 +576,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) { err = ch.AddRevisionMetadata(ctx, rev1, metadata5) require.NoError(t, err) - results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2) + results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)) require.NoError(t, err) require.Equal(t, 1, len(results)) require.Equal(t, 4, len(results[0].Metadatas)) @@ -600,7 +601,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) { err = ch.AddRevisionMetadata(ctx, rev1, metadata1) require.NoError(t, err) - results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2) + results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)) require.NoError(t, err) require.Equal(t, 1, len(results)) require.Equal(t, 3, len(results[0].Metadatas)) @@ -638,7 +639,7 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) { require.False(t, ch.IsEmpty()) - results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3) + results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3)) require.NoError(t, err) require.Equal(t, 2, len(results)) require.False(t, ch.IsEmpty()) @@ -658,7 +659,7 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) { }, }, results) - remaining, err := ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc) + remaining, err := collectChanges(ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)) require.Equal(t, 1, len(remaining)) require.NoError(t, err) @@ -671,12 +672,12 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) { }, }, remaining) - results, err = ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillion) + results, err = collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillion)) require.NoError(t, err) require.Equal(t, 1, len(results)) require.True(t, ch.IsEmpty()) - results, err = ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillionOne) + results, err = collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillionOne)) require.NoError(t, err) require.Equal(t, 0, len(results)) require.True(t, ch.IsEmpty()) @@ -700,7 +701,7 @@ func TestHLCOrdering(t *testing.T) { err = ch.AddRelationshipChange(ctx, rev0, tuple.MustParse("document:foo#viewer@user:tom"), tuple.UpdateOperationTouch) require.NoError(t, err) - remaining, err := ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc) + remaining, err := collectChanges(ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc)) require.NoError(t, err) require.Equal(t, 2, len(remaining)) @@ -744,7 +745,7 @@ func TestHLCSameRevision(t *testing.T) { err = ch.AddRelationshipChange(ctx, rev0again, tuple.MustParse("document:foo#viewer@user:sarah"), tuple.UpdateOperationTouch) require.NoError(t, err) - remaining, err := ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc) + remaining, err := collectChanges(ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc)) require.NoError(t, err) require.Equal(t, 1, len(remaining)) @@ -968,3 +969,14 @@ func canonicalize(in []datastore.RevisionChanges) []datastore.RevisionChanges { return out } + +func collectChanges(changes iter.Seq2[datastore.RevisionChanges, error]) ([]datastore.RevisionChanges, error) { + out := make([]datastore.RevisionChanges, 0, 10) + for change, err := range changes { + if err != nil { + return nil, err + } + out = append(out, change) + } + return out, nil +} diff --git a/internal/datastore/crdb/watch.go b/internal/datastore/crdb/watch.go index e353522a6..bf0458a03 100644 --- a/internal/datastore/crdb/watch.go +++ b/internal/datastore/crdb/watch.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "iter" "strconv" "strings" "time" @@ -225,7 +226,7 @@ func (cds *crdbDatastore) watch( // changeTracker takes care of accumulating received from CockroachDB until a checkpoint is emitted type changeTracker[R datastore.Revision, K comparable] interface { - FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) ([]datastore.RevisionChanges, error) + FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) iter.Seq2[datastore.RevisionChanges, error] AddRelationshipChange(ctx context.Context, rev R, rel tuple.Relationship, op tuple.UpdateOperation) error AddChangedDefinition(ctx context.Context, rev R, def datastore.SchemaDefinition) error AddDeletedNamespace(ctx context.Context, rev R, namespaceName string) error @@ -244,9 +245,10 @@ type streamingChangeProvider struct { sendError sendErrorFunc } -func (s streamingChangeProvider) FilterAndRemoveRevisionChanges(_ func(lhs revisions.HLCRevision, rhs revisions.HLCRevision) bool, _ revisions.HLCRevision) ([]datastore.RevisionChanges, error) { - // we do not accumulate in this implementation, but stream right away - return nil, nil +func (s streamingChangeProvider) FilterAndRemoveRevisionChanges(_ func(lhs revisions.HLCRevision, rhs revisions.HLCRevision) bool, _ revisions.HLCRevision) iter.Seq2[datastore.RevisionChanges, error] { + return func(yield func(datastore.RevisionChanges, error) bool) { + // Nothing to do here, as changes are sent immediately. + } } func (s streamingChangeProvider) AddRelationshipChange(ctx context.Context, rev revisions.HLCRevision, rel tuple.Relationship, op tuple.UpdateOperation) error { @@ -371,13 +373,13 @@ func (cds *crdbDatastore) processChanges(ctx context.Context, changes pgx.Rows, return } - filtered, err := tracked.FilterAndRemoveRevisionChanges(revisions.HLCKeyLessThanFunc, rev) - if err != nil { - sendError(err) - return - } + filtered := tracked.FilterAndRemoveRevisionChanges(revisions.HLCKeyLessThanFunc, rev) + for revChange, err := range filtered { + if err != nil { + sendError(err) + return + } - for _, revChange := range filtered { revChange := revChange // TODO(jschorr): Change this to a new event type if/when we decide to report these diff --git a/internal/datastore/memdb/memdb.go b/internal/datastore/memdb/memdb.go index 30d3d9710..f493674e9 100644 --- a/internal/datastore/memdb/memdb.go +++ b/internal/datastore/memdb/memdb.go @@ -286,24 +286,26 @@ func (mdb *memdbDatastore) ReadWriteTx( } } - var rc datastore.RevisionChanges - changes, err := tracked.AsRevisionChanges(revisions.TimestampIDKeyLessThanFunc) - if err != nil { - return datastore.NoRevision, err - } + changes := tracked.AsRevisionChanges(revisions.TimestampIDKeyLessThanFunc) + isFirstChange := true + for rc, err := range changes { + if err != nil { + return datastore.NoRevision, err + } - if len(changes) > 1 { - return datastore.NoRevision, spiceerrors.MustBugf("unexpected MemDB transaction with multiple revision changes") - } else if len(changes) == 1 { - rc = changes[0] - } + if !isFirstChange { + return datastore.NoRevision, spiceerrors.MustBugf("unexpected MemDB transaction with multiple revision changes") + } - change := &changelog{ - revisionNanos: newRevision.TimestampNanoSec(), - changes: rc, - } - if err := tx.Insert(tableChangelog, change); err != nil { - return datastore.NoRevision, fmt.Errorf("error writing changelog: %w", err) + change := &changelog{ + revisionNanos: newRevision.TimestampNanoSec(), + changes: rc, + } + if err := tx.Insert(tableChangelog, change); err != nil { + return datastore.NoRevision, fmt.Errorf("error writing changelog: %w", err) + } + + isFirstChange = false } tx.Commit() diff --git a/internal/datastore/mysql/watch.go b/internal/datastore/mysql/watch.go index 66ac7f079..119809ab9 100644 --- a/internal/datastore/mysql/watch.go +++ b/internal/datastore/mysql/watch.go @@ -3,6 +3,7 @@ package mysql import ( "context" "errors" + "iter" "time" sq "github.com/Masterminds/squirrel" @@ -86,9 +87,7 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi currentTxn := afterRevision.TransactionID() for { - var stagedUpdates []datastore.RevisionChanges - var err error - stagedUpdates, currentTxn, err = mds.loadChanges(ctx, currentTxn, options) + stagedUpdates, ctxn, err := mds.loadChanges(ctx, currentTxn, options) if err != nil { if errors.Is(ctx.Err(), context.Canceled) { errs <- datastore.NewWatchCanceledErr() @@ -97,17 +96,25 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi } return } + currentTxn = ctxn // Write the staged updates to the channel - for _, changeToWrite := range stagedUpdates { + changeCount := 0 + for changeToWrite, err := range stagedUpdates { + if err != nil { + errs <- err + return + } + changeToWrite := changeToWrite if !sendChange(changeToWrite) { return } + changeCount++ } // If there were no changes, sleep a bit - if len(stagedUpdates) == 0 { + if changeCount == 0 { sleep := time.NewTimer(watchSleep) select { @@ -128,14 +135,14 @@ func (mds *Datastore) loadChanges( ctx context.Context, afterRevision uint64, options datastore.WatchOptions, -) (changes []datastore.RevisionChanges, newRevision uint64, err error) { - newRevision, err = mds.loadRevision(ctx) +) (iter.Seq2[datastore.RevisionChanges, error], uint64, error) { + newRevision, err := mds.loadRevision(ctx) if err != nil { - return + return nil, 0, err } if newRevision == afterRevision { - return + return func(yield func(datastore.RevisionChanges, error) bool) {}, newRevision, nil } stagedChanges := common.NewChanges(revisions.TransactionIDKeyFunc, options.Content, options.MaximumBufferedChangesByteSize) @@ -148,7 +155,7 @@ func (mds *Datastore) loadChanges( }, }).ToSql() if err != nil { - return + return nil, 0, err } rows, err := mds.db.QueryContext(ctx, sql, args...) @@ -160,7 +167,7 @@ func (mds *Datastore) loadChanges( } else if common.IsResettableError(err) { err = datastore.NewWatchTemporaryErr(err) } - return + return nil, 0, err } defer common.LogOnError(ctx, rows.Close) @@ -183,7 +190,7 @@ func (mds *Datastore) loadChanges( } rows.Close() if rows.Err() != nil { - return + return nil, 0, err } // Load the changes relationships for the revision range. @@ -198,7 +205,7 @@ func (mds *Datastore) loadChanges( }, }).ToSql() if err != nil { - return + return nil, 0, err } rows, err = mds.db.QueryContext(ctx, sql, args...) @@ -206,7 +213,7 @@ func (mds *Datastore) loadChanges( if errors.Is(err, context.Canceled) { err = datastore.NewWatchCanceledErr() } - return + return nil, 0, err } defer common.LogOnError(ctx, rows.Close) @@ -236,7 +243,7 @@ func (mds *Datastore) loadChanges( &deletedTxn, ) if err != nil { - return + return nil, 0, err } relationship := tuple.Relationship{ @@ -257,25 +264,24 @@ func (mds *Datastore) loadChanges( relationship.OptionalCaveat, err = common.ContextualizedCaveatFrom(caveatName, caveatContext) if err != nil { - return + return nil, 0, err } if createdTxn > afterRevision && createdTxn <= newRevision { if err = stagedChanges.AddRelationshipChange(ctx, revisions.NewForTransactionID(createdTxn), relationship, tuple.UpdateOperationTouch); err != nil { - return + return nil, 0, err } } if deletedTxn > afterRevision && deletedTxn <= newRevision { if err = stagedChanges.AddRelationshipChange(ctx, revisions.NewForTransactionID(deletedTxn), relationship, tuple.UpdateOperationDelete); err != nil { - return + return nil, 0, err } } } - if err = rows.Err(); err != nil { - return + if err := rows.Err(); err != nil { + return nil, 0, err } - changes, err = stagedChanges.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc) - return + return stagedChanges.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc), newRevision, nil } diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go index 0d577b819..7f640e92c 100644 --- a/internal/datastore/postgres/watch.go +++ b/internal/datastore/postgres/watch.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "iter" "time" sq "github.com/Masterminds/squirrel" @@ -155,8 +156,12 @@ func (pgd *pgDatastore) Watch( return } - for _, changeToWrite := range changesToWrite { - changeToWrite := changeToWrite + for changeToWrite, err := range changesToWrite { + if err != nil { + errs <- err + return + } + if !sendChange(changeToWrite) { return } @@ -243,7 +248,7 @@ func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRev return ids, nil } -func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []postgresRevision, options datastore.WatchOptions) ([]datastore.RevisionChanges, error) { +func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []postgresRevision, options datastore.WatchOptions) (iter.Seq2[datastore.RevisionChanges, error], error) { xmin := revisions[0].optionalTxID.Uint64 xmax := revisions[0].optionalTxID.Uint64 filter := make(map[uint64]int, len(revisions)) @@ -295,7 +300,7 @@ func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []postgresRev // Reconcile the changes. return tracked.AsRevisionChanges(func(lhs, rhs uint64) bool { return filter[lhs] < filter[rhs] - }) + }), nil } func (pgd *pgDatastore) loadRelationshipChanges(ctx context.Context, xmin uint64, xmax uint64, txidToRevision map[uint64]postgresRevision, filter map[uint64]int, tracked *common.Changes[postgresRevision, uint64]) error { diff --git a/internal/datastore/spanner/watch.go b/internal/datastore/spanner/watch.go index ee4467a7d..89c31e7c7 100644 --- a/internal/datastore/spanner/watch.go +++ b/internal/datastore/spanner/watch.go @@ -369,13 +369,13 @@ func (sd *spannerDatastore) watch( } if !tracked.IsEmpty() { - changes, err := tracked.AsRevisionChanges(revisions.TimestampIDKeyLessThanFunc) - if err != nil { - return err - } + changes := tracked.AsRevisionChanges(revisions.TimestampIDKeyLessThanFunc) + for revChange, err := range changes { + if err != nil { + sendError(err) + return err + } - for _, revChange := range changes { - revChange := revChange if !sendChange(revChange) { return datastore.NewWatchDisconnectedErr() }