diff --git a/cmd/bootstrap/utils/md5.go b/cmd/bootstrap/utils/md5.go index e885ed891e2..4d4bbe21046 100644 --- a/cmd/bootstrap/utils/md5.go +++ b/cmd/bootstrap/utils/md5.go @@ -1,9 +1,9 @@ package utils // The google storage API only provides md5 and crc32 hence overriding the linter flag for md5 -// #nosec import ( - "crypto/md5" //nolint:gosec + // #nosec + "crypto/md5" "io" "os" ) diff --git a/cmd/verification_builder.go b/cmd/verification_builder.go index 70c84617f02..3c0cac6a1c9 100644 --- a/cmd/verification_builder.go +++ b/cmd/verification_builder.go @@ -38,6 +38,8 @@ import ( badgerState "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/state/protocol/blocktimer" "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/store" ) type VerificationConfig struct { @@ -201,7 +203,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() { vmCtx := fvm.NewContext(fvmOptions...) chunkVerifier := chunks.NewChunkVerifier(vm, vmCtx, node.Logger) - approvalStorage := badger.NewResultApprovals(node.Metrics.Cache, node.DB) + approvalStorage := store.NewResultApprovals(node.Metrics.Cache, badgerimpl.ToDB(node.DB)) verifierEng, err = verifier.New( node.Logger, collector, diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index ee3b3b10ee7..5f91ceb7008 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -105,7 +105,9 @@ import ( "github.com/onflow/flow-go/state/protocol/events/gadgets" "github.com/onflow/flow-go/state/protocol/util" storage "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage/operation/badgerimpl" storagepebble "github.com/onflow/flow-go/storage/pebble" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" ) @@ -1021,7 +1023,7 @@ func VerificationNode(t testing.TB, chunkVerifier := chunks.NewChunkVerifier(vm, vmCtx, node.Log) - approvalStorage := storage.NewResultApprovals(node.Metrics, node.PublicDB) + approvalStorage := store.NewResultApprovals(node.Metrics, badgerimpl.ToDB(node.PublicDB)) node.VerifierEngine, err = verifier.New(node.Log, collector, diff --git a/storage/badger/operation/approvals.go b/storage/badger/operation/approvals.go deleted file mode 100644 index 8a994eed2a2..00000000000 --- a/storage/badger/operation/approvals.go +++ /dev/null @@ -1,31 +0,0 @@ -package operation - -import ( - "github.com/dgraph-io/badger/v2" - - "github.com/onflow/flow-go/model/flow" -) - -// InsertResultApproval inserts a ResultApproval by ID. -func InsertResultApproval(approval *flow.ResultApproval) func(*badger.Txn) error { - return insert(makePrefix(codeResultApproval, approval.ID()), approval) -} - -// RetrieveResultApproval retrieves an approval by ID. -func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(*badger.Txn) error { - return retrieve(makePrefix(codeResultApproval, approvalID), approval) -} - -// IndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID -// and chunk index. If a value for this key exists, a storage.ErrAlreadyExists -// error is returned. This operation is only used by the ResultApprovals store, -// which is only used within a Verification node, where it is assumed that there -// is only one approval per chunk. -func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error { - return insert(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) -} - -// LookupResultApproval finds a ResultApproval by result ID and chunk index. -func LookupResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) func(*badger.Txn) error { - return retrieve(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) -} diff --git a/storage/batch.go b/storage/batch.go index 3147fc5c0e7..23b9d39ac63 100644 --- a/storage/batch.go +++ b/storage/batch.go @@ -1,11 +1,17 @@ package storage -import "github.com/dgraph-io/badger/v2" +import ( + "github.com/dgraph-io/badger/v2" +) +// deprecated +// use Writer instead type Transaction interface { Set(key, val []byte) error } +// deprecated +// use ReaderBatchWriter instead // BatchStorage serves as an abstraction over batch storage, adding ability to add ability to add extra // callbacks which fire after the batch is successfully flushed. type BatchStorage interface { diff --git a/storage/operation/approvals.go b/storage/operation/approvals.go new file mode 100644 index 00000000000..39d88c1f682 --- /dev/null +++ b/storage/operation/approvals.go @@ -0,0 +1,34 @@ +package operation + +import ( + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage" +) + +// InsertResultApproval inserts a ResultApproval by ID. +// The same key (`approval.ID()`) necessitates that the value (full `approval`) is +// also identical (otherwise, we would have a successful pre-image attack on our +// cryptographic hash function). Therefore, concurrent calls to this function are safe. +func InsertResultApproval(approval *flow.ResultApproval) func(storage.Writer) error { + return Upsert(makePrefix(codeResultApproval, approval.ID()), approval) +} + +// RetrieveResultApproval retrieves an approval by ID. +func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(storage.Reader) error { + return Retrieve(makePrefix(codeResultApproval, approvalID), approval) +} + +// UnsafeIndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID +// and chunk index. If a value for this key exists, a storage.ErrAlreadyExists +// error is returned. This operation is only used by the ResultApprovals store, +// which is only used within a Verification node, where it is assumed that there +// is only one approval per chunk. +// CAUTION: Use of this function must be synchronized by storage.ResultApprovals. +func UnsafeIndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error { + return Upsert(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) +} + +// LookupResultApproval finds a ResultApproval by result ID and chunk index. +func LookupResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) func(storage.Reader) error { + return Retrieve(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) +} diff --git a/storage/operation/approvals_test.go b/storage/operation/approvals_test.go new file mode 100644 index 00000000000..38ff89d15db --- /dev/null +++ b/storage/operation/approvals_test.go @@ -0,0 +1,44 @@ +package operation_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + "github.com/onflow/flow-go/storage/operation/dbtest" + "github.com/onflow/flow-go/utils/unittest" +) + +func BenchmarkRetrieveApprovals(b *testing.B) { + dbtest.BenchWithDB(b, func(b *testing.B, db storage.DB) { + b.Run("RetrieveApprovals", func(b *testing.B) { + approval := unittest.ResultApprovalFixture() + require.NoError(b, db.WithReaderBatchWriter(storage.OnlyWriter(operation.InsertResultApproval(approval)))) + + b.ResetTimer() + + approvalID := approval.ID() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var stored flow.ResultApproval + require.NoError(b, operation.RetrieveResultApproval(approvalID, &stored)(db.Reader())) + } + }) + + }) + }) +} + +func BenchmarkInsertApproval(b *testing.B) { + dbtest.BenchWithDB(b, func(b *testing.B, db storage.DB) { + b.Run("InsertApprovals", func(b *testing.B) { + for i := 0; i < b.N; i++ { + approval := unittest.ResultApprovalFixture() + require.NoError(b, db.WithReaderBatchWriter(storage.OnlyWriter(operation.InsertResultApproval(approval)))) + } + }) + }) +} diff --git a/storage/operation/badgerimpl/dbstore.go b/storage/operation/badgerimpl/dbstore.go new file mode 100644 index 00000000000..b8460165e32 --- /dev/null +++ b/storage/operation/badgerimpl/dbstore.go @@ -0,0 +1,23 @@ +package badgerimpl + +import ( + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/storage" +) + +func ToDB(db *badger.DB) storage.DB { + return &dbStore{db: db} +} + +type dbStore struct { + db *badger.DB +} + +func (b *dbStore) Reader() storage.Reader { + return dbReader{db: b.db} +} + +func (b *dbStore) WithReaderBatchWriter(fn func(storage.ReaderBatchWriter) error) error { + return WithReaderBatchWriter(b.db, fn) +} diff --git a/storage/operation/badgerimpl/iterator.go b/storage/operation/badgerimpl/iterator.go new file mode 100644 index 00000000000..81ecda2d719 --- /dev/null +++ b/storage/operation/badgerimpl/iterator.go @@ -0,0 +1,65 @@ +package badgerimpl + +import ( + "bytes" + + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/storage" +) + +type badgerIterator struct { + iter *badger.Iterator + lowerBound []byte + upperBound []byte +} + +var _ storage.Iterator = (*badgerIterator)(nil) + +func newBadgerIterator(db *badger.DB, startPrefix, endPrefix []byte, ops storage.IteratorOption) *badgerIterator { + options := badger.DefaultIteratorOptions + if ops.IterateKeyOnly { + options.PrefetchValues = false + } + + tx := db.NewTransaction(false) + iter := tx.NewIterator(options) + + lowerBound, upperBound := storage.StartEndPrefixToLowerUpperBound(startPrefix, endPrefix) + + return &badgerIterator{ + iter: iter, + lowerBound: lowerBound, + upperBound: upperBound, + } +} + +func (i *badgerIterator) SeekGE() { + i.iter.Seek(i.lowerBound) +} + +func (i *badgerIterator) Valid() bool { + // if it's beyond the upper bound, it's invalid + if !i.iter.Valid() { + return false + } + key := i.iter.Item().Key() + // "< 0" means the upperBound is exclusive + valid := bytes.Compare(key, i.upperBound) < 0 + return valid +} + +func (i *badgerIterator) Next() { + i.iter.Next() +} + +func (i *badgerIterator) IterItem() storage.IterItem { + return i.iter.Item() +} + +var _ storage.IterItem = (*badger.Item)(nil) + +func (i *badgerIterator) Close() error { + i.iter.Close() + return nil +} diff --git a/storage/operation/badgerimpl/reader.go b/storage/operation/badgerimpl/reader.go new file mode 100644 index 00000000000..06158e634ff --- /dev/null +++ b/storage/operation/badgerimpl/reader.go @@ -0,0 +1,54 @@ +package badgerimpl + +import ( + "errors" + "io" + + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +type dbReader struct { + db *badger.DB +} + +type noopCloser struct{} + +var _ io.Closer = (*noopCloser)(nil) + +func (noopCloser) Close() error { return nil } + +func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) { + tx := b.db.NewTransaction(false) + defer tx.Discard() + + item, err := tx.Get(key) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return nil, nil, storage.ErrNotFound + } + return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err) + } + + var value []byte + err = item.Value(func(val []byte) error { + value = append([]byte{}, val...) + return nil + }) + if err != nil { + return nil, nil, irrecoverable.NewExceptionf("could not load value: %w", err) + } + + return value, noopCloser{}, nil +} + +func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) { + return newBadgerIterator(b.db, startPrefix, endPrefix, ops), nil +} + +// ToReader is a helper function to convert a *badger.DB to a Reader +func ToReader(db *badger.DB) storage.Reader { + return dbReader{db} +} diff --git a/storage/operation/badgerimpl/writer.go b/storage/operation/badgerimpl/writer.go new file mode 100644 index 00000000000..3837be3917f --- /dev/null +++ b/storage/operation/badgerimpl/writer.go @@ -0,0 +1,93 @@ +package badgerimpl + +import ( + "fmt" + + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + op "github.com/onflow/flow-go/storage/operation" +) + +type ReaderBatchWriter struct { + globalReader storage.Reader + batch *badger.WriteBatch + + callbacks op.Callbacks +} + +var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil) + +func (b *ReaderBatchWriter) GlobalReader() storage.Reader { + return b.globalReader +} + +func (b *ReaderBatchWriter) Writer() storage.Writer { + return b +} + +func (b *ReaderBatchWriter) BadgerWriteBatch() *badger.WriteBatch { + return b.batch +} + +func (b *ReaderBatchWriter) AddCallback(callback func(error)) { + b.callbacks.AddCallback(callback) +} + +func (b *ReaderBatchWriter) Commit() error { + err := b.batch.Flush() + + b.callbacks.NotifyCallbacks(err) + + return err +} + +func WithReaderBatchWriter(db *badger.DB, fn func(storage.ReaderBatchWriter) error) error { + batch := NewReaderBatchWriter(db) + + err := fn(batch) + if err != nil { + // fn might use lock to ensure concurrent safety while reading and writing data + // and the lock is usually released by a callback. + // in other words, fn might hold a lock to be released by a callback, + // we need to notify the callback for the locks to be released before + // returning the error. + batch.callbacks.NotifyCallbacks(err) + return err + } + + return batch.Commit() +} + +func NewReaderBatchWriter(db *badger.DB) *ReaderBatchWriter { + return &ReaderBatchWriter{ + globalReader: ToReader(db), + batch: db.NewWriteBatch(), + } +} + +var _ storage.Writer = (*ReaderBatchWriter)(nil) + +func (b *ReaderBatchWriter) Set(key, value []byte) error { + return b.batch.Set(key, value) +} + +func (b *ReaderBatchWriter) Delete(key []byte) error { + return b.batch.Delete(key) +} + +func (b *ReaderBatchWriter) DeleteByRange(globalReader storage.Reader, startPrefix, endPrefix []byte) error { + err := operation.IterateKeysInPrefixRange(startPrefix, endPrefix, func(key []byte) error { + err := b.batch.Delete(key) + if err != nil { + return fmt.Errorf("could not add key to delete batch (%v): %w", key, err) + } + return nil + })(globalReader) + + if err != nil { + return fmt.Errorf("could not find keys by range to be deleted: %w", err) + } + return nil +} diff --git a/storage/operation/callbacks.go b/storage/operation/callbacks.go new file mode 100644 index 00000000000..40d414ded91 --- /dev/null +++ b/storage/operation/callbacks.go @@ -0,0 +1,24 @@ +package operation + +import "sync" + +type Callbacks struct { + sync.Mutex // protect callbacks + callbacks []func(error) +} + +func (b *Callbacks) AddCallback(callback func(error)) { + b.Lock() + defer b.Unlock() + + b.callbacks = append(b.callbacks, callback) +} + +func (b *Callbacks) NotifyCallbacks(err error) { + b.Lock() + defer b.Unlock() + + for _, callback := range b.callbacks { + callback(err) + } +} diff --git a/storage/operation/codec.go b/storage/operation/codec.go new file mode 100644 index 00000000000..43dc4c37f7a --- /dev/null +++ b/storage/operation/codec.go @@ -0,0 +1,34 @@ +package operation + +import ( + "encoding/binary" + "fmt" + + "github.com/onflow/flow-go/model/flow" +) + +// EncodeKeyPart encodes a value to be used as a part of a key to be stored in storage. +func EncodeKeyPart(v interface{}) []byte { + switch i := v.(type) { + case uint8: + return []byte{i} + case uint32: + b := make([]byte, 4) + binary.BigEndian.PutUint32(b, i) + return b + case uint64: + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, i) + return b + case string: + return []byte(i) + case flow.Role: + return []byte{byte(i)} + case flow.Identifier: + return i[:] + case flow.ChainID: + return []byte(i) + default: + panic(fmt.Sprintf("unsupported type to convert (%T)", v)) + } +} diff --git a/storage/operation/dbtest/helper.go b/storage/operation/dbtest/helper.go new file mode 100644 index 00000000000..d450b709377 --- /dev/null +++ b/storage/operation/dbtest/helper.go @@ -0,0 +1,114 @@ +package dbtest + +import ( + "testing" + + "github.com/cockroachdb/pebble" + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/operation/pebbleimpl" + "github.com/onflow/flow-go/utils/unittest" +) + +// helper types and functions +type WithWriter func(func(storage.Writer) error) error + +func RunWithStorages(t *testing.T, fn func(*testing.T, storage.Reader, WithWriter)) { + t.Run("BadgerStorage", func(t *testing.T) { + unittest.RunWithBadgerDB(t, runWithBadger(func(r storage.Reader, wr WithWriter) { + fn(t, r, wr) + })) + }) + + t.Run("PebbleStorage", func(t *testing.T) { + unittest.RunWithPebbleDB(t, runWithPebble(func(r storage.Reader, wr WithWriter) { + fn(t, r, wr) + })) + }) +} + +func BenchWithStorages(t *testing.B, fn func(*testing.B, storage.Reader, WithWriter)) { + t.Run("BadgerStorage", func(t *testing.B) { + unittest.RunWithBadgerDB(t, runWithBadger(func(r storage.Reader, wr WithWriter) { + fn(t, r, wr) + })) + }) + + t.Run("PebbleStorage", func(t *testing.B) { + unittest.RunWithPebbleDB(t, runWithPebble(func(r storage.Reader, wr WithWriter) { + fn(t, r, wr) + })) + }) +} + +func runWithBadger(fn func(storage.Reader, WithWriter)) func(*badger.DB) { + return func(db *badger.DB) { + withWriter := func(writing func(storage.Writer) error) error { + writer := badgerimpl.NewReaderBatchWriter(db) + err := writing(writer) + if err != nil { + return err + } + + err = writer.Commit() + if err != nil { + return err + } + return nil + } + + reader := badgerimpl.ToReader(db) + fn(reader, withWriter) + } +} + +func runWithPebble(fn func(storage.Reader, WithWriter)) func(*pebble.DB) { + return func(db *pebble.DB) { + withWriter := func(writing func(storage.Writer) error) error { + writer := pebbleimpl.NewReaderBatchWriter(db) + err := writing(writer) + if err != nil { + return err + } + + err = writer.Commit() + if err != nil { + return err + } + return nil + } + + reader := pebbleimpl.ToReader(db) + fn(reader, withWriter) + } +} + +func RunWithDB(t *testing.T, fn func(t *testing.T, store storage.DB)) { + t.Run("BadgerStorage", func(t *testing.T) { + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + fn(t, badgerimpl.ToDB(db)) + }) + }) + + t.Run("PebbleStorage", func(t *testing.T) { + unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + fn(t, pebbleimpl.ToDB(db)) + }) + }) +} + +func BenchWithDB(b *testing.B, fn func(*testing.B, storage.DB)) { + b.Run("BadgerStorage", func(b *testing.B) { + unittest.RunWithBadgerDB(b, func(db *badger.DB) { + fn(b, badgerimpl.ToDB(db)) + }) + }) + + b.Run("PebbleStorage", func(b *testing.B) { + unittest.RunWithPebbleDB(b, func(db *pebble.DB) { + fn(b, pebbleimpl.ToDB(db)) + }) + }) +} diff --git a/storage/operation/pebbleimpl/dbstore.go b/storage/operation/pebbleimpl/dbstore.go new file mode 100644 index 00000000000..fcc5b14a06a --- /dev/null +++ b/storage/operation/pebbleimpl/dbstore.go @@ -0,0 +1,23 @@ +package pebbleimpl + +import ( + "github.com/cockroachdb/pebble" + + "github.com/onflow/flow-go/storage" +) + +func ToDB(db *pebble.DB) storage.DB { + return &dbStore{db: db} +} + +type dbStore struct { + db *pebble.DB +} + +func (b *dbStore) Reader() storage.Reader { + return dbReader{db: b.db} +} + +func (b *dbStore) WithReaderBatchWriter(fn func(storage.ReaderBatchWriter) error) error { + return WithReaderBatchWriter(b.db, fn) +} diff --git a/storage/operation/pebbleimpl/iterator.go b/storage/operation/pebbleimpl/iterator.go new file mode 100644 index 00000000000..b6f3910cead --- /dev/null +++ b/storage/operation/pebbleimpl/iterator.go @@ -0,0 +1,74 @@ +package pebbleimpl + +import ( + "fmt" + + "github.com/cockroachdb/pebble" + + "github.com/onflow/flow-go/storage" +) + +type pebbleIterator struct { + iter *pebble.Iterator + lowerBound []byte +} + +var _ storage.Iterator = (*pebbleIterator)(nil) + +func newPebbleIterator(reader pebble.Reader, startPrefix, endPrefix []byte, ops storage.IteratorOption) (*pebbleIterator, error) { + lowerBound, upperBound := storage.StartEndPrefixToLowerUpperBound(startPrefix, endPrefix) + + options := pebble.IterOptions{ + LowerBound: lowerBound, + UpperBound: upperBound, + } + + iter, err := reader.NewIter(&options) + if err != nil { + return nil, fmt.Errorf("can not create iterator: %w", err) + } + + return &pebbleIterator{ + iter: iter, + lowerBound: lowerBound, + }, nil +} + +func (i *pebbleIterator) SeekGE() { + i.iter.SeekGE(i.lowerBound) +} + +func (i *pebbleIterator) Valid() bool { + return i.iter.Valid() +} + +func (i *pebbleIterator) Next() { + i.iter.Next() +} + +func (i *pebbleIterator) IterItem() storage.IterItem { + return pebbleIterItem{iter: i.iter} +} + +type pebbleIterItem struct { + iter *pebble.Iterator +} + +var _ storage.IterItem = (*pebbleIterItem)(nil) + +func (i pebbleIterItem) Key() []byte { + return i.iter.Key() +} + +func (i pebbleIterItem) Value(fn func([]byte) error) error { + val, err := i.iter.ValueAndErr() + if err != nil { + return err + } + + return fn(val) +} + +func (i *pebbleIterator) Close() error { + return i.iter.Close() +} diff --git a/storage/operation/pebbleimpl/reader.go b/storage/operation/pebbleimpl/reader.go new file mode 100644 index 00000000000..6cfdfd93da5 --- /dev/null +++ b/storage/operation/pebbleimpl/reader.go @@ -0,0 +1,47 @@ +package pebbleimpl + +import ( + "errors" + "io" + + "github.com/cockroachdb/pebble" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +type dbReader struct { + db *pebble.DB +} + +var _ storage.Reader = (*dbReader)(nil) + +type noopCloser struct{} + +var _ io.Closer = (*noopCloser)(nil) + +func (noopCloser) Close() error { return nil } + +func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) { + value, closer, err := b.db.Get(key) + + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, nil, storage.ErrNotFound + } + + // exception while checking for the key + return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err) + } + + return value, closer, nil +} + +func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) { + return newPebbleIterator(b.db, startPrefix, endPrefix, ops) +} + +// ToReader is a helper function to convert a *pebble.DB to a Reader +func ToReader(db *pebble.DB) storage.Reader { + return dbReader{db} +} diff --git a/storage/operation/pebbleimpl/writer.go b/storage/operation/pebbleimpl/writer.go new file mode 100644 index 00000000000..ad639223209 --- /dev/null +++ b/storage/operation/pebbleimpl/writer.go @@ -0,0 +1,83 @@ +package pebbleimpl + +import ( + "github.com/cockroachdb/pebble" + + "github.com/onflow/flow-go/storage" + op "github.com/onflow/flow-go/storage/operation" +) + +type ReaderBatchWriter struct { + globalReader storage.Reader + batch *pebble.Batch + + callbacks op.Callbacks +} + +var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil) + +func (b *ReaderBatchWriter) GlobalReader() storage.Reader { + return b.globalReader +} + +func (b *ReaderBatchWriter) Writer() storage.Writer { + return b +} + +func (b *ReaderBatchWriter) PebbleWriterBatch() *pebble.Batch { + return b.batch +} + +func (b *ReaderBatchWriter) AddCallback(callback func(error)) { + b.callbacks.AddCallback(callback) +} + +func (b *ReaderBatchWriter) Commit() error { + err := b.batch.Commit(pebble.Sync) + + b.callbacks.NotifyCallbacks(err) + + return err +} + +func WithReaderBatchWriter(db *pebble.DB, fn func(storage.ReaderBatchWriter) error) error { + batch := NewReaderBatchWriter(db) + + err := fn(batch) + if err != nil { + // fn might use lock to ensure concurrent safety while reading and writing data + // and the lock is usually released by a callback. + // in other words, fn might hold a lock to be released by a callback, + // we need to notify the callback for the locks to be released before + // returning the error. + batch.callbacks.NotifyCallbacks(err) + return err + } + + return batch.Commit() +} + +func NewReaderBatchWriter(db *pebble.DB) *ReaderBatchWriter { + return &ReaderBatchWriter{ + globalReader: ToReader(db), + batch: db.NewBatch(), + } +} + +var _ storage.Writer = (*ReaderBatchWriter)(nil) + +func (b *ReaderBatchWriter) Set(key, value []byte) error { + return b.batch.Set(key, value, pebble.Sync) +} + +func (b *ReaderBatchWriter) Delete(key []byte) error { + return b.batch.Delete(key, pebble.Sync) +} + +// DeleteByRange deletes all keys with the given prefix defined by [startPrefix, endPrefix] (both inclusive). +func (b *ReaderBatchWriter) DeleteByRange(_ storage.Reader, startPrefix, endPrefix []byte) error { + // DeleteRange takes the prefix range with start (inclusive) and end (exclusive, note: not inclusive). + // therefore, we need to increment the endPrefix to make it inclusive. + start, end := storage.StartEndPrefixToLowerUpperBound(startPrefix, endPrefix) + return b.batch.DeleteRange(start, end, pebble.Sync) +} diff --git a/storage/operation/prefix.go b/storage/operation/prefix.go new file mode 100644 index 00000000000..feacac40c6a --- /dev/null +++ b/storage/operation/prefix.go @@ -0,0 +1,163 @@ +package operation + +const ( + + // codes for special database markers + // codeMax = 1 // keeps track of the maximum key size + // nolint:unused + codeDBType = 2 // specifies a database type + + // codes for views with special meaning + // nolint:unused + codeSafetyData = 10 // safety data for hotstuff state + // nolint:unused + codeLivenessData = 11 // liveness data for hotstuff state + + // codes for fields associated with the root state + // nolint:unused + codeSporkID = 13 + // nolint:unused + codeProtocolVersion = 14 + // nolint:unused + codeEpochCommitSafetyThreshold = 15 + // nolint:unused + codeSporkRootBlockHeight = 16 + + // code for heights with special meaning + // nolint:unused + codeFinalizedHeight = 20 // latest finalized block height + // nolint:unused + codeSealedHeight = 21 // latest sealed block height + // nolint:unused + codeClusterHeight = 22 // latest finalized height on cluster + // nolint:unused + codeExecutedBlock = 23 // latest executed block with max height + // nolint:unused + codeFinalizedRootHeight = 24 // the height of the highest finalized block contained in the root snapshot + // nolint:unused + codeLastCompleteBlockHeight = 25 // the height of the last block for which all collections were received + // nolint:unused + codeEpochFirstHeight = 26 // the height of the first block in a given epoch + // nolint:unused + codeSealedRootHeight = 27 // the height of the highest sealed block contained in the root snapshot + + // codes for single entity storage + // nolint:unused + codeHeader = 30 + _ = 31 // DEPRECATED: 31 was used for identities before epochs + codeGuarantee = 32 + codeSeal = 33 + codeTransaction = 34 + codeCollection = 35 + codeExecutionResult = 36 + codeResultApproval = 37 + codeChunk = 38 + codeExecutionReceiptMeta = 39 // NOTE: prior to Mainnet25, this erroneously had the same value as codeExecutionResult (36) + + // codes for indexing single identifier by identifier/integer + // nolint:unused + codeHeightToBlock = 40 // index mapping height to block ID + // nolint:unused + codeBlockIDToLatestSealID = 41 // index mapping a block its last payload seal + // nolint:unused + codeClusterBlockToRefBlock = 42 // index cluster block ID to reference block ID + // nolint:unused + codeRefHeightToClusterBlock = 43 // index reference block height to cluster block IDs + // nolint:unused + codeBlockIDToFinalizedSeal = 44 // index _finalized_ seal by sealed block ID + // nolint:unused + codeBlockIDToQuorumCertificate = 45 // index of quorum certificates by block ID + // nolint:unused + codeEpochProtocolStateByBlockID = 46 // index of epoch protocol state entry by block ID + // nolint:unused + codeProtocolKVStoreByBlockID = 47 // index of protocol KV store entry by block ID + + // codes for indexing multiple identifiers by identifier + // nolint:unused + codeBlockChildren = 50 // index mapping block ID to children blocks + _ = 51 // DEPRECATED: 51 was used for identity indexes before epochs + // nolint:unused + codePayloadGuarantees = 52 // index mapping block ID to payload guarantees + // nolint:unused + codePayloadSeals = 53 // index mapping block ID to payload seals + // nolint:unused + codeCollectionBlock = 54 // index mapping collection ID to block ID + // nolint:unused + codeOwnBlockReceipt = 55 // index mapping block ID to execution receipt ID for execution nodes + _ = 56 // DEPRECATED: 56 was used for block->epoch status prior to Dynamic Protocol State in Mainnet25 + // nolint:unused + codePayloadReceipts = 57 // index mapping block ID to payload receipts + // nolint:unused + codePayloadResults = 58 // index mapping block ID to payload results + // nolint:unused + codeAllBlockReceipts = 59 // index mapping of blockID to multiple receipts + // nolint:unused + codePayloadProtocolStateID = 60 // index mapping block ID to payload protocol state ID + + // codes related to protocol level information + // nolint:unused + codeEpochSetup = 61 // EpochSetup service event, keyed by ID + // nolint:unused + codeEpochCommit = 62 // EpochCommit service event, keyed by ID + // nolint:unused + codeBeaconPrivateKey = 63 // BeaconPrivateKey, keyed by epoch counter + // nolint:unused + codeDKGStarted = 64 // flag that the DKG for an epoch has been started + // nolint:unused + codeDKGEnded = 65 // flag that the DKG for an epoch has ended (stores end state) + // nolint:unused + codeVersionBeacon = 67 // flag for storing version beacons + // nolint:unused + codeEpochProtocolState = 68 + // nolint:unused + codeProtocolKVStore = 69 + + // code for ComputationResult upload status storage + // NOTE: for now only GCP uploader is supported. When other uploader (AWS e.g.) needs to + // be supported, we will need to define new code. + // nolint:unused + codeComputationResults = 66 + + // job queue consumers and producers + // nolint:unused + codeJobConsumerProcessed = 70 + // nolint:unused + codeJobQueue = 71 + // nolint:unused + codeJobQueuePointer = 72 + + // legacy codes (should be cleaned up) + codeChunkDataPack = 100 + codeCommit = 101 + codeEvent = 102 + codeExecutionStateInteractions = 103 + codeTransactionResult = 104 + codeFinalizedCluster = 105 + codeServiceEvent = 106 + codeTransactionResultIndex = 107 + codeLightTransactionResult = 108 + codeLightTransactionResultIndex = 109 + codeIndexCollection = 200 + codeIndexExecutionResultByBlock = 202 + codeIndexCollectionByTransaction = 203 + codeIndexResultApprovalByChunk = 204 + + // TEMPORARY codes + // nolint:unused + blockedNodeIDs = 205 // manual override for adding node IDs to list of ejected nodes, applies to networking layer only + + // internal failure information that should be preserved across restarts + // nolint:unused + codeExecutionFork = 254 + // nolint:unused + codeEpochEmergencyFallbackTriggered = 255 +) + +func makePrefix(code byte, keys ...interface{}) []byte { + prefix := make([]byte, 1) + prefix[0] = code + for _, key := range keys { + prefix = append(prefix, EncodeKeyPart(key)...) + } + return prefix +} diff --git a/storage/operation/reads.go b/storage/operation/reads.go new file mode 100644 index 00000000000..8a41851c0c6 --- /dev/null +++ b/storage/operation/reads.go @@ -0,0 +1,243 @@ +package operation + +import ( + "bytes" + "errors" + "fmt" + + "github.com/vmihailenco/msgpack" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +// CheckFunc is a function that checks if the value should be read and decoded. +// return (true, nil) to read the value and pass it to the CreateFunc and HandleFunc for decoding +// return (false, nil) to skip reading the value +// return (false, err) if running into any exception, the iteration should be stopped. +// Note: the returned bool is to decide whether to read the value or not, rather than whether to stop +// the iteration or not. +type CheckFunc func(key []byte) (bool, error) + +// createFunc returns a pointer to an initialized entity that we can potentially +// decode the next value into during a badger DB iteration. +type CreateFunc func() interface{} + +// handleFunc is a function that starts the processing of the current key-value +// pair during a badger iteration. It should be called after the key was checked +// and the entity was decoded. +// No errors are expected during normal operation. Any errors will halt the iteration. +type HandleFunc func() error +type IterationFunc func() (CheckFunc, CreateFunc, HandleFunc) + +// IterateKeysInPrefixRange will iterate over all keys in the given range [startPrefix, endPrefix] (both inclusive) +func IterateKeysInPrefixRange(startPrefix []byte, endPrefix []byte, check func(key []byte) error) func(storage.Reader) error { + return Iterate(startPrefix, endPrefix, func() (CheckFunc, CreateFunc, HandleFunc) { + return func(key []byte) (bool, error) { + err := check(key) + if err != nil { + return false, err + } + return false, nil + }, nil, nil + }, storage.IteratorOption{IterateKeyOnly: true}) +} + +// Iterate will iterate over all keys in the given range [startPrefix, endPrefix] (both inclusive) +func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error { + return func(r storage.Reader) error { + + if len(startPrefix) == 0 { + return fmt.Errorf("startPrefix prefix is empty") + } + + if len(endPrefix) == 0 { + return fmt.Errorf("endPrefix prefix is empty") + } + + // Reverse iteration is not supported by pebble + if bytes.Compare(startPrefix, endPrefix) > 0 { + return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key") + } + + it, err := r.NewIter(startPrefix, endPrefix, opt) + if err != nil { + return fmt.Errorf("can not create iterator: %w", err) + } + defer it.Close() + + for it.SeekGE(); it.Valid(); it.Next() { + item := it.IterItem() + key := item.Key() + + // initialize processing functions for iteration + check, create, handle := iterFunc() + + keyCopy := make([]byte, len(key)) + copy(keyCopy, key) + + // check if we should process the item at all + shouldReadValue, err := check(keyCopy) + if err != nil { + return err + } + if !shouldReadValue { // skip reading value + continue + } + + err = item.Value(func(val []byte) error { + + // decode into the entity + entity := create() + err = msgpack.Unmarshal(val, entity) + if err != nil { + return irrecoverable.NewExceptionf("could not decode entity: %w", err) + } + + // process the entity + err = handle() + if err != nil { + return fmt.Errorf("could not handle entity: %w", err) + } + + return nil + }) + + if err != nil { + return fmt.Errorf("could not process value: %w", err) + } + } + + return nil + } +} + +// Traverse will iterate over all keys with the given prefix +func Traverse(prefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error { + return Iterate(prefix, prefix, iterFunc, opt) +} + +// KeyOnlyIterateFunc returns an IterationFunc that only iterates over keys +func KeyOnlyIterateFunc(fn func(key []byte) error) IterationFunc { + return func() (CheckFunc, CreateFunc, HandleFunc) { + checker := func(key []byte) (bool, error) { + return false, fn(key) + } + + create := func() interface{} { + return nil + } + + handle := func() error { + return nil + } + + return checker, create, handle + } +} + +// PrefixUpperBound returns a key K such that all possible keys beginning with the input prefix +// sort lower than K according to the byte-wise lexicographic key ordering used by Pebble. +// This is used to define an upper bound for iteration, when we want to iterate over +// all keys beginning with a given prefix. +// referred to https://pkg.go.dev/github.com/cockroachdb/pebble#example-Iterator-PrefixIteration +func PrefixUpperBound(prefix []byte) []byte { + end := make([]byte, len(prefix)) + copy(end, prefix) + for i := len(end) - 1; i >= 0; i-- { + // increment the bytes by 1 + end[i] = end[i] + 1 + if end[i] != 0 { + return end[:i+1] + } + } + return nil // no upper-bound +} + +// Exists returns true if a key exists in the database. +// No errors are expected during normal operation. +func Exists(key []byte, keyExists *bool) func(storage.Reader) error { + return func(r storage.Reader) error { + _, closer, err := r.Get(key) + if err != nil { + // the key does not exist in the database + if errors.Is(err, storage.ErrNotFound) { + *keyExists = false + return nil + } + // exception while checking for the key + return irrecoverable.NewExceptionf("could not load data: %w", err) + } + defer closer.Close() + + // the key does exist in the database + *keyExists = true + return nil + } +} + +// retrieve will retrieve the binary data under the given key from the badger DB +// and decode it into the given entity. The provided entity needs to be a +// pointer to an initialized entity of the correct type. +// Error returns: +// - storage.ErrNotFound if the key does not exist in the database +// - generic error in case of unexpected failure from the database layer, or failure +// to decode an existing database value +func Retrieve(key []byte, entity interface{}) func(storage.Reader) error { + return func(r storage.Reader) error { + val, closer, err := r.Get(key) + if err != nil { + return err + } + + defer closer.Close() + + err = msgpack.Unmarshal(val, entity) + if err != nil { + return irrecoverable.NewExceptionf("could not decode entity: %w", err) + } + return nil + } +} + +// FindHighestAtOrBelow finds the highest key with the given prefix and +// height equal to or below the given height. +func FindHighestAtOrBelow(prefix []byte, height uint64, entity interface{}) func(storage.Reader) error { + return func(r storage.Reader) error { + if len(prefix) == 0 { + return fmt.Errorf("prefix must not be empty") + } + + key := append(prefix, EncodeKeyPart(height)...) + it, err := r.NewIter(prefix, key, storage.DefaultIteratorOptions()) + if err != nil { + return fmt.Errorf("can not create iterator: %w", err) + } + defer it.Close() + + var highestKey []byte + // find highest value below the given height + for it.SeekGE(); it.Valid(); it.Next() { + highestKey = it.IterItem().Key() + } + + if len(highestKey) == 0 { + return storage.ErrNotFound + } + + // read the value of the highest key + val, closer, err := r.Get(highestKey) + if err != nil { + return err + } + + defer closer.Close() + + err = msgpack.Unmarshal(val, entity) + if err != nil { + return irrecoverable.NewExceptionf("failed to decode value: %w", err) + } + + return nil + } +} diff --git a/storage/operation/reads_bench_test.go b/storage/operation/reads_bench_test.go new file mode 100644 index 00000000000..7b490254e73 --- /dev/null +++ b/storage/operation/reads_bench_test.go @@ -0,0 +1,63 @@ +package operation_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + "github.com/onflow/flow-go/storage/operation/dbtest" +) + +func BenchmarkRetrieve(t *testing.B) { + dbtest.BenchWithStorages(t, func(t *testing.B, r storage.Reader, withWriter dbtest.WithWriter) { + e := Entity{ID: 1337} + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + + t.ResetTimer() + + for i := 0; i < t.N; i++ { + var readBack Entity + require.NoError(t, operation.Retrieve(e.Key(), &readBack)(r)) + } + }) +} + +func BenchmarkNonExist(t *testing.B) { + dbtest.BenchWithStorages(t, func(t *testing.B, r storage.Reader, withWriter dbtest.WithWriter) { + for i := 0; i < t.N; i++ { + e := Entity{ID: uint64(i)} + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + } + + t.ResetTimer() + nonExist := Entity{ID: uint64(t.N + 1)} + for i := 0; i < t.N; i++ { + var exists bool + require.NoError(t, operation.Exists(nonExist.Key(), &exists)(r)) + } + }) +} + +func BenchmarkIterate(t *testing.B) { + dbtest.BenchWithStorages(t, func(t *testing.B, r storage.Reader, withWriter dbtest.WithWriter) { + prefix1 := []byte("prefix-1") + prefix2 := []byte("prefix-2") + for i := 0; i < t.N; i++ { + e := Entity{ID: uint64(i)} + key1 := append(prefix1, e.Key()...) + key2 := append(prefix2, e.Key()...) + + require.NoError(t, withWriter(operation.Upsert(key1, e))) + require.NoError(t, withWriter(operation.Upsert(key2, e))) + } + + t.ResetTimer() + var found [][]byte + require.NoError(t, operation.IterateKeysInPrefixRange(prefix1, prefix2, func(key []byte) error { + found = append(found, key) + return nil + })(r), "should iterate forward without error") + }) +} diff --git a/storage/operation/reads_test.go b/storage/operation/reads_test.go new file mode 100644 index 00000000000..17b56b31255 --- /dev/null +++ b/storage/operation/reads_test.go @@ -0,0 +1,319 @@ +package operation_test + +import ( + "bytes" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + "github.com/onflow/flow-go/storage/operation/dbtest" +) + +func TestIterateKeysInPrefixRange(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + // Define the prefix range + prefixStart := []byte{0x10} + prefixEnd := []byte{0x20} + + // Create a range of keys around the prefix start/end values + keys := [][]byte{ + // before start -> not included in range + {0x09, 0xff}, + // within the start prefix -> included in range + {0x10, 0x00}, + {0x10, 0xff}, + // between start and end -> included in range + {0x15, 0x00}, + {0x1A, 0xff}, + // within the end prefix -> included in range + {0x20, 0x00}, + {0x20, 0xff}, + // after end -> not included in range + {0x21, 0x00}, + } + + // Keys expected to be in the prefix range + lastNToExclude := 1 + keysInRange := keys[1 : len(keys)-lastNToExclude] // these keys are between the start and end + + // Insert the keys into the storage + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, key := range keys { + value := []byte{0x00} // value are skipped, doesn't matter + err := operation.Upsert(key, value)(writer) + if err != nil { + return err + } + } + return nil + })) + + // Forward iteration and check boundaries + var found [][]byte + require.NoError(t, operation.IterateKeysInPrefixRange(prefixStart, prefixEnd, func(key []byte) error { + found = append(found, key) + return nil + })(r), "should iterate forward without error") + require.ElementsMatch(t, keysInRange, found, "forward iteration should return the correct keys in range") + }) +} + +// Verify that when keys are prefixed by two prefixes,we can iterate with either first prefix or second prefix. +func TestIterateHierachicalPrefixes(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + keys := [][]byte{ + {0x09, 0x00, 0x00}, + {0x09, 0x00, 0xff}, + {0x09, 0x19, 0xff}, + {0x09, 0xff, 0x00}, + {0x09, 0xff, 0xff}, + {0x10, 0x00, 0x00}, + {0x10, 0x00, 0xff}, + {0x10, 0x19, 0x00}, + {0x10, 0x19, 0xff}, + {0x10, 0x20, 0x00}, + {0x10, 0x20, 0xff}, + {0x10, 0x21, 0x00}, + {0x10, 0x21, 0xff}, + {0x10, 0x22, 0x00}, + {0x10, 0x22, 0xff}, + {0x10, 0xff, 0x00}, + {0x10, 0xff, 0xff}, + {0x11, 0x00, 0x00}, + {0x11, 0x00, 0xff}, + {0x11, 0xff, 0x00}, + {0x11, 0xff, 0xff}, + {0x12, 0x00, 0x00}, + {0x12, 0x00, 0xff}, + {0x12, 0xff, 0x00}, + {0x12, 0xff, 0xff}, + } + + // Insert the keys and values into storage + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, key := range keys { + err := operation.Upsert(key, []byte{1})(writer) + if err != nil { + return err + } + } + return nil + })) + + // Test iteration with range of first prefixes (0x10 to 0x11) + firstPrefixRangeExpected := [][]byte{ + {0x10, 0x00, 0x00}, + {0x10, 0x00, 0xff}, + {0x10, 0x19, 0x00}, + {0x10, 0x19, 0xff}, + {0x10, 0x20, 0x00}, + {0x10, 0x20, 0xff}, + {0x10, 0x21, 0x00}, + {0x10, 0x21, 0xff}, + {0x10, 0x22, 0x00}, + {0x10, 0x22, 0xff}, + {0x10, 0xff, 0x00}, + {0x10, 0xff, 0xff}, + {0x11, 0x00, 0x00}, + {0x11, 0x00, 0xff}, + {0x11, 0xff, 0x00}, + {0x11, 0xff, 0xff}, + } + firstPrefixRangeActual := make([][]byte, 0) + err := operation.IterateKeysInPrefixRange([]byte{0x10}, []byte{0x11}, func(key []byte) error { + firstPrefixRangeActual = append(firstPrefixRangeActual, key) + return nil + })(r) + require.NoError(t, err, "iterate with range of first prefixes should not return an error") + require.Equal(t, firstPrefixRangeExpected, firstPrefixRangeActual, "iterated values for range of first prefixes should match expected values") + + // Test iteration with range of second prefixes (0x1020 to 0x1021) + secondPrefixRangeActual := make([][]byte, 0) + secondPrefixRangeExpected := [][]byte{ + {0x10, 0x20, 0x00}, + {0x10, 0x20, 0xff}, + {0x10, 0x21, 0x00}, + {0x10, 0x21, 0xff}, + } + err = operation.IterateKeysInPrefixRange([]byte{0x10, 0x20}, []byte{0x10, 0x21}, func(key []byte) error { + secondPrefixRangeActual = append(secondPrefixRangeActual, key) + return nil + })(r) + require.NoError(t, err, "iterate with range of second prefixes should not return an error") + require.Equal(t, secondPrefixRangeExpected, secondPrefixRangeActual, "iterated values for range of second prefixes should match expected values") + }) +} + +func TestTraverse(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + keys := [][]byte{ + {0x42, 0x00}, + {0xff}, + {0x42, 0x56}, + {0x00}, + {0x42, 0xff}, + } + vals := []uint64{11, 13, 17, 19, 23} + expected := []uint64{11, 23} + + // Insert the keys and values into storage + require.NoError(t, withWriter(func(writer storage.Writer) error { + for i, key := range keys { + err := operation.Upsert(key, vals[i])(writer) + if err != nil { + return err + } + } + return nil + })) + + actual := make([]uint64, 0, len(keys)) + + // Define the iteration logic + iterationFunc := func() (operation.CheckFunc, operation.CreateFunc, operation.HandleFunc) { + check := func(key []byte) (bool, error) { + // Skip the key {0x42, 0x56} + return !bytes.Equal(key, []byte{0x42, 0x56}), nil + } + var val uint64 + create := func() interface{} { + return &val + } + handle := func() error { + actual = append(actual, val) + return nil + } + return check, create, handle + } + + // Traverse the keys starting with prefix {0x42} + err := operation.Traverse([]byte{0x42}, iterationFunc, storage.DefaultIteratorOptions())(r) + require.NoError(t, err, "traverse should not return an error") + + // Assert that the actual values match the expected values + require.Equal(t, expected, actual, "traversed values should match expected values") + }) +} + +// Verify traversing a subset of keys with only keys traversal +func TestTraverseKeyOnly(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + keys := [][]byte{ + // before start -> not included in range + {0x04, 0x33}, + {0x09, 0xff}, + // within the start prefix -> included in range + {0x10, 0x00}, + {0x10, 0xff}, + // between start and end -> included in range + {0x11, 0x00}, + {0x1A, 0xff}, + } + expected := [][]byte{ + {0x10, 0x00}, + {0x10, 0xff}, + } + + // Insert the keys and values into storage + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, key := range keys { + err := operation.Upsert(key, []byte{1})(writer) + if err != nil { + return err + } + } + return nil + })) + + actual := make([][]byte, 0) + + // Traverse the keys starting with prefix {0x11} + err := operation.Traverse([]byte{0x10}, operation.KeyOnlyIterateFunc(func(key []byte) error { + actual = append(actual, key) + return nil + }), storage.DefaultIteratorOptions())(r) + require.NoError(t, err, "traverse should not return an error") + + // Assert that the actual values match the expected values + require.Equal(t, expected, actual, "traversed values should match expected values") + }) +} + +func TestFindHighestAtOrBelow(t *testing.T) { + // Helper function to insert an entity into the storage + insertEntity := func(writer storage.Writer, prefix []byte, height uint64, entity Entity) error { + key := append(prefix, operation.EncodeKeyPart(height)...) + return operation.Upsert(key, entity)(writer) + } + + // Entities to be inserted + entities := []struct { + height uint64 + entity Entity + }{ + {5, Entity{ID: 41}}, + {10, Entity{ID: 42}}, + {15, Entity{ID: 43}}, + } + + // Run test with multiple storage backends + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + prefix := []byte("test_prefix") + + // Insert entities into the storage + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, e := range entities { + if err := insertEntity(writer, prefix, e.height, e.entity); err != nil { + return err + } + } + return nil + })) + + // Declare entity to store the results of FindHighestAtOrBelow + var entity Entity + + // Test cases + tests := []struct { + name string + height uint64 + expectedValue uint64 + expectError bool + expectedErrMsg string + }{ + {"target first height exists", 5, 41, false, ""}, + {"target height exists", 10, 42, false, ""}, + {"target height above", 11, 42, false, ""}, + {"target height above highest", 20, 43, false, ""}, + {"target height below lowest", 4, 0, true, storage.ErrNotFound.Error()}, + {"empty prefix", 5, 0, true, "prefix must not be empty"}, + } + + // Execute test cases + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + prefixToUse := prefix + + if tt.name == "empty prefix" { + prefixToUse = []byte{} + } + + err := operation.FindHighestAtOrBelow( + prefixToUse, + tt.height, + &entity)(r) + + if tt.expectError { + require.Error(t, err, fmt.Sprintf("expected error but got nil, entity: %v", entity)) + require.Contains(t, err.Error(), tt.expectedErrMsg) + } else { + require.NoError(t, err) + require.Equal(t, tt.expectedValue, entity.ID) + } + }) + } + }) +} diff --git a/storage/operation/writes.go b/storage/operation/writes.go new file mode 100644 index 00000000000..3bbe08d12d2 --- /dev/null +++ b/storage/operation/writes.go @@ -0,0 +1,58 @@ +package operation + +import ( + "github.com/vmihailenco/msgpack" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +// Upsert will encode the given entity using msgpack and will insert the resulting +// binary data under the provided key. +// If the key already exists, the value will be overwritten. +// Error returns: +// - generic error in case of unexpected failure from the database layer or +// encoding failure. +func Upsert(key []byte, val interface{}) func(storage.Writer) error { + return func(w storage.Writer) error { + value, err := msgpack.Marshal(val) + if err != nil { + return irrecoverable.NewExceptionf("failed to encode value: %w", err) + } + + err = w.Set(key, value) + if err != nil { + return irrecoverable.NewExceptionf("failed to store data: %w", err) + } + + return nil + } +} + +// Remove removes the entity with the given key, if it exists. If it doesn't +// exist, this is a no-op. +// Error returns: +// * generic error in case of unexpected database error +func Remove(key []byte) func(storage.Writer) error { + return func(w storage.Writer) error { + err := w.Delete(key) + if err != nil { + return irrecoverable.NewExceptionf("could not delete item: %w", err) + } + return nil + } +} + +// RemoveByPrefix removes all keys with the given prefix defined by [startPrefix, endPrefix] (both inclusive). +// If no keys exist with the given prefix, this is a no-op. +// Error returns: +// * generic error in case of unexpected database error +func RemoveByPrefix(reader storage.Reader, key []byte) func(storage.Writer) error { + return func(w storage.Writer) error { + err := w.DeleteByRange(reader, key, key) + if err != nil { + return irrecoverable.NewExceptionf("could not delete item: %w", err) + } + return nil + } +} diff --git a/storage/operation/writes_bench_test.go b/storage/operation/writes_bench_test.go new file mode 100644 index 00000000000..4c569d397f0 --- /dev/null +++ b/storage/operation/writes_bench_test.go @@ -0,0 +1,48 @@ +package operation_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + "github.com/onflow/flow-go/storage/operation/dbtest" +) + +func BenchmarkUpsert(t *testing.B) { + dbtest.BenchWithStorages(t, func(t *testing.B, r storage.Reader, withWriter dbtest.WithWriter) { + for i := 0; i < t.N; i++ { + e := Entity{ID: uint64(i)} + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + } + }) +} + +func BenchmarkRemove(t *testing.B) { + dbtest.BenchWithStorages(t, func(t *testing.B, r storage.Reader, withWriter dbtest.WithWriter) { + n := t.N + for i := 0; i < n; i++ { + e := Entity{ID: uint64(i)} + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + } + t.ResetTimer() + for i := 0; i < n; i++ { + e := Entity{ID: uint64(i)} + require.NoError(t, withWriter(operation.Remove(e.Key()))) + } + }) +} + +func BenchmarkRemoveByPrefix(t *testing.B) { + dbtest.BenchWithStorages(t, func(t *testing.B, r storage.Reader, withWriter dbtest.WithWriter) { + prefix := []byte("prefix") + for i := 0; i < t.N; i++ { + e := Entity{ID: uint64(i)} + key := append(prefix, e.Key()...) + require.NoError(t, withWriter(operation.Upsert(key, e))) + } + t.ResetTimer() + require.NoError(t, withWriter(operation.RemoveByPrefix(r, prefix))) + }) +} diff --git a/storage/operation/writes_test.go b/storage/operation/writes_test.go new file mode 100644 index 00000000000..ac69daf1999 --- /dev/null +++ b/storage/operation/writes_test.go @@ -0,0 +1,302 @@ +package operation_test + +import ( + "encoding/binary" + "errors" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + "github.com/onflow/flow-go/storage/operation/dbtest" + "github.com/onflow/flow-go/utils/unittest" +) + +func TestReadWrite(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + e := Entity{ID: 1337} + + // Test read nothing should return not found + var item Entity + err := operation.Retrieve(e.Key(), &item)(r) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected not found error") + + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + + var readBack Entity + require.NoError(t, operation.Retrieve(e.Key(), &readBack)(r)) + require.Equal(t, e, readBack, "expected retrieved value to match written value") + + // Test write again should overwrite + newEntity := Entity{ID: 42} + require.NoError(t, withWriter(operation.Upsert(e.Key(), newEntity))) + + require.NoError(t, operation.Retrieve(e.Key(), &readBack)(r)) + require.Equal(t, newEntity, readBack, "expected overwritten value to be retrieved") + + // Test write should not overwrite a different key + anotherEntity := Entity{ID: 84} + require.NoError(t, withWriter(operation.Upsert(anotherEntity.Key(), anotherEntity))) + + var anotherReadBack Entity + require.NoError(t, operation.Retrieve(anotherEntity.Key(), &anotherReadBack)(r)) + require.Equal(t, anotherEntity, anotherReadBack, "expected different key to return different value") + }) +} + +func TestReadWriteMalformed(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + e := Entity{ID: 1337} + ue := UnencodeableEntity(e) + + // Test write should return encoding error + require.NoError(t, withWriter(func(writer storage.Writer) error { + err := operation.Upsert(e.Key(), ue)(writer) + require.Contains(t, err.Error(), errCantEncode.Error(), "expected encoding error") + return nil + })) + + // Test read should return decoding error + var exists bool + require.NoError(t, operation.Exists(e.Key(), &exists)(r)) + require.False(t, exists, "expected key to not exist") + }) +} + +// Verify multiple entities can be removed in one batch update +func TestBatchWrite(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + // Define multiple entities for batch insertion + entities := []Entity{ + {ID: 1337}, + {ID: 42}, + {ID: 84}, + } + + // Batch write: insert multiple entities in a single transaction + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, e := range entities { + if err := operation.Upsert(e.Key(), e)(writer); err != nil { + return err + } + } + return nil + })) + + // Verify that each entity can be read back + for _, e := range entities { + var readBack Entity + require.NoError(t, operation.Retrieve(e.Key(), &readBack)(r)) + require.Equal(t, e, readBack, "expected retrieved value to match written value for entity ID %d", e.ID) + } + + // Batch update: remove multiple entities in a single transaction + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, e := range entities { + if err := operation.Remove(e.Key())(writer); err != nil { + return err + } + } + return nil + })) + + // Verify that each entity has been removed + for _, e := range entities { + var readBack Entity + err := operation.Retrieve(e.Key(), &readBack)(r) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected not found error for entity ID %d after removal", e.ID) + } + }) +} + +func TestRemove(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + e := Entity{ID: 1337} + + var exists bool + require.NoError(t, operation.Exists(e.Key(), &exists)(r)) + require.False(t, exists, "expected key to not exist") + + // Test delete nothing should return OK + require.NoError(t, withWriter(operation.Remove(e.Key()))) + + // Test write, delete, then read should return not found + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + + require.NoError(t, operation.Exists(e.Key(), &exists)(r)) + require.True(t, exists, "expected key to exist") + + require.NoError(t, withWriter(operation.Remove(e.Key()))) + + var item Entity + err := operation.Retrieve(e.Key(), &item)(r) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected not found error after delete") + }) +} + +func TestConcurrentWrite(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + var wg sync.WaitGroup + numWrites := 10 // number of concurrent writes + + for i := 0; i < numWrites; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + e := Entity{ID: uint64(i)} + + // Simulate a concurrent write to a different key + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + + var readBack Entity + require.NoError(t, operation.Retrieve(e.Key(), &readBack)(r)) + require.Equal(t, e, readBack, "expected retrieved value to match written value for key %d", i) + }(i) + } + + wg.Wait() // Wait for all goroutines to finish + }) +} + +func TestConcurrentRemove(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + var wg sync.WaitGroup + numDeletes := 10 // number of concurrent deletions + + // First, insert entities to be deleted concurrently + for i := 0; i < numDeletes; i++ { + e := Entity{ID: uint64(i)} + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + } + + // Now, perform concurrent deletes + for i := 0; i < numDeletes; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + e := Entity{ID: uint64(i)} + + // Simulate a concurrent delete + require.NoError(t, withWriter(operation.Remove(e.Key()))) + + // Check that the item is no longer retrievable + var item Entity + err := operation.Retrieve(e.Key(), &item)(r) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected not found error after delete for key %d", i) + }(i) + } + + wg.Wait() // Wait for all goroutines to finish + }) +} + +func TestRemoveRange(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + + // Define the prefix + prefix := []byte{0x10} + + // Create a range of keys around the boundaries of the prefix + keys := [][]byte{ + // before prefix -> not included in range + {0x09, 0xff}, + // within the prefix -> included in range + {0x10, 0x00}, + {0x10, 0x50}, + {0x10, 0xff}, + // after end -> not included in range + {0x11, 0x00}, + {0x1A, 0xff}, + } + + // Keys expected to be in the prefix range + includeStart, includeEnd := 1, 3 + + // Insert the keys into the storage + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, key := range keys { + value := []byte{0x00} // value are skipped, doesn't matter + err := operation.Upsert(key, value)(writer) + if err != nil { + return err + } + } + return nil + })) + + // Remove the keys in the prefix range + require.NoError(t, withWriter(operation.RemoveByPrefix(r, prefix))) + + lg := unittest.Logger().With().Logger() + // Verify that the keys in the prefix range have been removed + for i, key := range keys { + var exists bool + require.NoError(t, operation.Exists(key, &exists)(r)) + lg.Info().Msgf("key %x exists: %t", key, exists) + + deleted := includeStart <= i && i <= includeEnd + + // deleted item should not exist + require.Equal(t, !deleted, exists, + "expected key %x to be %s", key, map[bool]string{true: "deleted", false: "not deleted"}) + } + + // Verify that after the removal, Traverse the removed prefix would return nothing + removedKeys := make([]string, 0) + err := operation.Traverse(prefix, operation.KeyOnlyIterateFunc(func(key []byte) error { + removedKeys = append(removedKeys, fmt.Sprintf("%x", key)) + return nil + }), storage.DefaultIteratorOptions())(r) + require.NoError(t, err) + require.Len(t, removedKeys, 0, "expected no entries to be found when traversing the removed prefix") + + // Verify that after the removal, Iterate over all keys should only return keys outside the prefix range + expected := [][]byte{ + {0x09, 0xff}, + {0x11, 0x00}, + {0x1A, 0xff}, + } + + actual := make([][]byte, 0) + err = operation.Iterate([]byte{keys[0][0]}, operation.PrefixUpperBound(keys[len(keys)-1]), operation.KeyOnlyIterateFunc(func(key []byte) error { + actual = append(actual, key) + return nil + }), storage.DefaultIteratorOptions())(r) + require.NoError(t, err) + require.Equal(t, expected, actual, "expected keys to match expected values") + }) +} + +type Entity struct { + ID uint64 +} + +func (e Entity) Key() []byte { + byteSlice := make([]byte, 8) // uint64 is 8 bytes + binary.BigEndian.PutUint64(byteSlice, e.ID) + return byteSlice +} + +type UnencodeableEntity Entity + +var errCantEncode = fmt.Errorf("encoding not supported") +var errCantDecode = fmt.Errorf("decoding not supported") + +func (a UnencodeableEntity) MarshalJSON() ([]byte, error) { + return nil, errCantEncode +} + +func (a *UnencodeableEntity) UnmarshalJSON(b []byte) error { + return errCantDecode +} + +func (a UnencodeableEntity) MarshalMsgpack() ([]byte, error) { + return nil, errCantEncode +} + +func (a UnencodeableEntity) UnmarshalMsgpack(b []byte) error { + return errCantDecode +} diff --git a/storage/operations.go b/storage/operations.go new file mode 100644 index 00000000000..95e25fcfa9d --- /dev/null +++ b/storage/operations.go @@ -0,0 +1,152 @@ +package storage + +import ( + "io" +) + +// Iterator is an interface for iterating over key-value pairs in a storage backend. +type Iterator interface { + // SeekGE seeks to the smallest key greater than or equal to the given key. + SeekGE() + + // Valid returns whether the iterator is positioned at a valid key-value pair. + Valid() bool + + // Next advances the iterator to the next key-value pair. + Next() + + // Key returns the key of the current key-value pair, or nil if done. + IterItem() IterItem + + // Close closes the iterator. Iterator must be closed, otherwise it causes memory leak. + Close() error +} + +// IterItem is an interface for iterating over key-value pairs in a storage backend. +type IterItem interface { + Key() []byte + + // Value returns the value of the current key-value pair + // The reason it takes a function is to follow badgerDB's API pattern + Value(func(val []byte) error) error +} + +type IteratorOption struct { + IterateKeyOnly bool // default false +} + +func DefaultIteratorOptions() IteratorOption { + return IteratorOption{ + IterateKeyOnly: false, // only needed for badger. ignored by pebble + } +} + +type Reader interface { + // Get gets the value for the given key. It returns ErrNotFound if the DB + // does not contain the key. + // + // The caller should not modify the contents of the returned slice, but it is + // safe to modify the contents of the argument after Get returns. The + // returned slice will remain valid until the returned Closer is closed. On + // success, the caller MUST call closer.Close() or a memory leak will occur. + Get(key []byte) (value []byte, closer io.Closer, err error) + + // NewIter returns a new Iterator for the given key range [startPrefix, endPrefix], both inclusive. + NewIter(startPrefix, endPrefix []byte, ops IteratorOption) (Iterator, error) +} + +// Writer is an interface for batch writing to a storage backend. +type Writer interface { + // Set sets the value for the given key. It overwrites any previous value + // for that key; a DB is not a multi-map. + // + // It is safe to modify the contents of the arguments after Set returns. + Set(k, v []byte) error + + // Delete deletes the value for the given key. Deletes are blind all will + // succeed even if the given key does not exist. + // + // It is safe to modify the contents of the arguments after Delete returns. + Delete(key []byte) error + + // DeleteByRange removes all keys with a prefix that falls within the + // range [start, end], both inclusive. + DeleteByRange(globalReader Reader, startPrefix, endPrefix []byte) error +} + +// ReaderBatchWriter is an interface for reading and writing to a storage backend. +type ReaderBatchWriter interface { + // GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation"). + // This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed. + // This reader may observe different values for the same key on subsequent reads. + GlobalReader() Reader + + // Writer returns a writer associated with a batch of writes. The batch is pending until it is committed. + // When we `Write` into the batch, that write operation is added to the pending batch, but not committed. + // The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are. + // Note: + // - The writer cannot be used concurrently for writing. + Writer() Writer + + // AddCallback adds a callback to execute after the batch has been flush + // regardless the batch update is succeeded or failed. + // The error parameter is the error returned by the batch update. + AddCallback(func(error)) +} + +// DB is an interface for a database store that provides a reader and a writer. +type DB interface { + // Reader returns a database-backed reader which reads the latest + // committed global database state + Reader() Reader + + // WithReaderBatchWriter creates a batch writer and allows the caller to perform + // atomic batch updates to the database. + // Any error returned are considered fatal and the batch is not committed. + WithReaderBatchWriter(func(ReaderBatchWriter) error) error +} + +// OnlyWriter is an adapter to convert a function that takes a Writer +// to a function that takes a ReaderBatchWriter. +func OnlyWriter(fn func(Writer) error) func(ReaderBatchWriter) error { + return func(rw ReaderBatchWriter) error { + return fn(rw.Writer()) + } +} + +// OnCommitSucceed adds a callback to execute after the batch has been successfully committed. +func OnCommitSucceed(b ReaderBatchWriter, onSuccessFn func()) { + b.AddCallback(func(err error) { + if err == nil { + onSuccessFn() + } + }) +} + +func StartEndPrefixToLowerUpperBound(startPrefix, endPrefix []byte) (lowerBound, upperBound []byte) { + // LowerBound specifies the smallest key to iterate and it's inclusive. + // UpperBound specifies the largest key to iterate and it's exclusive (not inclusive) + // in order to match all keys prefixed with the `end` bytes, we increment the bytes of end by 1, + // for instance, to iterate keys between "hello" and "world", + // we use "hello" as LowerBound, "worle" as UpperBound, so that "world", "world1", "worldffff...ffff" + // will all be included. + return startPrefix, prefixUpperBound(endPrefix) +} + +// prefixUpperBound returns a key K such that all possible keys beginning with the input prefix +// sort lower than K according to the byte-wise lexicographic key ordering used by Pebble. +// This is used to define an upper bound for iteration, when we want to iterate over +// all keys beginning with a given prefix. +// referred to https://pkg.go.dev/github.com/cockroachdb/pebble#example-Iterator-PrefixIteration +func prefixUpperBound(prefix []byte) []byte { + end := make([]byte, len(prefix)) + copy(end, prefix) + for i := len(end) - 1; i >= 0; i-- { + // increment the bytes by 1 + end[i] = end[i] + 1 + if end[i] != 0 { + return end[:i+1] + } + } + return nil // no upper-bound +} diff --git a/storage/badger/approvals.go b/storage/store/approvals.go similarity index 57% rename from storage/badger/approvals.go rename to storage/store/approvals.go index eb3cf4ae820..4268944adf8 100644 --- a/storage/badger/approvals.go +++ b/storage/store/approvals.go @@ -1,35 +1,33 @@ -package badger +package store import ( "errors" "fmt" - - "github.com/dgraph-io/badger/v2" + "sync" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" - "github.com/onflow/flow-go/storage/badger/operation" - "github.com/onflow/flow-go/storage/badger/transaction" + "github.com/onflow/flow-go/storage/operation" ) // ResultApprovals implements persistent storage for result approvals. type ResultApprovals struct { - db *badger.DB - cache *Cache[flow.Identifier, *flow.ResultApproval] + db storage.DB + cache *Cache[flow.Identifier, *flow.ResultApproval] + indexing *sync.Mutex // preventing concurrent indexing of approvals } -func NewResultApprovals(collector module.CacheMetrics, db *badger.DB) *ResultApprovals { - - store := func(key flow.Identifier, val *flow.ResultApproval) func(*transaction.Tx) error { - return transaction.WithTx(operation.SkipDuplicates(operation.InsertResultApproval(val))) +func NewResultApprovals(collector module.CacheMetrics, db storage.DB) *ResultApprovals { + store := func(key flow.Identifier, val *flow.ResultApproval) func(rw storage.ReaderBatchWriter) error { + return storage.OnlyWriter(operation.InsertResultApproval(val)) } - retrieve := func(approvalID flow.Identifier) func(tx *badger.Txn) (*flow.ResultApproval, error) { + retrieve := func(approvalID flow.Identifier) func(r storage.Reader) (*flow.ResultApproval, error) { var approval flow.ResultApproval - return func(tx *badger.Txn) (*flow.ResultApproval, error) { - err := operation.RetrieveResultApproval(approvalID, &approval)(tx) + return func(r storage.Reader) (*flow.ResultApproval, error) { + err := operation.RetrieveResultApproval(approvalID, &approval)(r) return &approval, err } } @@ -40,18 +38,19 @@ func NewResultApprovals(collector module.CacheMetrics, db *badger.DB) *ResultApp withLimit[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100), withStore[flow.Identifier, *flow.ResultApproval](store), withRetrieve[flow.Identifier, *flow.ResultApproval](retrieve)), + indexing: new(sync.Mutex), } return res } -func (r *ResultApprovals) store(approval *flow.ResultApproval) func(*transaction.Tx) error { +func (r *ResultApprovals) store(approval *flow.ResultApproval) func(storage.ReaderBatchWriter) error { return r.cache.PutTx(approval.ID(), approval) } -func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*flow.ResultApproval, error) { - return func(tx *badger.Txn) (*flow.ResultApproval, error) { - val, err := r.cache.Get(approvalID)(tx) +func (r *ResultApprovals) byID(approvalID flow.Identifier) func(storage.Reader) (*flow.ResultApproval, error) { + return func(reader storage.Reader) (*flow.ResultApproval, error) { + val, err := r.cache.Get(approvalID)(reader) if err != nil { return nil, err } @@ -59,40 +58,37 @@ func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*f } } -func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(*badger.Txn) (*flow.ResultApproval, error) { - return func(tx *badger.Txn) (*flow.ResultApproval, error) { +func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(storage.Reader) (*flow.ResultApproval, error) { + return func(reader storage.Reader) (*flow.ResultApproval, error) { var approvalID flow.Identifier - err := operation.LookupResultApproval(resultID, chunkIndex, &approvalID)(tx) + err := operation.LookupResultApproval(resultID, chunkIndex, &approvalID)(reader) if err != nil { return nil, fmt.Errorf("could not lookup result approval ID: %w", err) } - return r.byID(approvalID)(tx) + return r.byID(approvalID)(reader) } } -func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error { - return func(tx *badger.Txn) error { - err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(tx) - if err == nil { - return nil - } +// CAUTION: Caller must acquire `indexing` lock. +func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.ReaderBatchWriter) error { + return func(rw storage.ReaderBatchWriter) error { + var storedApprovalID flow.Identifier + err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(rw.GlobalReader()) + if err != nil { + if !errors.Is(err, storage.ErrNotFound) { + return fmt.Errorf("could not lookup result approval ID: %w", err) + } - if !errors.Is(err, storage.ErrAlreadyExists) { - return err + // no approval found, index the approval + + return operation.UnsafeIndexResultApproval(resultID, chunkIndex, approvalID)(rw.Writer()) } - // When trying to index an approval for a result, and there is already - // an approval for the result, double check if the indexed approval is - // the same. + // an approval is already indexed, double check if it is the same // We don't allow indexing multiple approvals per chunk because the // store is only used within Verification nodes, and it is impossible // for a Verification node to compute different approvals for the same // chunk. - var storedApprovalID flow.Identifier - err = operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(tx) - if err != nil { - return fmt.Errorf("there is an approval stored already, but cannot retrieve it: %w", err) - } if storedApprovalID != approvalID { return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w", @@ -105,14 +101,22 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app // Store stores a ResultApproval func (r *ResultApprovals) Store(approval *flow.ResultApproval) error { - return operation.RetryOnConflictTx(r.db, transaction.Update, r.store(approval)) + return r.db.WithReaderBatchWriter(r.store(approval)) } // Index indexes a ResultApproval by chunk (ResultID + chunk index). // operation is idempotent (repeated calls with the same value are equivalent to // just calling the method once; still the method succeeds on each call). func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error { - err := operation.RetryOnConflict(r.db.Update, r.index(resultID, chunkIndex, approvalID)) + // acquring the lock to prevent dirty reads when checking conflicted approvals + // how it works: + // the lock can only be acquired after the index operation is committed to the database, + // since the index operation is the only operation that would affect the reads operation, + // no writes can go through util the lock is released, so locking here could prevent dirty reads. + r.indexing.Lock() + defer r.indexing.Unlock() + + err := r.db.WithReaderBatchWriter(r.index(resultID, chunkIndex, approvalID)) if err != nil { return fmt.Errorf("could not index result approval: %w", err) } @@ -121,16 +125,12 @@ func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, app // ByID retrieves a ResultApproval by its ID func (r *ResultApprovals) ByID(approvalID flow.Identifier) (*flow.ResultApproval, error) { - tx := r.db.NewTransaction(false) - defer tx.Discard() - return r.byID(approvalID)(tx) + return r.byID(approvalID)(r.db.Reader()) } // ByChunk retrieves a ResultApproval by result ID and chunk index. The // ResultApprovals store is only used within a verification node, where it is // assumed that there is never more than one approval per chunk. func (r *ResultApprovals) ByChunk(resultID flow.Identifier, chunkIndex uint64) (*flow.ResultApproval, error) { - tx := r.db.NewTransaction(false) - defer tx.Discard() - return r.byChunk(resultID, chunkIndex)(tx) + return r.byChunk(resultID, chunkIndex)(r.db.Reader()) } diff --git a/storage/badger/approvals_test.go b/storage/store/approvals_test.go similarity index 50% rename from storage/badger/approvals_test.go rename to storage/store/approvals_test.go index 1b13a49ae59..0862e7e1537 100644 --- a/storage/badger/approvals_test.go +++ b/storage/store/approvals_test.go @@ -1,22 +1,23 @@ -package badger_test +package store_test import ( "errors" + "sync" "testing" - "github.com/dgraph-io/badger/v2" "github.com/stretchr/testify/require" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/storage" - bstorage "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage/operation/dbtest" + "github.com/onflow/flow-go/storage/store" "github.com/onflow/flow-go/utils/unittest" ) func TestApprovalStoreAndRetrieve(t *testing.T) { - unittest.RunWithBadgerDB(t, func(db *badger.DB) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { metrics := metrics.NewNoopCollector() - store := bstorage.NewResultApprovals(metrics, db) + store := store.NewResultApprovals(metrics, db) approval := unittest.ResultApprovalFixture() err := store.Store(approval) @@ -36,9 +37,9 @@ func TestApprovalStoreAndRetrieve(t *testing.T) { } func TestApprovalStoreTwice(t *testing.T) { - unittest.RunWithBadgerDB(t, func(db *badger.DB) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { metrics := metrics.NewNoopCollector() - store := bstorage.NewResultApprovals(metrics, db) + store := store.NewResultApprovals(metrics, db) approval := unittest.ResultApprovalFixture() err := store.Store(approval) @@ -56,9 +57,9 @@ func TestApprovalStoreTwice(t *testing.T) { } func TestApprovalStoreTwoDifferentApprovalsShouldFail(t *testing.T) { - unittest.RunWithBadgerDB(t, func(db *badger.DB) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { metrics := metrics.NewNoopCollector() - store := bstorage.NewResultApprovals(metrics, db) + store := store.NewResultApprovals(metrics, db) approval1 := unittest.ResultApprovalFixture() approval2 := unittest.ResultApprovalFixture() @@ -79,3 +80,52 @@ func TestApprovalStoreTwoDifferentApprovalsShouldFail(t *testing.T) { require.True(t, errors.Is(err, storage.ErrDataMismatch)) }) } + +// verify that storing and indexing two conflicting approvals concurrently should fail +// one of them is succeed, the other one should fail +func TestApprovalStoreTwoDifferentApprovalsConcurrently(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + metrics := metrics.NewNoopCollector() + store := store.NewResultApprovals(metrics, db) + + approval1 := unittest.ResultApprovalFixture() + approval2 := unittest.ResultApprovalFixture() + + var wg sync.WaitGroup + wg.Add(2) + + var firstIndexErr, secondIndexErr error + + // First goroutine stores and indexes the first approval. + go func() { + defer wg.Done() + + err := store.Store(approval1) + require.NoError(t, err) + + firstIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval1.ID()) + }() + + // Second goroutine stores and tries to index the second approval for the same chunk. + go func() { + defer wg.Done() + + err := store.Store(approval2) + require.NoError(t, err) + + secondIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID()) + }() + + // Wait for both goroutines to finish + wg.Wait() + + // Check that one of the Index operations succeeded and the other failed + if firstIndexErr == nil { + require.Error(t, secondIndexErr) + require.True(t, errors.Is(secondIndexErr, storage.ErrDataMismatch)) + } else { + require.NoError(t, secondIndexErr) + require.True(t, errors.Is(firstIndexErr, storage.ErrDataMismatch)) + } + }) +} diff --git a/storage/store/cache.go b/storage/store/cache.go new file mode 100644 index 00000000000..45dbc8deb95 --- /dev/null +++ b/storage/store/cache.go @@ -0,0 +1,157 @@ +package store + +import ( + "errors" + "fmt" + + lru "github.com/hashicorp/golang-lru/v2" + + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/storage" +) + +// nolint:unused +func withLimit[K comparable, V any](limit uint) func(*Cache[K, V]) { + return func(c *Cache[K, V]) { + c.limit = limit + } +} + +type storeFunc[K comparable, V any] func(key K, val V) func(storage.ReaderBatchWriter) error + +// nolint:unused +func withStore[K comparable, V any](store storeFunc[K, V]) func(*Cache[K, V]) { + return func(c *Cache[K, V]) { + c.store = store + } +} + +// nolint:unused +func noStore[K comparable, V any](_ K, _ V) func(storage.ReaderBatchWriter) error { + return func(tx storage.ReaderBatchWriter) error { + return fmt.Errorf("no store function for cache put available") + } +} + +// nolint: unused +func noopStore[K comparable, V any](_ K, _ V) func(storage.ReaderBatchWriter) error { + return func(tx storage.ReaderBatchWriter) error { + return nil + } +} + +type retrieveFunc[K comparable, V any] func(key K) func(storage.Reader) (V, error) + +// nolint:unused +func withRetrieve[K comparable, V any](retrieve retrieveFunc[K, V]) func(*Cache[K, V]) { + return func(c *Cache[K, V]) { + c.retrieve = retrieve + } +} + +// nolint:unused +func noRetrieve[K comparable, V any](_ K) func(storage.Reader) (V, error) { + return func(tx storage.Reader) (V, error) { + var nullV V + return nullV, fmt.Errorf("no retrieve function for cache get available") + } +} + +type Cache[K comparable, V any] struct { + metrics module.CacheMetrics + // nolint:unused + limit uint + store storeFunc[K, V] + retrieve retrieveFunc[K, V] + resource string + cache *lru.Cache[K, V] +} + +// nolint:unused +func newCache[K comparable, V any](collector module.CacheMetrics, resourceName string, options ...func(*Cache[K, V])) *Cache[K, V] { + c := Cache[K, V]{ + metrics: collector, + limit: 1000, + store: noStore[K, V], + retrieve: noRetrieve[K, V], + resource: resourceName, + } + for _, option := range options { + option(&c) + } + c.cache, _ = lru.New[K, V](int(c.limit)) + c.metrics.CacheEntries(c.resource, uint(c.cache.Len())) + return &c +} + +// IsCached returns true if the key exists in the cache. +// It DOES NOT check whether the key exists in the underlying data store. +func (c *Cache[K, V]) IsCached(key K) bool { + return c.cache.Contains(key) +} + +// Get will try to retrieve the resource from cache first, and then from the +// injected. During normal operations, the following error returns are expected: +// - `storage.ErrNotFound` if key is unknown. +func (c *Cache[K, V]) Get(key K) func(storage.Reader) (V, error) { + return func(r storage.Reader) (V, error) { + + // check if we have it in the cache + resource, cached := c.cache.Get(key) + if cached { + c.metrics.CacheHit(c.resource) + return resource, nil + } + + // get it from the database + resource, err := c.retrieve(key)(r) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + c.metrics.CacheNotFound(c.resource) + } + var nullV V + return nullV, fmt.Errorf("could not retrieve resource: %w", err) + } + + c.metrics.CacheMiss(c.resource) + + // cache the resource and eject least recently used one if we reached limit + evicted := c.cache.Add(key, resource) + if !evicted { + c.metrics.CacheEntries(c.resource, uint(c.cache.Len())) + } + + return resource, nil + } +} + +func (c *Cache[K, V]) Remove(key K) { + c.cache.Remove(key) +} + +// Insert will add a resource directly to the cache with the given ID +func (c *Cache[K, V]) Insert(key K, resource V) { + // cache the resource and eject least recently used one if we reached limit + evicted := c.cache.Add(key, resource) + if !evicted { + c.metrics.CacheEntries(c.resource, uint(c.cache.Len())) + } +} + +// PutTx will return tx which adds a resource to the cache with the given ID. +func (c *Cache[K, V]) PutTx(key K, resource V) func(storage.ReaderBatchWriter) error { + storeOps := c.store(key, resource) // assemble DB operations to store resource (no execution) + + return func(rw storage.ReaderBatchWriter) error { + storage.OnCommitSucceed(rw, func() { + c.Insert(key, resource) + }) + + err := storeOps(rw) // execute operations to store resource + if err != nil { + return fmt.Errorf("could not store resource: %w", err) + } + + return nil + } +} diff --git a/storage/badger/cache_test.go b/storage/store/cache_test.go similarity index 98% rename from storage/badger/cache_test.go rename to storage/store/cache_test.go index 76ea7ce18bc..d14de66c47b 100644 --- a/storage/badger/cache_test.go +++ b/storage/store/cache_test.go @@ -1,4 +1,4 @@ -package badger +package store import ( "testing" diff --git a/utils/unittest/unittest.go b/utils/unittest/unittest.go index 4d13b279087..d15f39cd27c 100644 --- a/utils/unittest/unittest.go +++ b/utils/unittest/unittest.go @@ -368,6 +368,11 @@ func TempBadgerDB(t testing.TB) (*badger.DB, string) { return db, dir } +func TempPebbleDB(t testing.TB) (*pebble.DB, string) { + dir := TempDir(t) + return PebbleDB(t, dir), dir +} + func TempPebblePath(t *testing.T) string { return path.Join(TempDir(t), "pebble"+strconv.Itoa(rand.Int())+".db") } @@ -380,6 +385,71 @@ func TempPebbleDBWithOpts(t testing.TB, opts *pebble.Options) (*pebble.DB, strin return db, dbpath } +func RunWithPebbleDB(t testing.TB, f func(*pebble.DB)) { + RunWithTempDir(t, func(dir string) { + db, err := pebble.Open(dir, &pebble.Options{}) + require.NoError(t, err) + defer func() { + assert.NoError(t, db.Close()) + }() + f(db) + }) +} + +func PebbleDB(t testing.TB, dir string) *pebble.DB { + db, err := pebble.Open(dir, &pebble.Options{}) + require.NoError(t, err) + return db +} + +func TypedPebbleDB(t testing.TB, dir string, create func(string, *pebble.Options) (*pebble.DB, error)) *pebble.DB { + db, err := create(dir, &pebble.Options{}) + require.NoError(t, err) + return db +} + +type PebbleWrapper struct { + db *pebble.DB +} + +func (p *PebbleWrapper) View(fn func(pebble.Reader) error) error { + return fn(p.db) +} + +func (p *PebbleWrapper) Update(fn func(pebble.Writer) error) error { + return fn(p.db) +} + +func (p *PebbleWrapper) DB() *pebble.DB { + return p.db +} + +func RunWithWrappedPebbleDB(t testing.TB, f func(p *PebbleWrapper)) { + RunWithTempDir(t, func(dir string) { + db, err := pebble.Open(dir, &pebble.Options{}) + require.NoError(t, err) + defer func() { + assert.NoError(t, db.Close()) + }() + f(&PebbleWrapper{db}) + }) + +} + +func RunWithTypedPebbleDB( + t testing.TB, + create func(string, *pebble.Options) (*pebble.DB, error), + f func(*pebble.DB)) { + RunWithTempDir(t, func(dir string) { + db, err := create(dir, &pebble.Options{}) + require.NoError(t, err) + defer func() { + assert.NoError(t, db.Close()) + }() + f(db) + }) +} + func Concurrently(n int, f func(int)) { var wg sync.WaitGroup for i := 0; i < n; i++ {