From 71b56dd9a5b705372db5c98ab3d698dd82f0ce09 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 29 Nov 2024 17:22:10 +0100 Subject: [PATCH] Reuse events between sync loops Signed-off-by: Marek Siarkowicz --- server/storage/mvcc/watchable_store.go | 44 +++++++++++++++------ server/storage/mvcc/watchable_store_test.go | 4 +- server/storage/mvcc/watcher_test.go | 4 +- 3 files changed, 36 insertions(+), 16 deletions(-) diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index f76225eb8702..0d9f1eb8b161 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -218,6 +218,7 @@ func (s *watchableStore) syncWatchersLoop() { waitDuration := 100 * time.Millisecond delayTicker := time.NewTicker(waitDuration) defer delayTicker.Stop() + var evs []mvccpb.Event for { s.mu.RLock() @@ -227,7 +228,7 @@ func (s *watchableStore) syncWatchersLoop() { unsyncedWatchers := 0 if lastUnsyncedWatchers > 0 { - unsyncedWatchers = s.syncWatchers() + unsyncedWatchers, evs = s.syncWatchers(evs) } syncDuration := time.Since(st) @@ -336,12 +337,12 @@ func (s *watchableStore) moveVictims() (moved int) { // 2. iterate over the set to get the minimum revision and remove compacted watchers // 3. use minimum revision to get all key-value pairs and send those events to watchers // 4. remove synced watchers in set from unsynced group and move to synced group -func (s *watchableStore) syncWatchers() int { +func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event) { s.mu.Lock() defer s.mu.Unlock() if s.unsynced.size() == 0 { - return 0 + return 0, []mvccpb.Event{} } s.store.revMu.RLock() @@ -354,7 +355,7 @@ func (s *watchableStore) syncWatchers() int { compactionRev := s.store.compactMainRev wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) - evs := s.rangeEvents(minRev, curRev+1, wg) + evs = s.rangeEventsWithReuse(evs, minRev, curRev+1) victims := make(watcherBatch) wb := newWatcherBatch(wg, evs) @@ -403,11 +404,34 @@ func (s *watchableStore) syncWatchers() int { } slowWatcherGauge.Set(float64(s.unsynced.size() + vsz)) - return s.unsynced.size() + return s.unsynced.size(), evs +} + +// rangeEventsWithReuse returns events in range [minRev, maxRev), while reusing already provided events. +func (s *watchableStore) rangeEventsWithReuse(evs []mvccpb.Event, minRev, maxRev int64) []mvccpb.Event { + if len(evs) == 0 { + return s.rangeEvents(minRev, maxRev) + } + if evs[0].Kv.ModRevision > minRev { + evs = append(s.rangeEvents(minRev, evs[0].Kv.ModRevision), evs...) + } + prefixIndex := 0 + for prefixIndex < len(evs) && evs[prefixIndex].Kv.ModRevision < minRev { + prefixIndex++ + } + evs = evs[prefixIndex:] + + if len(evs) == 0 { + return s.rangeEvents(minRev, maxRev) + } + if evs[len(evs)-1].Kv.ModRevision+1 < maxRev { + evs = append(evs, s.rangeEvents(evs[len(evs)-1].Kv.ModRevision+1, maxRev)...) + } + return evs } // rangeEvents returns events in range [minRev, maxRev). -func (s *watchableStore) rangeEvents(minRev, maxRev int64, wg *watcherGroup) []mvccpb.Event { +func (s *watchableStore) rangeEvents(minRev, maxRev int64) []mvccpb.Event { minBytes, maxBytes := NewRevBytes(), NewRevBytes() minBytes = RevToBytes(Revision{Main: minRev}, minBytes) maxBytes = RevToBytes(Revision{Main: maxRev}, maxBytes) @@ -417,7 +441,7 @@ func (s *watchableStore) rangeEvents(minRev, maxRev int64, wg *watcherGroup) []m tx := s.store.b.ReadTx() tx.RLock() revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0) - evs := kvsToEvents(s.store.lg, wg, revs, vs) + evs := kvsToEvents(s.store.lg, revs, vs) // Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy. // We can only unlock after Unmarshal, which will do deep copy. // Otherwise we will trigger SIGSEGV during boltdb re-mmap. @@ -426,17 +450,13 @@ func (s *watchableStore) rangeEvents(minRev, maxRev int64, wg *watcherGroup) []m } // kvsToEvents gets all events for the watchers from all key-value pairs -func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) { +func kvsToEvents(lg *zap.Logger, revs, vals [][]byte) (evs []mvccpb.Event) { for i, v := range vals { var kv mvccpb.KeyValue if err := kv.Unmarshal(v); err != nil { lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) } - if !wg.contains(string(kv.Key)) { - continue - } - ty := mvccpb.PUT if isTombstone(revs[i]) { ty = mvccpb.DELETE diff --git a/server/storage/mvcc/watchable_store_test.go b/server/storage/mvcc/watchable_store_test.go index 35bbe2535a7f..51f5d7dabf66 100644 --- a/server/storage/mvcc/watchable_store_test.go +++ b/server/storage/mvcc/watchable_store_test.go @@ -173,7 +173,7 @@ func TestSyncWatchers(t *testing.T) { } // this should move all unsynced watchers to synced ones - s.syncWatchers() + s.syncWatchers([]mvccpb.Event{}) sws = s.synced.watcherSetByKey(string(testKey)) uws = s.unsynced.watcherSetByKey(string(testKey)) @@ -287,7 +287,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) { require.NoError(t, err) } // fill up w.Chan() with 1 buf via 2 compacted watch response - s.syncWatchers() + s.syncWatchers([]mvccpb.Event{}) for len(watchers) > 0 { resp := <-w.Chan() diff --git a/server/storage/mvcc/watcher_test.go b/server/storage/mvcc/watcher_test.go index a53253a08c02..07619ce2f63e 100644 --- a/server/storage/mvcc/watcher_test.go +++ b/server/storage/mvcc/watcher_test.go @@ -330,7 +330,7 @@ func TestWatcherRequestProgress(t *testing.T) { default: } - s.syncWatchers() + s.syncWatchers([]mvccpb.Event{}) w.RequestProgress(id) wrs := WatchResponse{WatchID: id, Revision: 2} @@ -379,7 +379,7 @@ func TestWatcherRequestProgressAll(t *testing.T) { default: } - s.syncWatchers() + s.syncWatchers([]mvccpb.Event{}) w.RequestProgressAll() wrs := WatchResponse{WatchID: clientv3.InvalidWatchID, Revision: 2}