Skip to content

Commit

Permalink
Reuse events between sync loops
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Dec 2, 2024
1 parent b52ee3b commit 900accd
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 62 deletions.
53 changes: 37 additions & 16 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func (s *watchableStore) syncWatchersLoop() {
waitDuration := 100 * time.Millisecond
delayTicker := time.NewTicker(waitDuration)
defer delayTicker.Stop()
var evs []mvccpb.Event

for {
s.mu.RLock()
Expand All @@ -230,7 +231,7 @@ func (s *watchableStore) syncWatchersLoop() {

unsyncedWatchers := 0
if lastUnsyncedWatchers > 0 {
unsyncedWatchers = s.syncWatchers()
unsyncedWatchers, evs = s.syncWatchers(evs)
}
syncDuration := time.Since(st)

Expand Down Expand Up @@ -339,12 +340,12 @@ func (s *watchableStore) moveVictims() (moved int) {
// 2. iterate over the set to get the minimum revision and remove compacted watchers
// 3. use minimum revision to get all key-value pairs and send those events to watchers
// 4. remove synced watchers in set from unsynced group and move to synced group
func (s *watchableStore) syncWatchers() int {
func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event) {
s.mu.Lock()
defer s.mu.Unlock()

if s.unsynced.size() == 0 {
return 0
return 0, []mvccpb.Event{}

Check warning on line 348 in server/storage/mvcc/watchable_store.go

View check run for this annotation

Codecov / codecov/patch

server/storage/mvcc/watchable_store.go#L348

Added line #L348 was not covered by tests
}

s.store.revMu.RLock()
Expand All @@ -357,7 +358,7 @@ func (s *watchableStore) syncWatchers() int {
compactionRev := s.store.compactMainRev

wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
evs := rangeEvents(s.store.lg, s.store.b, minRev, curRev+1, wg)
evs = rangeEventsWithReuse(s.store.lg, s.store.b, evs, minRev, curRev+1)

victims := make(watcherBatch)
wb := newWatcherBatch(wg, evs)
Expand Down Expand Up @@ -406,11 +407,39 @@ func (s *watchableStore) syncWatchers() int {
}
slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))

return s.unsynced.size()
return s.unsynced.size(), evs
}

// rangeEventsWithReuse returns events in range [minRev, maxRev), while reusing already provided events.
func rangeEventsWithReuse(lg *zap.Logger, b backend.Backend, evs []mvccpb.Event, minRev, maxRev int64) []mvccpb.Event {
if len(evs) == 0 {
return rangeEvents(lg, b, minRev, maxRev)
}
if evs[0].Kv.ModRevision > minRev {
evs = append(rangeEvents(lg, b, minRev, evs[0].Kv.ModRevision), evs...)
}
prefixIndex := 0
for prefixIndex < len(evs) && evs[prefixIndex].Kv.ModRevision < minRev {
prefixIndex++
}
evs = evs[prefixIndex:]

if len(evs) == 0 {
return rangeEvents(lg, b, minRev, maxRev)
}
if evs[len(evs)-1].Kv.ModRevision+1 < maxRev {
evs = append(evs, rangeEvents(lg, b, evs[len(evs)-1].Kv.ModRevision+1, maxRev)...)
}
suffixIndex := len(evs) - 1
for suffixIndex >= 0 && evs[suffixIndex].Kv.ModRevision >= maxRev {
suffixIndex--
}
evs = evs[:suffixIndex+1]
return evs
}

// rangeEvents returns events in range [minRev, maxRev).
func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64, c contains) []mvccpb.Event {
func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64) []mvccpb.Event {
minBytes, maxBytes := NewRevBytes(), NewRevBytes()
minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
maxBytes = RevToBytes(Revision{Main: maxRev}, maxBytes)
Expand All @@ -420,30 +449,22 @@ func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64, c cont
tx := b.ReadTx()
tx.RLock()
revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
evs := kvsToEvents(lg, c, revs, vs)
evs := kvsToEvents(lg, revs, vs)
// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
// We can only unlock after Unmarshal, which will do deep copy.
// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
tx.RUnlock()
return evs
}

type contains interface {
contains(string) bool
}

// kvsToEvents gets all events for the watchers from all key-value pairs
func kvsToEvents(lg *zap.Logger, c contains, revs, vals [][]byte) (evs []mvccpb.Event) {
func kvsToEvents(lg *zap.Logger, revs, vals [][]byte) (evs []mvccpb.Event) {
for i, v := range vals {
var kv mvccpb.KeyValue
if err := kv.Unmarshal(v); err != nil {
lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
}

if !c.contains(string(kv.Key)) {
continue
}

ty := mvccpb.PUT
if isTombstone(revs[i]) {
ty = mvccpb.DELETE
Expand Down
98 changes: 54 additions & 44 deletions server/storage/mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestSyncWatchers(t *testing.T) {

assert.Empty(t, s.synced.watcherSetByKey(string(testKey)))
assert.Len(t, s.unsynced.watcherSetByKey(string(testKey)), watcherN)
s.syncWatchers()
s.syncWatchers([]mvccpb.Event{})
assert.Len(t, s.synced.watcherSetByKey(string(testKey)), watcherN)
assert.Empty(t, s.unsynced.watcherSetByKey(string(testKey)))

Expand Down Expand Up @@ -181,48 +181,58 @@ func TestRangeEvents(t *testing.T) {
s.DeleteRange(foo1, foo3) // Deletes "foo1" and "foo2" generating 2 events

var evs []mvccpb.Event
evs = rangeEvents(lg, b, 2, 6, fakeContains{})
assert.Len(t, evs, 5)
evs = rangeEvents(lg, b, 2, 5, fakeContains{})
assert.Len(t, evs, 3)
evs = rangeEvents(lg, b, 2, 4, fakeContains{})
assert.Len(t, evs, 2)
evs = rangeEvents(lg, b, 2, 3, fakeContains{})
assert.Len(t, evs, 1)
evs = rangeEvents(lg, b, 2, 2, fakeContains{})
assert.Empty(t, evs)

evs = rangeEvents(lg, b, 3, 6, fakeContains{})
assert.Len(t, evs, 4)
evs = rangeEvents(lg, b, 4, 6, fakeContains{})
assert.Len(t, evs, 3)
evs = rangeEvents(lg, b, 5, 6, fakeContains{})
assert.Len(t, evs, 2)
evs = rangeEvents(lg, b, 6, 6, fakeContains{})
assert.Empty(t, evs)

evs = rangeEvents(lg, b, 2, 3, fakeContains{})
assert.Len(t, evs, 1)
evs = rangeEvents(lg, b, 2, 4, fakeContains{})
assert.Len(t, evs, 2)
evs = rangeEvents(lg, b, 3, 4, fakeContains{})
assert.Len(t, evs, 1)
evs = rangeEvents(lg, b, 3, 5, fakeContains{})
assert.Len(t, evs, 2)
evs = rangeEvents(lg, b, 4, 5, fakeContains{})
assert.Len(t, evs, 1)
evs = rangeEvents(lg, b, 4, 6, fakeContains{})
assert.Len(t, evs, 3)
evs = rangeEvents(lg, b, 5, 6, fakeContains{})
assert.Len(t, evs, 2)
evs = rangeEvents(lg, b, 6, 6, fakeContains{})
assert.Empty(t, evs)
}

type fakeContains struct{}

func (f fakeContains) contains(string) bool {
return true
tcs := []struct {
rangeFunc func(minRev, maxRev int64) []mvccpb.Event
}{
{
rangeFunc: func(minRev, maxRev int64) []mvccpb.Event {
return rangeEvents(lg, b, minRev, maxRev)
},
},
{
rangeFunc: func(minRev, maxRev int64) []mvccpb.Event {
return rangeEventsWithReuse(lg, b, evs, minRev, maxRev)
},
},
}
for _, tc := range tcs {
evs = tc.rangeFunc(2, 6)
assert.Len(t, evs, 5)
evs = tc.rangeFunc(2, 5)
assert.Len(t, evs, 3)
evs = tc.rangeFunc(2, 4)
assert.Len(t, evs, 2)
evs = tc.rangeFunc(2, 3)
assert.Len(t, evs, 1)
evs = tc.rangeFunc(2, 2)
assert.Empty(t, evs)

evs = tc.rangeFunc(3, 6)
assert.Len(t, evs, 4)
evs = tc.rangeFunc(4, 6)
assert.Len(t, evs, 3)
evs = tc.rangeFunc(5, 6)
assert.Len(t, evs, 2)
evs = tc.rangeFunc(6, 6)
assert.Empty(t, evs)

evs = tc.rangeFunc(2, 3)
assert.Len(t, evs, 1)
evs = tc.rangeFunc(2, 4)
assert.Len(t, evs, 2)
evs = tc.rangeFunc(3, 4)
assert.Len(t, evs, 1)
evs = tc.rangeFunc(3, 5)
assert.Len(t, evs, 2)
evs = tc.rangeFunc(4, 5)
assert.Len(t, evs, 1)
evs = tc.rangeFunc(4, 6)
assert.Len(t, evs, 3)
evs = tc.rangeFunc(5, 6)
assert.Len(t, evs, 2)
evs = tc.rangeFunc(6, 6)
assert.Empty(t, evs)
}
}

// TestWatchCompacted tests a watcher that watches on a compacted revision.
Expand Down Expand Up @@ -297,7 +307,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) {
require.NoError(t, err)
}
// fill up w.Chan() with 1 buf via 2 compacted watch response
s.syncWatchers()
s.syncWatchers([]mvccpb.Event{})

for len(watchers) > 0 {
resp := <-w.Chan()
Expand Down
4 changes: 2 additions & 2 deletions server/storage/mvcc/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func TestWatcherRequestProgress(t *testing.T) {
default:
}

s.syncWatchers()
s.syncWatchers([]mvccpb.Event{})

w.RequestProgress(id)
wrs := WatchResponse{WatchID: id, Revision: 2}
Expand Down Expand Up @@ -359,7 +359,7 @@ func TestWatcherRequestProgressAll(t *testing.T) {
default:
}

s.syncWatchers()
s.syncWatchers([]mvccpb.Event{})

w.RequestProgressAll()
wrs := WatchResponse{WatchID: clientv3.InvalidWatchID, Revision: 2}
Expand Down

0 comments on commit 900accd

Please sign in to comment.