Skip to content

Commit 77973c6

Browse files
committed
setup oracle correctly in stream writer
1 parent fe5b808 commit 77973c6

File tree

3 files changed

+54
-2
lines changed

3 files changed

+54
-2
lines changed

Diff for: levels.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1573,8 +1573,8 @@ func (s *levelsController) close() error {
15731573
}
15741574

15751575
// get searches for a given key in all the levels of the LSM tree. It returns
1576-
// key version <= the expected version (maxVs). If not found, it returns an empty
1577-
// y.ValueStruct.
1576+
// key version <= the expected version (version in key). If not found,
1577+
// it returns an empty y.ValueStruct.
15781578
func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int) (
15791579
y.ValueStruct, error) {
15801580
if s.kv.IsClosed() {

Diff for: stream_writer.go

+5
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,11 @@ func (sw *StreamWriter) Flush() error {
282282
if sw.db.orc != nil {
283283
sw.db.orc.Stop()
284284
}
285+
286+
if curMax := sw.db.orc.readTs(); curMax >= sw.maxVersion {
287+
sw.maxVersion = curMax
288+
}
289+
285290
sw.db.orc = newOracle(sw.db.opt)
286291
sw.db.orc.nextTxnTs = sw.maxVersion
287292
sw.db.orc.txnMark.Done(sw.maxVersion)

Diff for: stream_writer_test.go

+47
Original file line numberDiff line numberDiff line change
@@ -750,4 +750,51 @@ func TestStreamWriterIncremental(t *testing.T) {
750750
require.NoError(t, err)
751751
})
752752
})
753+
754+
t.Run("multiple incremental with older data first", func(t *testing.T) {
755+
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
756+
buf := z.NewBuffer(10<<20, "test")
757+
defer func() { require.NoError(t, buf.Release()) }()
758+
KVToBuffer(&pb.KV{
759+
Key: []byte("a1"),
760+
Value: []byte("val1"),
761+
Version: 11,
762+
}, buf)
763+
sw := db.NewStreamWriter()
764+
require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
765+
require.NoError(t, sw.Write(buf), "sw.Write() failed")
766+
require.NoError(t, sw.Flush(), "sw.Flush() failed")
767+
768+
buf = z.NewBuffer(10<<20, "test")
769+
defer func() { require.NoError(t, buf.Release()) }()
770+
KVToBuffer(&pb.KV{
771+
Key: []byte("a2"),
772+
Value: []byte("val2"),
773+
Version: 9,
774+
}, buf)
775+
sw = db.NewStreamWriter()
776+
require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
777+
require.NoError(t, sw.Write(buf), "sw.Write() failed")
778+
require.NoError(t, sw.Flush(), "sw.Flush() failed")
779+
780+
// This will move the maxTs to 10 (earlier, without the fix)
781+
require.NoError(t, db.Update(func(txn *Txn) error {
782+
return txn.Set([]byte("a1"), []byte("val3"))
783+
}))
784+
// This will move the maxTs to 11 (earliler, without the fix)
785+
require.NoError(t, db.Update(func(txn *Txn) error {
786+
return txn.Set([]byte("a3"), []byte("val4"))
787+
}))
788+
789+
// And now, the first write with val1 will resurface (without the fix)
790+
require.NoError(t, db.View(func(txn *Txn) error {
791+
item, err := txn.Get([]byte("a1"))
792+
require.NoError(t, err)
793+
val, err := item.ValueCopy(nil)
794+
require.NoError(t, err)
795+
require.Equal(t, "val3", string(val))
796+
return nil
797+
}))
798+
})
799+
})
753800
}

0 commit comments

Comments
 (0)