Reuse events used for syncing watchers #17563

Merged · 2 commits · Dec 2, 2024
81 changes: 58 additions & 23 deletions server/storage/mvcc/watchable_store.go
@@ -221,6 +221,7 @@
waitDuration := 100 * time.Millisecond
delayTicker := time.NewTicker(waitDuration)
defer delayTicker.Stop()
var evs []mvccpb.Event

for {
s.mu.RLock()
@@ -230,7 +231,7 @@

unsyncedWatchers := 0
if lastUnsyncedWatchers > 0 {
unsyncedWatchers = s.syncWatchers()
unsyncedWatchers, evs = s.syncWatchers(evs)
}
syncDuration := time.Since(st)

@@ -339,12 +340,12 @@
// 2. iterate over the set to get the minimum revision and remove compacted watchers
// 3. use minimum revision to get all key-value pairs and send those events to watchers
// 4. remove synced watchers in set from unsynced group and move to synced group
func (s *watchableStore) syncWatchers() int {
func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event) {
s.mu.Lock()
defer s.mu.Unlock()

if s.unsynced.size() == 0 {
return 0
return 0, []mvccpb.Event{}

Codecov (codecov/patch) check warning on line 348 in server/storage/mvcc/watchable_store.go: added line #L348 was not covered by tests.
}

s.store.revMu.RLock()
@@ -357,20 +358,7 @@
compactionRev := s.store.compactMainRev

wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
minBytes, maxBytes := NewRevBytes(), NewRevBytes()
minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
maxBytes = RevToBytes(Revision{Main: curRev + 1}, maxBytes)

// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend.
tx := s.store.b.ReadTx()
tx.RLock()
revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
evs := kvsToEvents(s.store.lg, wg, revs, vs)
// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
// We can only unlock after Unmarshal, which will do deep copy.
// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
tx.RUnlock()
evs = rangeEventsWithReuse(s.store.lg, s.store.b, evs, minRev, curRev+1)

victims := make(watcherBatch)
wb := newWatcherBatch(wg, evs)
@@ -419,21 +407,68 @@
}
slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))

return s.unsynced.size()
return s.unsynced.size(), evs
}

// rangeEventsWithReuse returns events in range [minRev, maxRev), while reusing already provided events.
func rangeEventsWithReuse(lg *zap.Logger, b backend.Backend, evs []mvccpb.Event, minRev, maxRev int64) []mvccpb.Event {
if len(evs) == 0 {
return rangeEvents(lg, b, minRev, maxRev)
}
// append from left
if evs[0].Kv.ModRevision > minRev {
evs = append(rangeEvents(lg, b, minRev, evs[0].Kv.ModRevision), evs...)
}
// cut from left
prefixIndex := 0
for prefixIndex < len(evs) && evs[prefixIndex].Kv.ModRevision < minRev {
prefixIndex++
}
evs = evs[prefixIndex:]

if len(evs) == 0 {
return rangeEvents(lg, b, minRev, maxRev)
}
// append from right
if evs[len(evs)-1].Kv.ModRevision+1 < maxRev {
evs = append(evs, rangeEvents(lg, b, evs[len(evs)-1].Kv.ModRevision+1, maxRev)...)
}
// cut from right
suffixIndex := len(evs) - 1
for suffixIndex >= 0 && evs[suffixIndex].Kv.ModRevision >= maxRev {
suffixIndex--
}
evs = evs[:suffixIndex+1]
return evs
}

// rangeEvents returns events in range [minRev, maxRev).
func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64) []mvccpb.Event {
minBytes, maxBytes := NewRevBytes(), NewRevBytes()
minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
maxBytes = RevToBytes(Revision{Main: maxRev}, maxBytes)

// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend.
tx := b.ReadTx()
tx.RLock()
revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
evs := kvsToEvents(lg, revs, vs)
// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
// We can only unlock after Unmarshal, which will do deep copy.
// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
tx.RUnlock()
return evs
}

// kvsToEvents gets all events for the watchers from all key-value pairs
func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
func kvsToEvents(lg *zap.Logger, revs, vals [][]byte) (evs []mvccpb.Event) {
for i, v := range vals {
var kv mvccpb.KeyValue
if err := kv.Unmarshal(v); err != nil {
lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
}

if !wg.contains(string(kv.Key)) {
continue
}

ty := mvccpb.PUT
if isTombstone(revs[i]) {
ty = mvccpb.DELETE
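The comment kept in rangeEvents about unlocking only after kvsToEvents reflects a general bbolt rule: byte slices obtained while a read transaction (or, as here, a shared read lock on the backend's ReadTx) is held alias the mmap'd database file and become invalid once the lock is released, so they must be deep-copied first; in etcd that copy happens implicitly inside kv.Unmarshal. Below is a minimal standalone sketch of the same pattern against bbolt directly, using an explicit copy instead of protobuf unmarshalling; the file, bucket, and key names are made up for illustration and this is not etcd code.

```go
package main

import (
	"fmt"
	"log"

	bolt "go.etcd.io/bbolt"
)

// readValue demonstrates the constraint: the slice returned by Get aliases
// bbolt's mmap'd pages and is only valid while the transaction is open, so
// it is deep-copied before View returns. Skipping the copy and using the
// slice later can fault when bbolt re-mmaps the file.
func readValue(db *bolt.DB, bucket, key []byte) ([]byte, error) {
	var out []byte
	err := db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket(bucket)
		if b == nil {
			return fmt.Errorf("bucket %q not found", bucket)
		}
		if v := b.Get(key); v != nil {
			out = append([]byte(nil), v...) // deep copy before the tx ends
		}
		return nil
	})
	return out, err
}

func main() {
	db, err := bolt.Open("demo.db", 0o600, nil) // hypothetical file name
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("demo"))
		if err != nil {
			return err
		}
		return b.Put([]byte("foo1"), []byte("bar"))
	}); err != nil {
		log.Fatal(err)
	}

	v, err := readValue(db, []byte("demo"), []byte("foo1"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s\n", v) // bar
}
```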
108 changes: 106 additions & 2 deletions server/storage/mvcc/watchable_store_test.go
@@ -141,7 +141,7 @@ func TestSyncWatchers(t *testing.T) {

assert.Empty(t, s.synced.watcherSetByKey(string(testKey)))
assert.Len(t, s.unsynced.watcherSetByKey(string(testKey)), watcherN)
s.syncWatchers()
s.syncWatchers([]mvccpb.Event{})
assert.Len(t, s.synced.watcherSetByKey(string(testKey)), watcherN)
assert.Empty(t, s.unsynced.watcherSetByKey(string(testKey)))

@@ -164,6 +164,110 @@
}
}

func TestRangeEvents(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
lg := zaptest.NewLogger(t)
s := NewStore(lg, b, &lease.FakeLessor{}, StoreConfig{})

defer cleanup(s, b)

foo1 := []byte("foo1")
foo2 := []byte("foo2")
foo3 := []byte("foo3")
value := []byte("bar")
s.Put(foo1, value, lease.NoLease)
s.Put(foo2, value, lease.NoLease)
s.Put(foo3, value, lease.NoLease)
s.DeleteRange(foo1, foo3) // Deletes "foo1" and "foo2" generating 2 events

expectEvents := []mvccpb.Event{
{
Type: mvccpb.PUT,
Kv: &mvccpb.KeyValue{
Key: foo1,
CreateRevision: 2,
ModRevision: 2,
Version: 1,
Value: value,
},
},
{
Type: mvccpb.PUT,
Kv: &mvccpb.KeyValue{
Key: foo2,
CreateRevision: 3,
ModRevision: 3,
Version: 1,
Value: value,
},
},
{
Type: mvccpb.PUT,
Kv: &mvccpb.KeyValue{
Key: foo3,
CreateRevision: 4,
ModRevision: 4,
Version: 1,
Value: value,
},
},
{
Type: mvccpb.DELETE,
Kv: &mvccpb.KeyValue{
Key: foo1,
ModRevision: 5,
},
},
{
Type: mvccpb.DELETE,
Kv: &mvccpb.KeyValue{
Key: foo2,
ModRevision: 5,
},
},
}

tcs := []struct {
minRev int64
maxRev int64
expectEvents []mvccpb.Event
}{
// maxRev, top to bottom
{minRev: 2, maxRev: 6, expectEvents: expectEvents[0:5]},
{minRev: 2, maxRev: 5, expectEvents: expectEvents[0:3]},
{minRev: 2, maxRev: 4, expectEvents: expectEvents[0:2]},
{minRev: 2, maxRev: 3, expectEvents: expectEvents[0:1]},
{minRev: 2, maxRev: 2, expectEvents: expectEvents[0:0]},

// minRev, bottom to top
{minRev: 2, maxRev: 6, expectEvents: expectEvents[0:5]},
{minRev: 3, maxRev: 6, expectEvents: expectEvents[1:5]},
{minRev: 4, maxRev: 6, expectEvents: expectEvents[2:5]},
{minRev: 5, maxRev: 6, expectEvents: expectEvents[3:5]},
{minRev: 6, maxRev: 6, expectEvents: expectEvents[0:0]},

// Moving window algorithm, first increase maxRev, then increase minRev, repeat.
{minRev: 2, maxRev: 2, expectEvents: expectEvents[0:0]},
{minRev: 2, maxRev: 3, expectEvents: expectEvents[0:1]},
{minRev: 2, maxRev: 4, expectEvents: expectEvents[0:2]},
Comment on lines +251 to +252

Member: These two cases are duplicated? See lines 238-239. Better to sort the cases by minRev or maxRev?

Member Author: Yes, it's intentional. We group multiple cases into scenarios, and I added comments to make them easier to see. This is to better test the reuse, since the function call has side effects, modifying evs.

{minRev: 3, maxRev: 4, expectEvents: expectEvents[1:2]},
{minRev: 3, maxRev: 5, expectEvents: expectEvents[1:3]},
{minRev: 4, maxRev: 5, expectEvents: expectEvents[2:3]},
{minRev: 4, maxRev: 6, expectEvents: expectEvents[2:5]},
{minRev: 5, maxRev: 6, expectEvents: expectEvents[3:5]},
{minRev: 6, maxRev: 6, expectEvents: expectEvents[5:5]},
}
// reuse the evs to test rangeEventsWithReuse
var evs []mvccpb.Event
for i, tc := range tcs {
t.Run(fmt.Sprintf("%d rangeEvents(%d, %d)", i, tc.minRev, tc.maxRev), func(t *testing.T) {
assert.ElementsMatch(t, tc.expectEvents, rangeEvents(lg, b, tc.minRev, tc.maxRev))
evs = rangeEventsWithReuse(lg, b, evs, tc.minRev, tc.maxRev)
assert.ElementsMatch(t, tc.expectEvents, evs)
})
}
}

// TestWatchCompacted tests a watcher that watches on a compacted revision.
func TestWatchCompacted(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
@@ -236,7 +340,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) {
require.NoError(t, err)
}
// fill up w.Chan() with 1 buf via 2 compacted watch response
s.syncWatchers()
s.syncWatchers([]mvccpb.Event{})

for len(watchers) > 0 {
resp := <-w.Chan()
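For intuition about what TestRangeEvents and its moving-window scenario exercise: rangeEventsWithReuse only goes back to the backend for the revisions the cached slice does not already cover, then trims the slice to [minRev, maxRev). The sketch below reimplements that windowing over plain revision numbers with a counting fetch function so the saved reads are visible; the names (fetch, reuse) are invented for illustration and this is not etcd code. It also shows why the review thread above stresses the side effect: the helper reslices and appends to its argument, so callers must continue with the returned slice, as syncWatchers now does.

```go
package main

import "fmt"

var backendReads int // counts simulated backend range reads

// fetch stands in for a backend range read: it returns every revision in [minRev, maxRev).
func fetch(minRev, maxRev int64) []int64 {
	backendReads++
	var out []int64
	for r := minRev; r < maxRev; r++ {
		out = append(out, r)
	}
	return out
}

// reuse mirrors the shape of rangeEventsWithReuse, but over bare revisions:
// extend the cached window on the left and right via fetch, then cut it down
// to exactly [minRev, maxRev).
func reuse(cached []int64, minRev, maxRev int64) []int64 {
	if len(cached) == 0 {
		return fetch(minRev, maxRev)
	}
	if cached[0] > minRev { // append from left
		cached = append(fetch(minRev, cached[0]), cached...)
	}
	i := 0
	for i < len(cached) && cached[i] < minRev { // cut from left
		i++
	}
	cached = cached[i:]
	if len(cached) == 0 {
		return fetch(minRev, maxRev)
	}
	if last := cached[len(cached)-1]; last+1 < maxRev { // append from right
		cached = append(cached, fetch(last+1, maxRev)...)
	}
	j := len(cached) - 1
	for j >= 0 && cached[j] >= maxRev { // cut from right
		j--
	}
	return cached[:j+1]
}

func main() {
	// A moving window like the test's last scenario: grow maxRev, then minRev.
	windows := [][2]int64{{2, 4}, {2, 6}, {4, 6}, {4, 8}, {6, 8}}
	var evs []int64
	for _, w := range windows {
		evs = reuse(evs, w[0], w[1]) // always keep the returned slice
		fmt.Printf("[%d, %d) -> %v, backend reads so far: %d\n", w[0], w[1], evs, backendReads)
	}
	// With reuse, the five windows above cost 3 simulated reads; without it, 5.
}
```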
4 changes: 2 additions & 2 deletions server/storage/mvcc/watcher_test.go
@@ -320,7 +320,7 @@ func TestWatcherRequestProgress(t *testing.T) {
default:
}

s.syncWatchers()
s.syncWatchers([]mvccpb.Event{})

w.RequestProgress(id)
wrs := WatchResponse{WatchID: id, Revision: 2}
@@ -359,7 +359,7 @@ func TestWatcherRequestProgressAll(t *testing.T) {
default:
}

s.syncWatchers()
s.syncWatchers([]mvccpb.Event{})

w.RequestProgressAll()
wrs := WatchResponse{WatchID: clientv3.InvalidWatchID, Revision: 2}