Skip to content

Commit

Permalink
update comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Nov 2, 2024
1 parent 93fcaa1 commit 77b44d1
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 1 deletion.
6 changes: 6 additions & 0 deletions storage/operation/badgerimpl/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ func newBadgerIterator(db *badger.DB, startPrefix, endPrefix []byte, ops storage
}
}

// First positions the iterator by seeking to the smallest key that is
// greater than or equal to the iterator's lower bound (derived from the
// start prefix), i.e. the start of the requested range.
func (i *badgerIterator) First() {
i.iter.Seek(i.lowerBound)
}

// Valid returns whether the iterator is positioned at a valid key-value pair.
func (i *badgerIterator) Valid() bool {
// if it's beyond the upper bound, it's invalid
if !i.iter.Valid() {
Expand All @@ -49,16 +51,20 @@ func (i *badgerIterator) Valid() bool {
return valid
}

// Next advances the iterator to the next key-value pair.
// After calling Next, the iterator may no longer be positioned at a valid
// pair; callers should check Valid before reading the item.
func (i *badgerIterator) Next() {
i.iter.Next()
}

// IterItem returns the key-value pair at the iterator's current position.
// NOTE(review): the result for an invalid (exhausted) iterator depends on
// badger's Item() semantics — callers should check Valid first.
func (i *badgerIterator) IterItem() storage.IterItem {
return i.iter.Item()
}

// Compile-time assertion that badger.Item satisfies the storage.IterItem interface.
var _ storage.IterItem = (*badger.Item)(nil)

// Close closes the iterator. The iterator must be closed, otherwise it
// causes a memory leak.
// No errors are expected during normal operation.
func (i *badgerIterator) Close() error {
i.iter.Close()
return nil
Expand Down
13 changes: 13 additions & 0 deletions storage/operation/badgerimpl/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ var _ io.Closer = (*noopCloser)(nil)

func (noopCloser) Close() error { return nil }

// Get gets the value for the given key. It returns ErrNotFound if the DB
// does not contain the key.
// Other errors are exceptions.
//
// The caller should not modify the contents of the returned slice, but it is
// safe to modify the contents of the argument after Get returns. The
// returned slice will remain valid until the returned Closer is closed. On
// success, the caller MUST call closer.Close() or a memory leak will occur.
func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
tx := b.db.NewTransaction(false)
defer tx.Discard()
Expand All @@ -40,6 +48,11 @@ func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
return value, noopCloser{}, nil
}

// NewIter returns a new Iterator over the key prefix range
// [startPrefix, endPrefix], both bounds inclusive.
// Specifically, a key is included in the iteration if it meets ANY of the
// following conditions:
//   - it has a prefix equal to startPrefix, OR
//   - it has a prefix equal to endPrefix, OR
//   - it has a prefix that is lexicographically between startPrefix and endPrefix.
func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) {
return newBadgerIterator(b.db, startPrefix, endPrefix, ops), nil
}
Expand Down
27 changes: 27 additions & 0 deletions storage/operation/badgerimpl/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,36 @@ type ReaderBatchWriter struct {

var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil)

// GlobalReader returns a database-backed reader which reads the latest
// committed global database state ("read-committed isolation").
// This reader will not observe writes added via ReaderBatchWriter.Writer
// until the write batch is committed.
// This reader may observe different values for the same key on subsequent reads.
func (b *ReaderBatchWriter) GlobalReader() storage.Reader {
return b.globalReader
}

// Writer returns a writer associated with a batch of writes. The batch is
// pending until it is committed.
// Writes added via the returned writer are appended to the pending batch,
// but not committed to the database.
// The commit operation is atomic w.r.t. the batch; either all writes are
// applied to the database, or none are.
// Note:
//   - The writer must not be used concurrently for writing.
func (b *ReaderBatchWriter) Writer() storage.Writer {
return b
}

// BadgerWriteBatch returns the underlying badger write batch.
func (b *ReaderBatchWriter) BadgerWriteBatch() *badger.WriteBatch {
return b.batch
}

// AddCallback adds a callback to be executed after the batch has been
// flushed, regardless of whether the batch update succeeded or failed.
// The error parameter passed to the callback is the error returned by the
// batch update.
func (b *ReaderBatchWriter) AddCallback(callback func(error)) {
b.callbacks.AddCallback(callback)
}

// Commit flushes the batch to the database.
// No errors expected during normal operation
func (b *ReaderBatchWriter) Commit() error {
err := b.batch.Flush()

Expand Down Expand Up @@ -69,14 +83,27 @@ func NewReaderBatchWriter(db *badger.DB) *ReaderBatchWriter {

var _ storage.Writer = (*ReaderBatchWriter)(nil)

// Set sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map.
//
// It is safe to modify the contents of the arguments after Set returns.
// No errors are expected during normal operation.
func (b *ReaderBatchWriter) Set(key, value []byte) error {
return b.batch.Set(key, value)
}

// Delete deletes the value for the given key. Deletes are blind: they
// succeed even if the given key does not exist.
//
// It is safe to modify the contents of the arguments after Delete returns.
// No errors are expected during normal operation.
func (b *ReaderBatchWriter) Delete(key []byte) error {
return b.batch.Delete(key)
}

// DeleteByRange removes all keys with a prefix that falls within the
// range [startPrefix, endPrefix], both inclusive.
// No errors are expected during normal operation.
func (b *ReaderBatchWriter) DeleteByRange(globalReader storage.Reader, startPrefix, endPrefix []byte) error {
err := operation.IterateKeysInPrefixRange(startPrefix, endPrefix, func(key []byte) error {
err := b.batch.Delete(key)
Expand Down
6 changes: 6 additions & 0 deletions storage/operation/pebbleimpl/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,22 @@ func newPebbleIterator(reader pebble.Reader, startPrefix, endPrefix []byte, ops
}, nil
}

// First positions the iterator at the first key-value pair in its range.
// (Unlike a Seek, First takes no key argument.)
func (i *pebbleIterator) First() {
i.iter.First()
}

// Valid returns whether the iterator is positioned at a valid key-value pair.
func (i *pebbleIterator) Valid() bool {
return i.iter.Valid()
}

// Next advances the iterator to the next key-value pair.
// After calling Next, the iterator may no longer be positioned at a valid
// pair; callers should check Valid before reading the item.
func (i *pebbleIterator) Next() {
i.iter.Next()
}

// IterItem returns the key-value pair at the iterator's current position,
// wrapped in a pebbleIterItem adapter.
// NOTE(review): callers should check Valid before reading the item.
func (i *pebbleIterator) IterItem() storage.IterItem {
return pebbleIterItem{iter: i.iter}
}
Expand All @@ -67,6 +71,8 @@ func (i pebbleIterItem) Value(fn func([]byte) error) error {
return fn(val)
}

// Close closes the iterator. The iterator must be closed, otherwise it
// causes a memory leak.
// No errors are expected during normal operation.
func (i *pebbleIterator) Close() error {
return i.iter.Close()
}
13 changes: 13 additions & 0 deletions storage/operation/pebbleimpl/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ var _ io.Closer = (*noopCloser)(nil)

func (noopCloser) Close() error { return nil }

// Get gets the value for the given key. It returns ErrNotFound if the DB
// does not contain the key.
// Other errors are exceptions.
//
// The caller should not modify the contents of the returned slice, but it is
// safe to modify the contents of the argument after Get returns. The
// returned slice will remain valid until the returned Closer is closed. On
// success, the caller MUST call closer.Close() or a memory leak will occur.
func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
value, closer, err := b.db.Get(key)

Expand All @@ -37,6 +45,11 @@ func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
return value, closer, nil
}

// NewIter returns a new Iterator over the key prefix range
// [startPrefix, endPrefix], both bounds inclusive.
// Specifically, a key is included in the iteration if it meets ANY of the
// following conditions:
//   - it has a prefix equal to startPrefix, OR
//   - it has a prefix equal to endPrefix, OR
//   - it has a prefix that is lexicographically between startPrefix and endPrefix.
func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) {
return newPebbleIterator(b.db, startPrefix, endPrefix, ops)
}
Expand Down
27 changes: 26 additions & 1 deletion storage/operation/pebbleimpl/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@ type ReaderBatchWriter struct {

var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil)

// GlobalReader returns a database-backed reader which reads the latest
// committed global database state ("read-committed isolation").
// This reader will not observe writes added via ReaderBatchWriter.Writer
// until the write batch is committed.
// This reader may observe different values for the same key on subsequent reads.
func (b *ReaderBatchWriter) GlobalReader() storage.Reader {
return b.globalReader
}

// Writer returns a writer associated with a batch of writes. The batch is
// pending until it is committed.
// Writes added via the returned writer are appended to the pending batch,
// but not committed to the database.
// The commit operation is atomic w.r.t. the batch; either all writes are
// applied to the database, or none are.
// Note:
//   - The writer must not be used concurrently for writing.
func (b *ReaderBatchWriter) Writer() storage.Writer {
return b
}
Expand All @@ -28,10 +36,15 @@ func (b *ReaderBatchWriter) PebbleWriterBatch() *pebble.Batch {
return b.batch
}

// AddCallback adds a callback to be executed after the batch has been
// flushed, regardless of whether the batch update succeeded or failed.
// The error parameter passed to the callback is the error returned by the
// batch update.
func (b *ReaderBatchWriter) AddCallback(callback func(error)) {
b.callbacks.AddCallback(callback)
}

// Commit flushes the batch to the database.
// No errors expected during normal operation
func (b *ReaderBatchWriter) Commit() error {
err := b.batch.Commit(pebble.Sync)

Expand Down Expand Up @@ -66,15 +79,27 @@ func NewReaderBatchWriter(db *pebble.DB) *ReaderBatchWriter {

var _ storage.Writer = (*ReaderBatchWriter)(nil)

// Set sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map.
//
// It is safe to modify the contents of the arguments after Set returns.
// No errors are expected during normal operation.
func (b *ReaderBatchWriter) Set(key, value []byte) error {
return b.batch.Set(key, value, pebble.Sync)
}

// Delete deletes the value for the given key. Deletes are blind: they
// succeed even if the given key does not exist.
//
// It is safe to modify the contents of the arguments after Delete returns.
// No errors are expected during normal operation.
func (b *ReaderBatchWriter) Delete(key []byte) error {
return b.batch.Delete(key, pebble.Sync)
}

// DeleteByRange removes all keys with a prefix that falls within the
// range [startPrefix, endPrefix], both inclusive.
// No errors are expected during normal operation.
func (b *ReaderBatchWriter) DeleteByRange(_ storage.Reader, startPrefix, endPrefix []byte) error {
// DeleteRange takes the prefix range with start (inclusive) and end (exclusive, note: not inclusive).
// therefore, we need to increment the endPrefix to make it inclusive.
Expand Down
1 change: 1 addition & 0 deletions storage/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Reader interface {
}

// Writer is an interface for batch writing to a storage backend.
// It cannot be used concurrently for writing.
type Writer interface {
// Set sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map.
Expand Down

0 comments on commit 77b44d1

Please sign in to comment.