diff --git a/server/storage/mvcc/kv_test.go b/server/storage/mvcc/kv_test.go
index 71a9953f743..c727b444af7 100644
--- a/server/storage/mvcc/kv_test.go
+++ b/server/storage/mvcc/kv_test.go
@@ -757,7 +757,7 @@ func TestKVSnapshot(t *testing.T) {
 
 func TestWatchableKVWatch(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, b)
 
 	w := s.NewWatchStream()
diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go
index ad17b2be7ac..22d46e5048d 100644
--- a/server/storage/mvcc/watchable_store.go
+++ b/server/storage/mvcc/watchable_store.go
@@ -70,12 +70,18 @@ type watchableStore struct {
 	wg sync.WaitGroup
 }
 
+var _ WatchableKV = (*watchableStore)(nil)
+
 // cancelFunc updates unsynced and synced maps when running
 // cancel operations.
 type cancelFunc func()
 
-func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
-	return newWatchableStore(lg, b, le, cfg)
+func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
+	s := newWatchableStore(lg, b, le, cfg)
+	s.wg.Add(2)
+	go s.syncWatchersLoop()
+	go s.syncVictimsLoop()
+	return s
 }
 
 func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
@@ -95,9 +101,6 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg S
 		// use this store as the deleter so revokes trigger watch events
 		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
 	}
-	s.wg.Add(2)
-	go s.syncWatchersLoop()
-	go s.syncVictimsLoop()
 	return s
 }
 
diff --git a/server/storage/mvcc/watchable_store_bench_test.go b/server/storage/mvcc/watchable_store_bench_test.go
index ba402a3e13e..c8990576b30 100644
--- a/server/storage/mvcc/watchable_store_bench_test.go
+++ b/server/storage/mvcc/watchable_store_bench_test.go
@@ -78,7 +78,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {
 
 func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	s := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, be)
 
 	k := []byte("testkey")
@@ -122,21 +122,7 @@
 // we should put to simulate the real-world use cases.
 func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	s := NewStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
-
-	// manually create watchableStore instead of newWatchableStore
-	// because newWatchableStore periodically calls syncWatchersLoop
-	// method to sync watchers in unsynced map. We want to keep watchers
-	// in unsynced for this benchmark.
-	ws := &watchableStore{
-		store:    s,
-		unsynced: newWatcherGroup(),
-
-		// to make the test not crash from assigning to nil map.
-		// 'synced' doesn't get populated in this test.
-		synced: newWatcherGroup(),
-		stopc:  make(chan struct{}),
-	}
+	ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
 
 	defer cleanup(ws, be)
 
@@ -146,7 +132,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	// and force watchers to be in unsynced.
 	testKey := []byte("foo")
 	testValue := []byte("bar")
-	s.Put(testKey, testValue, lease.NoLease)
+	ws.Put(testKey, testValue, lease.NoLease)
 	w := ws.NewWatchStream()
 	defer w.Close()
@@ -178,7 +164,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	s := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, be)
 
diff --git a/server/storage/mvcc/watchable_store_test.go b/server/storage/mvcc/watchable_store_test.go
index 35bbe2535a7..c15bd12326f 100644
--- a/server/storage/mvcc/watchable_store_test.go
+++ b/server/storage/mvcc/watchable_store_test.go
@@ -15,13 +15,13 @@
 package mvcc
 
 import (
-	"bytes"
 	"fmt"
 	"reflect"
 	"sync"
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap/zaptest"
 
@@ -33,7 +33,7 @@ import (
 
 func TestWatch(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, b)
 
 	testKey := []byte("foo")
@@ -52,7 +52,7 @@ func TestWatch(t *testing.T) {
 
 func TestNewWatcherCancel(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, b)
 
 	testKey := []byte("foo")
@@ -81,16 +81,7 @@ func TestCancelUnsynced(t *testing.T) {
 	// because newWatchableStore automatically calls syncWatchers
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced to test if syncWatchers works as expected.
-	s := &watchableStore{
-		store:    NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
-		unsynced: newWatcherGroup(),
-
-		// to make the test not crash from assigning to nil map.
-		// 'synced' doesn't get populated in this test.
-		synced: newWatcherGroup(),
-		stopc:  make(chan struct{}),
-	}
-
+	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, b)
 
 	// Put a key so that we can spawn watchers on that key.
@@ -134,91 +125,49 @@
 // and moves these watchers to synced.
 func TestSyncWatchers(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-
-	s := &watchableStore{
-		store:    NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
-		unsynced: newWatcherGroup(),
-		synced:   newWatcherGroup(),
-		stopc:    make(chan struct{}),
-	}
-
+	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, b)
 
 	testKey := []byte("foo")
 	testValue := []byte("bar")
 	s.Put(testKey, testValue, lease.NoLease)
-
 	w := s.NewWatchStream()
 	defer w.Close()
-
-	// arbitrary number for watchers
 	watcherN := 100
-
 	for i := 0; i < watcherN; i++ {
-		// specify rev as 1 to keep watchers in unsynced
-		w.Watch(0, testKey, nil, 1)
-	}
-
-	// Before running s.syncWatchers() synced should be empty because we manually
-	// populate unsynced only
-	sws := s.synced.watcherSetByKey(string(testKey))
-	uws := s.unsynced.watcherSetByKey(string(testKey))
-
-	if len(sws) != 0 {
-		t.Fatalf("synced[string(testKey)] size = %d, want 0", len(sws))
-	}
-	// unsynced should not be empty because we manually populated unsynced only
-	if len(uws) != watcherN {
-		t.Errorf("unsynced size = %d, want %d", len(uws), watcherN)
+		_, err := w.Watch(0, testKey, nil, 1)
+		require.NoError(t, err)
 	}
 
-	// this should move all unsynced watchers to synced ones
+	assert.Empty(t, s.synced.watcherSetByKey(string(testKey)))
+	assert.Len(t, s.unsynced.watcherSetByKey(string(testKey)), watcherN)
 	s.syncWatchers()
+	assert.Len(t, s.synced.watcherSetByKey(string(testKey)), watcherN)
+	assert.Empty(t, s.unsynced.watcherSetByKey(string(testKey)))
 
-	sws = s.synced.watcherSetByKey(string(testKey))
-	uws = s.unsynced.watcherSetByKey(string(testKey))
-
-	// After running s.syncWatchers(), synced should not be empty because syncwatchers
-	// populates synced in this test case
-	if len(sws) != watcherN {
-		t.Errorf("synced[string(testKey)] size = %d, want %d", len(sws), watcherN)
-	}
-
-	// unsynced should be empty because syncwatchers is expected to move all watchers
-	// from unsynced to synced in this test case
-	if len(uws) != 0 {
-		t.Errorf("unsynced size = %d, want 0", len(uws))
-	}
-
-	for w := range sws {
-		if w.minRev != s.Rev()+1 {
-			t.Errorf("w.minRev = %d, want %d", w.minRev, s.Rev()+1)
-		}
-	}
-
-	if len(w.(*watchStream).ch) != watcherN {
-		t.Errorf("watched event size = %d, want %d", len(w.(*watchStream).ch), watcherN)
-	}
-
-	evs := (<-w.(*watchStream).ch).Events
-	if len(evs) != 1 {
-		t.Errorf("len(evs) got = %d, want = 1", len(evs))
-	}
-	if evs[0].Type != mvccpb.PUT {
-		t.Errorf("got = %v, want = %v", evs[0].Type, mvccpb.PUT)
-	}
-	if !bytes.Equal(evs[0].Kv.Key, testKey) {
-		t.Errorf("got = %s, want = %s", evs[0].Kv.Key, testKey)
-	}
-	if !bytes.Equal(evs[0].Kv.Value, testValue) {
-		t.Errorf("got = %s, want = %s", evs[0].Kv.Value, testValue)
+	require.Len(t, w.(*watchStream).ch, watcherN)
+	for i := 0; i < watcherN; i++ {
+		events := (<-w.(*watchStream).ch).Events
+		assert.Len(t, events, 1)
+		assert.Equal(t, []mvccpb.Event{
+			{
+				Type: mvccpb.PUT,
+				Kv: &mvccpb.KeyValue{
+					Key:            testKey,
+					CreateRevision: 2,
+					ModRevision:    2,
+					Version:        1,
+					Value:          testValue,
+				},
+			},
+		}, events)
 	}
 }
 
 // TestWatchCompacted tests a watcher that watches on a compacted revision.
 func TestWatchCompacted(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, b)
 
 	testKey := []byte("foo")
@@ -256,7 +205,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
 	lg := zaptest.NewLogger(t)
-	s := newWatchableStore(lg, b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(lg, b, &lease.FakeLessor{}, StoreConfig{})
 
 	defer func() {
 		cleanup(s, b)
@@ -310,7 +259,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) {
 
 func TestWatchFutureRev(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, b)
 
 	testKey := []byte("foo")
@@ -349,7 +298,7 @@ func TestWatchRestore(t *testing.T) {
 	test := func(delay time.Duration) func(t *testing.T) {
 		return func(t *testing.T) {
 			b, _ := betesting.NewDefaultTmpBackend(t)
-			s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+			s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 			defer cleanup(s, b)
 
 			testKey := []byte("foo")
@@ -395,11 +344,11 @@ func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) {
 // 5. choose the watcher from step 1, without panic
 func TestWatchRestoreSyncedWatcher(t *testing.T) {
 	b1, _ := betesting.NewDefaultTmpBackend(t)
-	s1 := newWatchableStore(zaptest.NewLogger(t), b1, &lease.FakeLessor{}, StoreConfig{})
+	s1 := New(zaptest.NewLogger(t), b1, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s1, b1)
 
 	b2, _ := betesting.NewDefaultTmpBackend(t)
-	s2 := newWatchableStore(zaptest.NewLogger(t), b2, &lease.FakeLessor{}, StoreConfig{})
+	s2 := New(zaptest.NewLogger(t), b2, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s2, b2)
 
 	testKey, testValue := []byte("foo"), []byte("bar")
@@ -447,36 +396,77 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) {
 
 // TestWatchBatchUnsynced tests batching on unsynced watchers
 func TestWatchBatchUnsynced(t *testing.T) {
-	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
-
-	oldMaxRevs := watchBatchMaxRevs
-	defer func() {
-		watchBatchMaxRevs = oldMaxRevs
-		cleanup(s, b)
-	}()
-	batches := 3
-	watchBatchMaxRevs = 4
-
-	v := []byte("foo")
-	for i := 0; i < watchBatchMaxRevs*batches; i++ {
-		s.Put(v, v, lease.NoLease)
+	tcs := []struct {
+		name                  string
+		revisions             int
+		watchBatchMaxRevs     int
+		eventsPerRevision     int
+		expectRevisionBatches [][]int64
+	}{
+		{
+			name:              "3 revisions, 4 revs per batch, 1 events per revision",
+			revisions:         12,
+			watchBatchMaxRevs: 4,
+			eventsPerRevision: 1,
+			expectRevisionBatches: [][]int64{
+				{2, 3, 4, 5},
+				{6, 7, 8, 9},
+				{10, 11, 12, 13},
+			},
+		},
+		{
+			name:              "3 revisions, 4 revs per batch, 3 events per revision",
+			revisions:         12,
+			watchBatchMaxRevs: 4,
+			eventsPerRevision: 3,
+			expectRevisionBatches: [][]int64{
+				{2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5},
+				{6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9},
+				{10, 10, 10, 11, 11, 11, 12, 12, 12, 13, 13, 13},
+			},
+		},
 	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			b, _ := betesting.NewDefaultTmpBackend(t)
+			s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+			oldMaxRevs := watchBatchMaxRevs
+			defer func() {
+				watchBatchMaxRevs = oldMaxRevs
+				cleanup(s, b)
+			}()
+			watchBatchMaxRevs = tc.watchBatchMaxRevs
 
-	w := s.NewWatchStream()
-	defer w.Close()
+			v := []byte("foo")
+			for i := 0; i < tc.revisions; i++ {
+				txn := s.Write(traceutil.TODO())
+				for j := 0; j < tc.eventsPerRevision; j++ {
+					txn.Put(v, v, lease.NoLease)
+				}
+				txn.End()
+			}
 
-	w.Watch(0, v, nil, 1)
-	for i := 0; i < batches; i++ {
-		if resp := <-w.Chan(); len(resp.Events) != watchBatchMaxRevs {
-			t.Fatalf("len(events) = %d, want %d", len(resp.Events), watchBatchMaxRevs)
-		}
-	}
+			w := s.NewWatchStream()
+			defer w.Close()
+
+			w.Watch(0, v, nil, 1)
+			var revisionBatches [][]int64
+			eventCount := 0
+			for eventCount < tc.revisions*tc.eventsPerRevision {
+				var revisions []int64
+				for _, e := range (<-w.Chan()).Events {
+					revisions = append(revisions, e.Kv.ModRevision)
+					eventCount++
+				}
+				revisionBatches = append(revisionBatches, revisions)
+			}
+			assert.Equal(t, tc.expectRevisionBatches, revisionBatches)
 
-	s.store.revMu.Lock()
-	defer s.store.revMu.Unlock()
-	if size := s.synced.size(); size != 1 {
-		t.Errorf("synced size = %d, want 1", size)
+			s.store.revMu.Lock()
+			defer s.store.revMu.Unlock()
+			assert.Equal(t, 1, s.synced.size())
+			assert.Equal(t, 0, s.unsynced.size())
+		})
 	}
 }
 
@@ -583,7 +573,7 @@ func TestWatchVictims(t *testing.T) {
 	oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
 
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 
 	defer func() {
 		cleanup(s, b)
@@ -660,7 +650,7 @@ func TestWatchVictims(t *testing.T) {
 // canceling its watches.
 func TestStressWatchCancelClose(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, b)
 
 	testKey, testValue := []byte("foo"), []byte("bar")
diff --git a/server/storage/mvcc/watcher_bench_test.go b/server/storage/mvcc/watcher_bench_test.go
index 52a55d0632b..3d0dccea342 100644
--- a/server/storage/mvcc/watcher_bench_test.go
+++ b/server/storage/mvcc/watcher_bench_test.go
@@ -26,7 +26,7 @@ import (
 
 func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
 	be, _ := betesting.NewDefaultTmpBackend(b)
-	watchable := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	watchable := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
 
 	defer cleanup(watchable, be)
diff --git a/server/storage/mvcc/watcher_test.go b/server/storage/mvcc/watcher_test.go
index a53253a08c0..0c1fa521267 100644
--- a/server/storage/mvcc/watcher_test.go
+++ b/server/storage/mvcc/watcher_test.go
@@ -35,7 +35,7 @@ import (
 // and the watched event attaches the correct watchID.
 func TestWatcherWatchID(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, b)
 
 	w := s.NewWatchStream()
@@ -85,7 +85,7 @@ func TestWatcherWatchID(t *testing.T) {
 
 func TestWatcherRequestsCustomID(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, b)
 
 	w := s.NewWatchStream()
@@ -122,7 +122,7 @@ func TestWatcherRequestsCustomID(t *testing.T) {
 // and returns events with matching prefixes.
 func TestWatcherWatchPrefix(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, b)
 
 	w := s.NewWatchStream()
@@ -196,7 +196,7 @@ func TestWatcherWatchPrefix(t *testing.T) {
 // does not create watcher, which panics when canceling in range tree.
 func TestWatcherWatchWrongRange(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, b)
 
 	w := s.NewWatchStream()
@@ -216,7 +216,7 @@ func TestWatcherWatchWrongRange(t *testing.T) {
 
 func TestWatchDeleteRange(t *testing.T) {
 	b, tmpPath := betesting.NewDefaultTmpBackend(t)
-	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 
 	defer func() {
 		b.Close()
@@ -256,7 +256,7 @@ func TestWatchDeleteRange(t *testing.T) {
 // with given id inside watchStream.
 func TestWatchStreamCancelWatcherByID(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, b)
 
 	w := s.NewWatchStream()
@@ -293,17 +293,7 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) {
 // report its correct progress.
 func TestWatcherRequestProgress(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-
-	// manually create watchableStore instead of newWatchableStore
-	// because newWatchableStore automatically calls syncWatchers
-	// method to sync watchers in unsynced map. We want to keep watchers
-	// in unsynced to test if syncWatchers works as expected.
-	s := &watchableStore{
-		store:    NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
-		unsynced: newWatcherGroup(),
-		synced:   newWatcherGroup(),
-		stopc:    make(chan struct{}),
-	}
+	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 
 	defer cleanup(s, b)
 
@@ -346,17 +336,7 @@ func TestWatcherRequestProgressAll(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-
-	// manually create watchableStore instead of newWatchableStore
-	// because newWatchableStore automatically calls syncWatchers
-	// method to sync watchers in unsynced map. We want to keep watchers
-	// in unsynced to test if syncWatchers works as expected.
-	s := &watchableStore{
-		store:    NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
-		unsynced: newWatcherGroup(),
-		synced:   newWatcherGroup(),
-		stopc:    make(chan struct{}),
-	}
+	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 
 	defer cleanup(s, b)
 
@@ -395,7 +375,7 @@ func TestWatcherRequestProgressAll(t *testing.T) {
 
 func TestWatcherWatchWithFilter(t *testing.T) {
 	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
 	defer cleanup(s, b)
 
 	w := s.NewWatchStream()
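Note for reviewers (not part of the patch): the hunks above move the background goroutine startup out of newWatchableStore and into New, so tests that need watchers to stay in the unsynced group can call newWatchableStore directly instead of hand-assembling a watchableStore literal. Below is a minimal sketch of that usage pattern under the new split; the test name is made up, and it assumes the file would live in package mvcc next to the tests touched here.

// Hypothetical illustration only. New starts syncWatchersLoop and
// syncVictimsLoop; newWatchableStore does not, so a watcher created at an
// old revision stays unsynced until syncWatchers is called explicitly.
package mvcc

import (
	"testing"

	"go.uber.org/zap/zaptest"

	"go.etcd.io/etcd/server/v3/lease"
	betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
)

func TestConstructorSplitSketch(t *testing.T) {
	b, _ := betesting.NewDefaultTmpBackend(t)

	// No background sync loops are running for this store.
	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
	defer cleanup(s, b)

	// Write at revision 2, then watch from revision 1 so the watcher is
	// behind the store and lands in the unsynced group.
	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

	w := s.NewWatchStream()
	defer w.Close()
	if _, err := w.Watch(0, []byte("foo"), nil, 1); err != nil {
		t.Fatal(err)
	}

	if s.unsynced.size() != 1 {
		t.Fatalf("unsynced size = %d, want 1", s.unsynced.size())
	}

	// With New(...) the loops would do this eventually; here we drive it by hand.
	s.syncWatchers()

	if s.synced.size() != 1 {
		t.Fatalf("synced size = %d, want 1", s.synced.size())
	}
}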