From 680aa9439ac6a647614338d55ba2a016a8d5e193 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Tue, 20 Aug 2024 16:02:28 -0700 Subject: [PATCH] insert approvals with badger batch update --- storage/badger/approvals.go | 84 +++++----- storage/badger/cache_b.go | 152 ++++++++++++++++++ storage/badger/operation/approvals.go | 22 +-- storage/badger/operation/common.go | 40 +++++ .../badger/operation/reader_batch_writer.go | 130 +++++++++++++++ storage/batch.go | 76 ++++++++- 6 files changed, 453 insertions(+), 51 deletions(-) create mode 100644 storage/badger/cache_b.go create mode 100644 storage/badger/operation/reader_batch_writer.go diff --git a/storage/badger/approvals.go b/storage/badger/approvals.go index eb3cf4ae820..56aab0a7f8e 100644 --- a/storage/badger/approvals.go +++ b/storage/badger/approvals.go @@ -3,6 +3,7 @@ package badger import ( "errors" "fmt" + "sync" "github.com/dgraph-io/badger/v2" @@ -11,24 +12,24 @@ import ( "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/badger/operation" - "github.com/onflow/flow-go/storage/badger/transaction" ) // ResultApprovals implements persistent storage for result approvals. type ResultApprovals struct { - db *badger.DB - cache *Cache[flow.Identifier, *flow.ResultApproval] + db *badger.DB + cache *CacheB[flow.Identifier, *flow.ResultApproval] + indexing *sync.Mutex // preventing concurrent indexing of approvals } func NewResultApprovals(collector module.CacheMetrics, db *badger.DB) *ResultApprovals { - store := func(key flow.Identifier, val *flow.ResultApproval) func(*transaction.Tx) error { - return transaction.WithTx(operation.SkipDuplicates(operation.InsertResultApproval(val))) + store := func(key flow.Identifier, val *flow.ResultApproval) func(storage.BadgerReaderBatchWriter) error { + return storage.OnlyBadgerWriter(operation.InsertResultApproval(val)) } - retrieve := func(approvalID flow.Identifier) func(tx *badger.Txn) (*flow.ResultApproval, error) { + retrieve := func(approvalID flow.Identifier) func(tx storage.Reader) (*flow.ResultApproval, error) { var approval flow.ResultApproval - return func(tx *badger.Txn) (*flow.ResultApproval, error) { + return func(tx storage.Reader) (*flow.ResultApproval, error) { err := operation.RetrieveResultApproval(approvalID, &approval)(tx) return &approval, err } @@ -36,21 +37,22 @@ func NewResultApprovals(collector module.CacheMetrics, db *badger.DB) *ResultApp res := &ResultApprovals{ db: db, - cache: newCache[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals, - withLimit[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100), - withStore[flow.Identifier, *flow.ResultApproval](store), - withRetrieve[flow.Identifier, *flow.ResultApproval](retrieve)), + cache: newCacheB[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals, + withLimitB[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100), + withStoreB[flow.Identifier, *flow.ResultApproval](store), + withRetrieveB[flow.Identifier, *flow.ResultApproval](retrieve)), + indexing: new(sync.Mutex), } return res } -func (r *ResultApprovals) store(approval *flow.ResultApproval) func(*transaction.Tx) error { +func (r *ResultApprovals) store(approval *flow.ResultApproval) func(storage.BadgerReaderBatchWriter) error { return r.cache.PutTx(approval.ID(), approval) } -func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*flow.ResultApproval, error) { - return func(tx *badger.Txn) (*flow.ResultApproval, error) { +func (r *ResultApprovals) byID(approvalID flow.Identifier) func(storage.Reader) (*flow.ResultApproval, error) { + return func(tx storage.Reader) (*flow.ResultApproval, error) { val, err := r.cache.Get(approvalID)(tx) if err != nil { return nil, err @@ -59,8 +61,8 @@ func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*f } } -func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(*badger.Txn) (*flow.ResultApproval, error) { - return func(tx *badger.Txn) (*flow.ResultApproval, error) { +func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(storage.Reader) (*flow.ResultApproval, error) { + return func(tx storage.Reader) (*flow.ResultApproval, error) { var approvalID flow.Identifier err := operation.LookupResultApproval(resultID, chunkIndex, &approvalID)(tx) if err != nil { @@ -70,29 +72,27 @@ func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) f } } -func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error { - return func(tx *badger.Txn) error { - err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(tx) - if err == nil { - return nil - } +func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.BadgerReaderBatchWriter) error { + return func(tx storage.BadgerReaderBatchWriter) error { + r, w := tx.ReaderWriter() + + var storedApprovalID flow.Identifier + err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(r) + if err != nil { + if !errors.Is(err, storage.ErrNotFound) { + return fmt.Errorf("could not lookup result approval ID: %w", err) + } - if !errors.Is(err, storage.ErrAlreadyExists) { - return err + // no approval found, index the approval + + return operation.IndexResultApproval(resultID, chunkIndex, approvalID)(w) } - // When trying to index an approval for a result, and there is already - // an approval for the result, double check if the indexed approval is - // the same. + // an approval is already indexed, double check if it is the same // We don't allow indexing multiple approvals per chunk because the // store is only used within Verification nodes, and it is impossible // for a Verification node to compute different approvals for the same // chunk. - var storedApprovalID flow.Identifier - err = operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(tx) - if err != nil { - return fmt.Errorf("there is an approval stored already, but cannot retrieve it: %w", err) - } if storedApprovalID != approvalID { return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w", @@ -105,14 +105,22 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app // Store stores a ResultApproval func (r *ResultApprovals) Store(approval *flow.ResultApproval) error { - return operation.RetryOnConflictTx(r.db, transaction.Update, r.store(approval)) + return operation.WithBadgerReaderBatchWriter(r.db, r.store(approval)) } // Index indexes a ResultApproval by chunk (ResultID + chunk index). // operation is idempotent (repeated calls with the same value are equivalent to // just calling the method once; still the method succeeds on each call). func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error { - err := operation.RetryOnConflict(r.db.Update, r.index(resultID, chunkIndex, approvalID)) + // acquring the lock to prevent dirty reads when checking conflicted approvals + // how it works: + // the lock can only be acquired after the index operation is committed to the database, + // since the index operation is the only operation that would affect the reads operation, + // no writes can go through util the lock is released, so locking here could prevent dirty reads. + r.indexing.Lock() + defer r.indexing.Unlock() + + err := operation.WithBadgerReaderBatchWriter(r.db, r.index(resultID, chunkIndex, approvalID)) if err != nil { return fmt.Errorf("could not index result approval: %w", err) } @@ -121,16 +129,12 @@ func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, app // ByID retrieves a ResultApproval by its ID func (r *ResultApprovals) ByID(approvalID flow.Identifier) (*flow.ResultApproval, error) { - tx := r.db.NewTransaction(false) - defer tx.Discard() - return r.byID(approvalID)(tx) + return r.byID(approvalID)(operation.ToReader(r.db)) } // ByChunk retrieves a ResultApproval by result ID and chunk index. The // ResultApprovals store is only used within a verification node, where it is // assumed that there is never more than one approval per chunk. func (r *ResultApprovals) ByChunk(resultID flow.Identifier, chunkIndex uint64) (*flow.ResultApproval, error) { - tx := r.db.NewTransaction(false) - defer tx.Discard() - return r.byChunk(resultID, chunkIndex)(tx) + return r.byChunk(resultID, chunkIndex)(operation.ToReader(r.db)) } diff --git a/storage/badger/cache_b.go b/storage/badger/cache_b.go new file mode 100644 index 00000000000..4c7ed3a97d5 --- /dev/null +++ b/storage/badger/cache_b.go @@ -0,0 +1,152 @@ +package badger + +import ( + "errors" + "fmt" + + lru "github.com/hashicorp/golang-lru/v2" + + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/storage" +) + +func withLimitB[K comparable, V any](limit uint) func(*CacheB[K, V]) { + return func(c *CacheB[K, V]) { + c.limit = limit + } +} + +type storeFuncB[K comparable, V any] func(key K, val V) func(storage.BadgerReaderBatchWriter) error + +func withStoreB[K comparable, V any](store storeFuncB[K, V]) func(*CacheB[K, V]) { + return func(c *CacheB[K, V]) { + c.store = store + } +} + +func noStoreB[K comparable, V any](_ K, _ V) func(storage.BadgerReaderBatchWriter) error { + return func(tx storage.BadgerReaderBatchWriter) error { + return fmt.Errorf("no store function for cache put available") + } +} + +// nolint: unused +func noopStoreB[K comparable, V any](_ K, _ V) func(storage.BadgerReaderBatchWriter) error { + return func(tx storage.BadgerReaderBatchWriter) error { + return nil + } +} + +type retrieveFuncB[K comparable, V any] func(key K) func(storage.Reader) (V, error) + +func withRetrieveB[K comparable, V any](retrieve retrieveFuncB[K, V]) func(*CacheB[K, V]) { + return func(c *CacheB[K, V]) { + c.retrieve = retrieve + } +} + +func noRetrieveB[K comparable, V any](_ K) func(storage.Reader) (V, error) { + return func(tx storage.Reader) (V, error) { + var nullV V + return nullV, fmt.Errorf("no retrieve function for cache get available") + } +} + +type CacheB[K comparable, V any] struct { + metrics module.CacheMetrics + limit uint + store storeFuncB[K, V] + retrieve retrieveFuncB[K, V] + resource string + cache *lru.Cache[K, V] +} + +func newCacheB[K comparable, V any](collector module.CacheMetrics, resourceName string, options ...func(*CacheB[K, V])) *CacheB[K, V] { + c := CacheB[K, V]{ + metrics: collector, + limit: 1000, + store: noStoreB[K, V], + retrieve: noRetrieveB[K, V], + resource: resourceName, + } + for _, option := range options { + option(&c) + } + c.cache, _ = lru.New[K, V](int(c.limit)) + c.metrics.CacheEntries(c.resource, uint(c.cache.Len())) + return &c +} + +// IsCached returns true if the key exists in the cache. +// It DOES NOT check whether the key exists in the underlying data store. +func (c *CacheB[K, V]) IsCached(key K) bool { + return c.cache.Contains(key) +} + +// Get will try to retrieve the resource from cache first, and then from the +// injected. During normal operations, the following error returns are expected: +// - `storage.ErrNotFound` if key is unknown. +func (c *CacheB[K, V]) Get(key K) func(storage.Reader) (V, error) { + return func(tx storage.Reader) (V, error) { + + // check if we have it in the cache + resource, cached := c.cache.Get(key) + if cached { + c.metrics.CacheHit(c.resource) + return resource, nil + } + + // get it from the database + resource, err := c.retrieve(key)(tx) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + c.metrics.CacheNotFound(c.resource) + } + var nullV V + return nullV, fmt.Errorf("could not retrieve resource: %w", err) + } + + c.metrics.CacheMiss(c.resource) + + // cache the resource and eject least recently used one if we reached limit + evicted := c.cache.Add(key, resource) + if !evicted { + c.metrics.CacheEntries(c.resource, uint(c.cache.Len())) + } + + return resource, nil + } +} + +func (c *CacheB[K, V]) Remove(key K) { + c.cache.Remove(key) +} + +// Insert will add a resource directly to the cache with the given ID +func (c *CacheB[K, V]) Insert(key K, resource V) { + // cache the resource and eject least recently used one if we reached limit + evicted := c.cache.Add(key, resource) + if !evicted { + c.metrics.CacheEntries(c.resource, uint(c.cache.Len())) + } +} + +// PutTx will return tx which adds a resource to the cache with the given ID. +func (c *CacheB[K, V]) PutTx(key K, resource V) func(storage.BadgerReaderBatchWriter) error { + storeOps := c.store(key, resource) // assemble DB operations to store resource (no execution) + + return func(tx storage.BadgerReaderBatchWriter) error { + tx.AddCallback(func(err error) { + if err != nil { + c.Insert(key, resource) + } + }) + + err := storeOps(tx) // execute operations to store resource + if err != nil { + return fmt.Errorf("could not store resource: %w", err) + } + + return nil + } +} diff --git a/storage/badger/operation/approvals.go b/storage/badger/operation/approvals.go index 8a994eed2a2..af64911df7e 100644 --- a/storage/badger/operation/approvals.go +++ b/storage/badger/operation/approvals.go @@ -1,19 +1,21 @@ package operation import ( - "github.com/dgraph-io/badger/v2" - "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage" ) // InsertResultApproval inserts a ResultApproval by ID. -func InsertResultApproval(approval *flow.ResultApproval) func(*badger.Txn) error { - return insert(makePrefix(codeResultApproval, approval.ID()), approval) +// The same key (`approval.ID()`) necessitates that the value (full `approval`) is +// also identical (otherwise, we would have a successful pre-image attack on our +// cryptographic hash function). Therefore, concurrent calls to this function are safe. +func InsertResultApproval(approval *flow.ResultApproval) func(storage.Writer) error { + return insertW(makePrefix(codeResultApproval, approval.ID()), approval) } // RetrieveResultApproval retrieves an approval by ID. -func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(*badger.Txn) error { - return retrieve(makePrefix(codeResultApproval, approvalID), approval) +func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(storage.Reader) error { + return retrieveR(makePrefix(codeResultApproval, approvalID), approval) } // IndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID @@ -21,11 +23,11 @@ func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApp // error is returned. This operation is only used by the ResultApprovals store, // which is only used within a Verification node, where it is assumed that there // is only one approval per chunk. -func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error { - return insert(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) +func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error { + return insertW(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) } // LookupResultApproval finds a ResultApproval by result ID and chunk index. -func LookupResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) func(*badger.Txn) error { - return retrieve(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) +func LookupResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) func(storage.Reader) error { + return retrieveR(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) } diff --git a/storage/badger/operation/common.go b/storage/badger/operation/common.go index 1c293348231..a23a360337f 100644 --- a/storage/badger/operation/common.go +++ b/storage/badger/operation/common.go @@ -44,6 +44,22 @@ func batchWrite(key []byte, entity interface{}) func(writeBatch *badger.WriteBat } } +func insertW(key []byte, val interface{}) func(storage.Writer) error { + return func(w storage.Writer) error { + value, err := msgpack.Marshal(val) + if err != nil { + return irrecoverable.NewExceptionf("failed to encode value: %w", err) + } + + err = w.Set(key, value) + if err != nil { + return irrecoverable.NewExceptionf("failed to store data: %w", err) + } + + return nil + } +} + // insert will encode the given entity using msgpack and will insert the resulting // binary data in the badger DB under the provided key. It will error if the // key already exists. @@ -266,6 +282,30 @@ func retrieve(key []byte, entity interface{}) func(*badger.Txn) error { } } +// retrieve will retrieve the binary data under the given key from the badger DB +// and decode it into the given entity. The provided entity needs to be a +// pointer to an initialized entity of the correct type. +// Error returns: +// - storage.ErrNotFound if the key does not exist in the database +// - generic error in case of unexpected failure from the database layer, or failure +// to decode an existing database value +func retrieveR(key []byte, entity interface{}) func(storage.Reader) error { + return func(r storage.Reader) error { + val, closer, err := r.Get(key) + if err != nil { + return err + } + + defer closer.Close() + + err = msgpack.Unmarshal(val, entity) + if err != nil { + return irrecoverable.NewExceptionf("could not decode entity: %w", err) + } + return nil + } +} + // exists returns true if a key exists in the database. // No errors are expected during normal operation. func exists(key []byte, keyExists *bool) func(*badger.Txn) error { diff --git a/storage/badger/operation/reader_batch_writer.go b/storage/badger/operation/reader_batch_writer.go new file mode 100644 index 00000000000..6e6c8799117 --- /dev/null +++ b/storage/badger/operation/reader_batch_writer.go @@ -0,0 +1,130 @@ +package operation + +import ( + "errors" + "fmt" + "io" + "sync" + + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +type ReaderBatchWriter struct { + db *badger.DB + batch *badger.WriteBatch + + addingCallback sync.Mutex // protect callbacks + callbacks []func(error) +} + +var _ storage.BadgerReaderBatchWriter = (*ReaderBatchWriter)(nil) + +func (b *ReaderBatchWriter) ReaderWriter() (storage.Reader, storage.Writer) { + // reusing the same underlying object, but expose with different interfaces + return b, b +} + +func (b *ReaderBatchWriter) BadgerWriteBatch() *badger.WriteBatch { + return b.batch +} + +func (b *ReaderBatchWriter) AddCallback(callback func(error)) { + b.addingCallback.Lock() + defer b.addingCallback.Unlock() + + b.callbacks = append(b.callbacks, callback) +} + +func (b *ReaderBatchWriter) Commit() error { + err := b.batch.Flush() + + b.notifyCallbacks(err) + + return err +} + +func (b *ReaderBatchWriter) notifyCallbacks(err error) { + b.addingCallback.Lock() + defer b.addingCallback.Unlock() + + for _, callback := range b.callbacks { + callback(err) + } +} + +func WithBadgerReaderBatchWriter(db *badger.DB, fn func(storage.BadgerReaderBatchWriter) error) error { + batch := NewBadgerReaderBatchWriter(db) + + err := fn(batch) + if err != nil { + // fn might use lock to ensure concurrent safety while reading and writing data + // and the lock is usually released by a callback. + // in other words, fn might hold a lock to be released by a callback, + // we need to notify the callback for the locks to be released before + // returning the error. + batch.notifyCallbacks(err) + return err + } + + return batch.Commit() +} + +func NewBadgerReaderBatchWriter(db *badger.DB) *ReaderBatchWriter { + return &ReaderBatchWriter{ + db: db, + batch: db.NewWriteBatch(), + } +} + +// ToReader is a helper function to convert a BadgerReaderBatchWriter to a Reader +var ToReader = NewBadgerReaderBatchWriter + +var _ storage.Reader = (*ReaderBatchWriter)(nil) + +type noopCloser struct{} + +var _ io.Closer = (*noopCloser)(nil) + +func (noopCloser) Close() error { return nil } + +func (b *ReaderBatchWriter) Get(key []byte) ([]byte, io.Closer, error) { + tx := b.db.NewTransaction(false) + defer tx.Discard() + + item, err := tx.Get(key) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return nil, nil, storage.ErrNotFound + } + return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err) + } + + var value []byte + err = item.Value(func(val []byte) error { + value = append([]byte{}, val...) + return nil + }) + if err != nil { + return nil, nil, irrecoverable.NewExceptionf("could not load value: %w", err) + } + + return value, new(noopCloser), nil +} + +var _ storage.Writer = (*ReaderBatchWriter)(nil) + +func (b *ReaderBatchWriter) Set(key, value []byte) error { + return b.batch.Set(key, value) +} + +func (b *ReaderBatchWriter) Delete(key []byte) error { + return b.batch.Delete(key) +} + +func (b *ReaderBatchWriter) DeleteRange(start, end []byte) error { + // TODO: implement + return fmt.Errorf("not implemented") +} diff --git a/storage/batch.go b/storage/batch.go index 3147fc5c0e7..d04207055f3 100644 --- a/storage/batch.go +++ b/storage/batch.go @@ -1,11 +1,19 @@ package storage -import "github.com/dgraph-io/badger/v2" +import ( + "io" + "github.com/dgraph-io/badger/v2" +) + +// deprecated +// use Writer instead type Transaction interface { Set(key, val []byte) error } +// deprecated +// use BadgerReaderBatchWriter instead // BatchStorage serves as an abstraction over batch storage, adding ability to add ability to add extra // callbacks which fire after the batch is successfully flushed. type BatchStorage interface { @@ -20,3 +28,69 @@ type BatchStorage interface { // Flush will flush the write batch and update the cache. Flush() error } + +type Reader interface { + // Get gets the value for the given key. It returns ErrNotFound if the DB + // does not contain the key. + // + // The caller should not modify the contents of the returned slice, but it is + // safe to modify the contents of the argument after Get returns. The + // returned slice will remain valid until the returned Closer is closed. On + // success, the caller MUST call closer.Close() or a memory leak will occur. + Get(key []byte) (value []byte, closer io.Closer, err error) +} + +// Writer is an interface for batch writing to a storage backend. +type Writer interface { + // Set sets the value for the given key. It overwrites any previous value + // for that key; a DB is not a multi-map. + // + // It is safe to modify the contents of the arguments after Set returns. + Set(k, v []byte) error + + // Delete deletes the value for the given key. Deletes are blind all will + // succeed even if the given key does not exist. + // + // It is safe to modify the contents of the arguments after Delete returns. + Delete(key []byte) error + + // DeleteRange deletes all of the point keys (and values) in the range + // [start,end) (inclusive on start, exclusive on end). DeleteRange does NOT + // delete overlapping range keys (eg, keys set via RangeKeySet). + // + // It is safe to modify the contents of the arguments after DeleteRange + // returns. + DeleteRange(start, end []byte) error +} + +// BadgerReaderBatchWriter is an interface for badger-specific reader and writer. +type BadgerReaderBatchWriter interface { + // ReaderWriter returns the reader and writer for the storage backend. + // The reader is used to read data from the storage backend, and + // the writer is used to write data to the storage backend with an atomic batch + // update. + // Note: + // - There is no guarantee on the consistency of the data read, + // the data read may not reflect the latest data written. + // it is the responsibility of the caller to ensure the consistency. + // - The writer cannot be used concurrently for writing. + ReaderWriter() (Reader, Writer) + + // BadgerBatch returns the underlying batch object + // Useful for implementing badger-specific operations + BadgerWriteBatch() *badger.WriteBatch + + // AddCallback adds a callback to execute after the batch has been flush + // regardless the batch update is succeeded or failed. + // The error parameter is the error returned by the batch update. + AddCallback(func(error)) +} + +// OnlyBadgerWriter is an adapter to convert a function that takes a Writer +// to a function that takes a BadgerReaderBatchWriter. +func OnlyBadgerWriter(fn func(Writer) error) func(BadgerReaderBatchWriter) error { + return func(rw BadgerReaderBatchWriter) error { + _, w := rw.ReaderWriter() + return fn(w) + } +}