Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit f31ff23

Browse files
committed Jan 27, 2025
opt(stream): add option to directly copy over tables from lower levels (#1700)
Also takes a bug fix from PR #1712, commit 58d0674 This PR adds FullCopy option in Stream. This allows sending the table entirely to the writer. If this option is set to true we directly copy over the tables from the last 2 levels. This option increases the stream speed while also lowering the memory consumption on the DB that is streaming the KVs. For 71GB, compressed and encrypted DB we observed 3x improvement in speed. The DB contained ~65GB in the last 2 levels while remaining in the above levels. To use this option, the following options should be set in Stream. stream.KeyToList = nil stream.ChooseKey = nil stream.SinceTs = 0 db.managedTxns = true If we use stream writer for receiving the KVs, the encryption mode has to be the same in sender and receiver. This will restrict db.StreamDB() to use the same encryption mode in both input and output DB. Added TODO for allowing different encryption modes.
1 parent 3ceff58 commit f31ff23

13 files changed

+728
-189
lines changed
 

‎db.go

+1
Original file line numberDiff line numberDiff line change
@@ -2005,6 +2005,7 @@ func (db *DB) StreamDB(outOptions Options) error {
20052005
// Stream contents of DB to the output DB.
20062006
stream := db.NewStreamAt(math.MaxUint64)
20072007
stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
2008+
stream.FullCopy = true
20082009

20092010
stream.Send = func(buf *z.Buffer) error {
20102011
return writer.Write(buf)

‎iterator.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -366,17 +366,17 @@ func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
366366
// that the tables are sorted in the right order.
367367
func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
368368
filterTables := func(tables []*table.Table) []*table.Table {
369-
if opt.SinceTs > 0 {
370-
tmp := tables[:0]
371-
for _, t := range tables {
372-
if t.MaxVersion() < opt.SinceTs {
373-
continue
374-
}
375-
tmp = append(tmp, t)
369+
if opt.SinceTs == 0 {
370+
return tables
371+
}
372+
out := tables[:0]
373+
for _, t := range tables {
374+
if t.MaxVersion() < opt.SinceTs {
375+
continue
376376
}
377-
tables = tmp
377+
out = append(out, t)
378378
}
379-
return tables
379+
return out
380380
}
381381

382382
if len(opt.Prefix) == 0 {
@@ -491,7 +491,7 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
491491
for i := 0; i < len(tables); i++ {
492492
iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
493493
}
494-
iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
494+
iters = append(iters, txn.db.lc.iterators(&opt)...) // This will increment references.
495495
res := &Iterator{
496496
txn: txn,
497497
iitr: table.NewMergeIterator(iters, opt.Reverse),

‎key_registry.go

+50-36
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
"sync"
2929
"time"
3030

31+
"github.com/pkg/errors"
32+
3133
"github.com/dgraph-io/badger/v4/pb"
3234
"github.com/dgraph-io/badger/v4/y"
3335
"google.golang.org/protobuf/proto"
@@ -265,7 +267,7 @@ func WriteKeyRegistry(reg *KeyRegistry, opt KeyRegistryOptions) error {
265267
// Write all the datakeys to the buf.
266268
for _, k := range reg.dataKeys {
267269
// Writing the datakey to the given buffer.
268-
if err := storeDataKey(buf, opt.EncryptionKey, k); err != nil {
270+
if err := storeDataKey(buf, opt.EncryptionKey, *k); err != nil {
269271
return y.Wrapf(err, "Error while storing datakey in WriteKeyRegistry")
270272
}
271273
}
@@ -339,44 +341,58 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) {
339341
defer kr.Unlock()
340342
// Key might have generated by another go routine. So,
341343
// checking once again.
342-
key, valid = validKey()
343-
if valid {
344+
if key, valid := validKey(); valid {
344345
return key, nil
345346
}
346347
k := make([]byte, len(kr.opt.EncryptionKey))
347348
iv, err := y.GenerateIV()
348349
if err != nil {
349350
return nil, err
350351
}
351-
_, err = rand.Read(k)
352-
if err != nil {
352+
353+
if _, err := rand.Read(k); err != nil {
353354
return nil, err
354355
}
355356
// Otherwise Increment the KeyID and generate new datakey.
356357
kr.nextKeyID++
357-
dk := &pb.DataKey{
358+
dk := pb.DataKey{
358359
KeyId: kr.nextKeyID,
359360
Data: k,
360361
CreatedAt: time.Now().Unix(),
361362
Iv: iv,
362363
}
364+
kr.lastCreated = dk.CreatedAt
365+
kr.dataKeys[kr.nextKeyID] = &dk
363366
// Don't store the datakey on file if badger is running in InMemory mode.
364-
if !kr.opt.InMemory {
365-
// Store the datekey.
366-
buf := &bytes.Buffer{}
367-
if err = storeDataKey(buf, kr.opt.EncryptionKey, dk); err != nil {
368-
return nil, err
369-
}
370-
// Persist the datakey to the disk
371-
if _, err = kr.fp.Write(buf.Bytes()); err != nil {
372-
return nil, err
373-
}
367+
if kr.opt.InMemory {
368+
return &dk, nil
369+
374370
}
375-
// storeDatakey encrypts the datakey So, placing un-encrypted key in the memory.
376-
dk.Data = k
377-
kr.lastCreated = dk.CreatedAt
378-
kr.dataKeys[kr.nextKeyID] = dk
379-
return dk, nil
371+
// Store the datekey.
372+
if err = storeDataKey(kr.fp, kr.opt.EncryptionKey, dk); err != nil {
373+
return nil, err
374+
}
375+
return &dk, nil
376+
}
377+
378+
func (kr *KeyRegistry) AddKey(dk pb.DataKey) (uint64, error) {
379+
// If we don't have a encryption key, we cannot store the datakey.
380+
if len(kr.opt.EncryptionKey) == 0 {
381+
return 0, errors.New("No encryption key found. Cannot add data key")
382+
}
383+
384+
if _, ok := kr.dataKeys[dk.KeyId]; !ok {
385+
// If KeyId does not exists already, then use the next available KeyId to store data key.
386+
kr.nextKeyID++
387+
dk.KeyId = kr.nextKeyID
388+
}
389+
kr.dataKeys[dk.KeyId] = &dk
390+
391+
if kr.opt.InMemory {
392+
return dk.KeyId, nil
393+
}
394+
// Store the datakey.
395+
return dk.KeyId, storeDataKey(kr.fp, kr.opt.EncryptionKey, dk)
380396
}
381397

382398
// Close closes the key registry.
@@ -388,38 +404,36 @@ func (kr *KeyRegistry) Close() error {
388404
}
389405

390406
// storeDataKey stores datakey in an encrypted format in the given buffer. If storage key preset.
391-
func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error {
407+
// DO NOT use a pointer for key. storeDataKey modifies the kv.Data field.
408+
func storeDataKey(w io.Writer, storageKey []byte, key pb.DataKey) error {
392409
// xor will encrypt the IV and xor with the given data.
393410
// It'll used for both encryption and decryption.
394411
xor := func() error {
395412
if len(storageKey) == 0 {
396413
return nil
397414
}
398415
var err error
399-
k.Data, err = y.XORBlockAllocate(k.Data, storageKey, k.Iv)
416+
key.Data, err = y.XORBlockAllocate(key.Data, storageKey, key.Iv)
400417
return err
401418
}
402419
// In memory datakey will be plain text so encrypting before storing to the disk.
403-
var err error
404-
if err = xor(); err != nil {
420+
if err := xor(); err != nil {
405421
return y.Wrapf(err, "Error while encrypting datakey in storeDataKey")
406422
}
407-
var data []byte
408-
if data, err = proto.Marshal(k); err != nil {
423+
424+
data, err := key.Marshal()
425+
if err != nil {
409426
err = y.Wrapf(err, "Error while marshaling datakey in storeDataKey")
410-
var err2 error
411-
// decrypting the datakey back.
412-
if err2 = xor(); err2 != nil {
413-
return y.Wrapf(err,
414-
y.Wrapf(err2, "Error while decrypting datakey in storeDataKey").Error())
427+
if err2 := xor(); err2 != nil {
428+
return y.Wrapf(err, y.Wrapf(err2, "Error while decrypting datakey in storeDataKey").Error())
415429
}
416430
return err
417431
}
432+
418433
var lenCrcBuf [8]byte
419434
binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(data)))
420435
binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(data, y.CastagnoliCrcTable))
421-
y.Check2(buf.Write(lenCrcBuf[:]))
422-
y.Check2(buf.Write(data))
423-
// Decrypting the datakey back since we're using the pointer.
424-
return xor()
436+
y.Check2(w.Write(lenCrcBuf[:]))
437+
y.Check2(w.Write(data))
438+
return nil
425439
}

‎level_handler.go

+31-5
Original file line numberDiff line numberDiff line change
@@ -304,9 +304,9 @@ func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
304304
return maxVs, decr()
305305
}
306306

307-
// appendIterators appends iterators to an array of iterators, for merging.
307+
// iterators returns an array of iterators, for merging.
308308
// Note: This obtains references for the table handlers. Remember to close these iterators.
309-
func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
309+
func (s *levelHandler) iterators(opt *IteratorOptions) []y.Iterator {
310310
s.RLock()
311311
defer s.RUnlock()
312312

@@ -324,14 +324,40 @@ func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions)
324324
out = append(out, t)
325325
}
326326
}
327-
return appendIteratorsReversed(iters, out, topt)
327+
return iteratorsReversed(out, topt)
328328
}
329329

330330
tables := opt.pickTables(s.tables)
331331
if len(tables) == 0 {
332-
return iters
332+
return nil
333333
}
334-
return append(iters, table.NewConcatIterator(tables, topt))
334+
return []y.Iterator{table.NewConcatIterator(tables, topt)}
335+
}
336+
337+
func (s *levelHandler) getTables(opt *IteratorOptions) []*table.Table {
338+
if opt.Reverse {
339+
panic("Invalid option for getTables")
340+
}
341+
342+
s.RLock()
343+
defer s.RUnlock()
344+
345+
if s.level == 0 {
346+
var out []*table.Table
347+
for _, t := range s.tables {
348+
if opt.pickTable(t) {
349+
t.IncrRef()
350+
out = append(out, t)
351+
}
352+
}
353+
return out
354+
}
355+
356+
tables := opt.pickTables(s.tables)
357+
for _, t := range tables {
358+
t.IncrRef()
359+
}
360+
return tables
335361
}
336362

337363
type levelHandlerRLocked struct{}

‎levels.go

+61-7
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/pkg/errors"
3535
otrace "go.opencensus.io/trace"
3636

37+
"github.com/dgraph-io/badger/v4/options"
3738
"github.com/dgraph-io/badger/v4/pb"
3839
"github.com/dgraph-io/badger/v4/table"
3940
"github.com/dgraph-io/badger/v4/y"
@@ -909,7 +910,7 @@ func (s *levelsController) compactBuildTables(
909910
var iters []y.Iterator
910911
switch {
911912
case lev == 0:
912-
iters = appendIteratorsReversed(iters, topTables, table.NOCACHE)
913+
iters = append(iters, iteratorsReversed(topTables, table.NOCACHE)...)
913914
case len(topTables) > 0:
914915
y.AssertTrue(len(topTables) == 1)
915916
iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)}
@@ -1642,24 +1643,34 @@ func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int)
16421643
return maxVs, nil
16431644
}
16441645

1645-
func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator {
1646+
func iteratorsReversed(th []*table.Table, opt int) []y.Iterator {
1647+
out := make([]y.Iterator, 0, len(th))
16461648
for i := len(th) - 1; i >= 0; i-- {
16471649
// This will increment the reference of the table handler.
16481650
out = append(out, th[i].NewIterator(opt))
16491651
}
16501652
return out
16511653
}
16521654

1653-
// appendIterators appends iterators to an array of iterators, for merging.
1655+
// getTables return tables from all levels. It would call IncrRef on all returned tables.
1656+
func (s *levelsController) getTables(opt *IteratorOptions) [][]*table.Table {
1657+
res := make([][]*table.Table, 0, len(s.levels))
1658+
for _, level := range s.levels {
1659+
res = append(res, level.getTables(opt))
1660+
}
1661+
return res
1662+
}
1663+
1664+
// iterators returns an array of iterators, for merging.
16541665
// Note: This obtains references for the table handlers. Remember to close these iterators.
1655-
func (s *levelsController) appendIterators(
1656-
iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
1666+
func (s *levelsController) iterators(opt *IteratorOptions) []y.Iterator {
16571667
// Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing
16581668
// data when there's a compaction.
1669+
itrs := make([]y.Iterator, 0, len(s.levels))
16591670
for _, level := range s.levels {
1660-
iters = level.appendIterators(iters, opt)
1671+
itrs = append(itrs, level.iterators(opt)...)
16611672
}
1662-
return iters
1673+
return itrs
16631674
}
16641675

16651676
// TableInfo represents the information about a table.
@@ -1786,3 +1797,46 @@ func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string {
17861797
sort.Strings(splits)
17871798
return splits
17881799
}
1800+
1801+
// AddTable builds the table from the KV.value options passed through the KV.Key.
1802+
func (lc *levelsController) AddTable(
1803+
kv *pb.KV, lev int, dk *pb.DataKey, change *pb.ManifestChange) error {
1804+
// TODO: Encryption / Decryption might be required for the table, if the sender and receiver
1805+
// don't have same encryption mode. See if inplace encryption/decryption can be done.
1806+
// Tables are sent in the sorted order, so no need to sort them here.
1807+
encrypted := len(lc.kv.opt.EncryptionKey) > 0
1808+
y.AssertTrue((dk != nil && encrypted) || (dk == nil && !encrypted))
1809+
// The keyId is zero if there is no encryption.
1810+
opts := buildTableOptions(lc.kv)
1811+
opts.Compression = options.CompressionType(change.Compression)
1812+
opts.DataKey = dk
1813+
1814+
fileID := lc.reserveFileID()
1815+
fname := table.NewFilename(fileID, lc.kv.opt.Dir)
1816+
1817+
// kv.Value is owned by the z.buffer. Ensure that we copy this buffer.
1818+
var tbl *table.Table
1819+
var err error
1820+
if lc.kv.opt.InMemory {
1821+
if tbl, err = table.OpenInMemoryTable(y.Copy(kv.Value), fileID, &opts); err != nil {
1822+
return errors.Wrap(err, "while creating in-memory table from buffer")
1823+
}
1824+
} else {
1825+
if tbl, err = table.CreateTableFromBuffer(fname, kv.Value, opts); err != nil {
1826+
return errors.Wrap(err, "while creating table from buffer")
1827+
}
1828+
}
1829+
1830+
lc.levels[lev].addTable(tbl)
1831+
// Release the ref held by OpenTable. addTable would add a reference.
1832+
_ = tbl.DecrRef()
1833+
1834+
change.Id = fileID
1835+
change.Level = uint32(lev)
1836+
if dk != nil {
1837+
change.KeyId = dk.KeyId
1838+
}
1839+
// We use the same data KeyId. So, change.KeyId remains the same.
1840+
y.AssertTrue(change.Op == pb.ManifestChange_CREATE)
1841+
return lc.kv.manifest.addChanges([]*pb.ManifestChange{change})
1842+
}

0 commit comments

Comments
 (0)
Please sign in to comment.