diff --git a/cmd/verification_builder.go b/cmd/verification_builder.go
index 70c84617f02..3c0cac6a1c9 100644
--- a/cmd/verification_builder.go
+++ b/cmd/verification_builder.go
@@ -38,6 +38,8 @@ import (
 	badgerState "github.com/onflow/flow-go/state/protocol/badger"
 	"github.com/onflow/flow-go/state/protocol/blocktimer"
 	"github.com/onflow/flow-go/storage/badger"
+	"github.com/onflow/flow-go/storage/operation/badgerimpl"
+	"github.com/onflow/flow-go/storage/store"
 )
 
 type VerificationConfig struct {
@@ -201,7 +203,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
 			vmCtx := fvm.NewContext(fvmOptions...)
 			chunkVerifier := chunks.NewChunkVerifier(vm, vmCtx, node.Logger)
 
-			approvalStorage := badger.NewResultApprovals(node.Metrics.Cache, node.DB)
+			approvalStorage := store.NewResultApprovals(node.Metrics.Cache, badgerimpl.ToDB(node.DB))
 			verifierEng, err = verifier.New(
 				node.Logger,
 				collector,
diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go
index b6d1037b500..3f6364c67ea 100644
--- a/engine/testutil/nodes.go
+++ b/engine/testutil/nodes.go
@@ -105,7 +105,9 @@ import (
 	"github.com/onflow/flow-go/state/protocol/events/gadgets"
 	"github.com/onflow/flow-go/state/protocol/util"
 	storage "github.com/onflow/flow-go/storage/badger"
+	"github.com/onflow/flow-go/storage/operation/badgerimpl"
 	storagepebble "github.com/onflow/flow-go/storage/pebble"
+	"github.com/onflow/flow-go/storage/store"
 	"github.com/onflow/flow-go/utils/unittest"
 )
 
@@ -1021,7 +1023,7 @@ func VerificationNode(t testing.TB,
 
 		chunkVerifier := chunks.NewChunkVerifier(vm, vmCtx, node.Log)
 
-		approvalStorage := storage.NewResultApprovals(node.Metrics, node.PublicDB)
+		approvalStorage := store.NewResultApprovals(node.Metrics, badgerimpl.ToDB(node.PublicDB))
 
 		node.VerifierEngine, err = verifier.New(node.Log,
 			collector,
diff --git a/storage/badger/operation/approvals.go b/storage/badger/operation/approvals.go
deleted file mode 100644
index 8a994eed2a2..00000000000
--- a/storage/badger/operation/approvals.go
+++ /dev/null
@@ -1,31 +0,0 @@
-package operation
-
-import (
-	"github.com/dgraph-io/badger/v2"
-
-	"github.com/onflow/flow-go/model/flow"
-)
-
-// InsertResultApproval inserts a ResultApproval by ID.
-func InsertResultApproval(approval *flow.ResultApproval) func(*badger.Txn) error {
-	return insert(makePrefix(codeResultApproval, approval.ID()), approval)
-}
-
-// RetrieveResultApproval retrieves an approval by ID.
-func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(*badger.Txn) error {
-	return retrieve(makePrefix(codeResultApproval, approvalID), approval)
-}
-
-// IndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID
-// and chunk index. If a value for this key exists, a storage.ErrAlreadyExists
-// error is returned. This operation is only used by the ResultApprovals store,
-// which is only used within a Verification node, where it is assumed that there
-// is only one approval per chunk.
-func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error {
-	return insert(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
-}
-
-// LookupResultApproval finds a ResultApproval by result ID and chunk index.
-func LookupResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) func(*badger.Txn) error {
-	return retrieve(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
-}
diff --git a/storage/operation/approvals.go b/storage/operation/approvals.go
new file mode 100644
index 00000000000..39d88c1f682
--- /dev/null
+++ b/storage/operation/approvals.go
@@ -0,0 +1,34 @@
+package operation
+
+import (
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/storage"
+)
+
+// InsertResultApproval inserts a ResultApproval by ID.
+// The same key (`approval.ID()`) necessitates that the value (full `approval`) is
+// also identical (otherwise, we would have a successful pre-image attack on our
+// cryptographic hash function). Therefore, concurrent calls to this function are safe.
+func InsertResultApproval(approval *flow.ResultApproval) func(storage.Writer) error {
+	return Upsert(makePrefix(codeResultApproval, approval.ID()), approval)
+}
+
+// RetrieveResultApproval retrieves an approval by ID.
+func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(storage.Reader) error {
+	return Retrieve(makePrefix(codeResultApproval, approvalID), approval)
+}
+
+// UnsafeIndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID
+// and chunk index. Unsafe means that it does not check whether a different approval
+// is already indexed for the same key; any existing index is silently overwritten.
+// This operation is only used by the ResultApprovals store, which is only used within
+// a Verification node, where it is assumed that there is only one approval per chunk.
+// CAUTION: Use of this function must be synchronized by storage.ResultApprovals.
+func UnsafeIndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error {
+	return Upsert(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
+}
+
+// LookupResultApproval finds a ResultApproval by result ID and chunk index.
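+// If no approval is indexed for the given chunk, `storage.ErrNotFound` is returned.
+//
+// A minimal usage sketch (identifiers such as `db`, `resultID`, and `chunkIndex`
+// are illustrative):
+//
+//	var approvalID flow.Identifier
+//	err := operation.LookupResultApproval(resultID, chunkIndex, &approvalID)(db.Reader())
+//	if errors.Is(err, storage.ErrNotFound) {
+//		// no approval has been indexed for this chunk yet
+//	}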
+func LookupResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) func(storage.Reader) error {
+	return Retrieve(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
+}
diff --git a/storage/operation/badgerimpl/dbstore.go b/storage/operation/badgerimpl/dbstore.go
new file mode 100644
index 00000000000..b8460165e32
--- /dev/null
+++ b/storage/operation/badgerimpl/dbstore.go
@@ -0,0 +1,23 @@
+package badgerimpl
+
+import (
+	"github.com/dgraph-io/badger/v2"
+
+	"github.com/onflow/flow-go/storage"
+)
+
+func ToDB(db *badger.DB) storage.DB {
+	return &dbStore{db: db}
+}
+
+type dbStore struct {
+	db *badger.DB
+}
+
+func (b *dbStore) Reader() storage.Reader {
+	return dbReader{db: b.db}
+}
+
+func (b *dbStore) WithReaderBatchWriter(fn func(storage.ReaderBatchWriter) error) error {
+	return WithReaderBatchWriter(b.db, fn)
+}
diff --git a/storage/operation/dbtest/helper.go b/storage/operation/dbtest/helper.go
index 153d63ae589..7c21517d9b0 100644
--- a/storage/operation/dbtest/helper.go
+++ b/storage/operation/dbtest/helper.go
@@ -84,3 +84,17 @@ func runWithPebble(fn func(storage.Reader, WithWriter)) func(*pebble.DB) {
 		fn(reader, withWriter)
 	}
 }
+
+func RunWithDB(t *testing.T, fn func(t *testing.T, store storage.DB)) {
+	t.Run("BadgerStorage", func(t *testing.T) {
+		unittest.RunWithBadgerDB(t, func(db *badger.DB) {
+			fn(t, badgerimpl.ToDB(db))
+		})
+	})
+
+	t.Run("PebbleStorage", func(t *testing.T) {
+		unittest.RunWithPebbleDB(t, func(db *pebble.DB) {
+			fn(t, pebbleimpl.ToDB(db))
+		})
+	})
+}
diff --git a/storage/operation/pebbleimpl/dbstore.go b/storage/operation/pebbleimpl/dbstore.go
new file mode 100644
index 00000000000..fcc5b14a06a
--- /dev/null
+++ b/storage/operation/pebbleimpl/dbstore.go
@@ -0,0 +1,23 @@
+package pebbleimpl
+
+import (
+	"github.com/cockroachdb/pebble"
+
+	"github.com/onflow/flow-go/storage"
+)
+
+func ToDB(db *pebble.DB) storage.DB {
+	return &dbStore{db: db}
+}
+
+type dbStore struct {
+	db *pebble.DB
+}
+
+func (b *dbStore) Reader() storage.Reader {
+	return dbReader{db: b.db}
+}
+
+func (b *dbStore) WithReaderBatchWriter(fn func(storage.ReaderBatchWriter) error) error {
+	return WithReaderBatchWriter(b.db, fn)
+}
diff --git a/storage/operation/prefix.go b/storage/operation/prefix.go
new file mode 100644
index 00000000000..feacac40c6a
--- /dev/null
+++ b/storage/operation/prefix.go
@@ -0,0 +1,163 @@
+package operation
+
+const (
+
+	// codes for special database markers
+	// codeMax = 1 // keeps track of the maximum key size
+	// nolint:unused
+	codeDBType = 2 // specifies a database type
+
+	// codes for views with special meaning
+	// nolint:unused
+	codeSafetyData = 10 // safety data for hotstuff state
+	// nolint:unused
+	codeLivenessData = 11 // liveness data for hotstuff state
+
+	// codes for fields associated with the root state
+	// nolint:unused
+	codeSporkID = 13
+	// nolint:unused
+	codeProtocolVersion = 14
+	// nolint:unused
+	codeEpochCommitSafetyThreshold = 15
+	// nolint:unused
+	codeSporkRootBlockHeight = 16
+
+	// codes for heights with special meaning
+	// nolint:unused
+	codeFinalizedHeight = 20 // latest finalized block height
+	// nolint:unused
+	codeSealedHeight = 21 // latest sealed block height
+	// nolint:unused
+	codeClusterHeight = 22 // latest finalized height on cluster
+	// nolint:unused
+	codeExecutedBlock = 23 // latest executed block with max height
+	// nolint:unused
+	codeFinalizedRootHeight = 24 // the height of the highest finalized block contained in the root snapshot
+	// nolint:unused
+	codeLastCompleteBlockHeight = 25 // the height of the last block for which all collections were received
+	// nolint:unused
+	codeEpochFirstHeight = 26 // the height of the first block in a given epoch
+	// nolint:unused
+	codeSealedRootHeight = 27 // the height of the highest sealed block contained in the root snapshot
+
+	// codes for single entity storage
+	// nolint:unused
+	codeHeader               = 30
+	_                        = 31 // DEPRECATED: 31 was used for identities before epochs
+	codeGuarantee            = 32
+	codeSeal                 = 33
+	codeTransaction          = 34
+	codeCollection           = 35
+	codeExecutionResult      = 36
+	codeResultApproval       = 37
+	codeChunk                = 38
+	codeExecutionReceiptMeta = 39 // NOTE: prior to Mainnet25, this erroneously had the same value as codeExecutionResult (36)
+
+	// codes for indexing single identifier by identifier/integer
+	// nolint:unused
+	codeHeightToBlock = 40 // index mapping height to block ID
+	// nolint:unused
+	codeBlockIDToLatestSealID = 41 // index mapping a block to its last payload seal
+	// nolint:unused
+	codeClusterBlockToRefBlock = 42 // index cluster block ID to reference block ID
+	// nolint:unused
+	codeRefHeightToClusterBlock = 43 // index reference block height to cluster block IDs
+	// nolint:unused
+	codeBlockIDToFinalizedSeal = 44 // index _finalized_ seal by sealed block ID
+	// nolint:unused
+	codeBlockIDToQuorumCertificate = 45 // index of quorum certificates by block ID
+	// nolint:unused
+	codeEpochProtocolStateByBlockID = 46 // index of epoch protocol state entry by block ID
+	// nolint:unused
+	codeProtocolKVStoreByBlockID = 47 // index of protocol KV store entry by block ID
+
+	// codes for indexing multiple identifiers by identifier
+	// nolint:unused
+	codeBlockChildren = 50 // index mapping block ID to children blocks
+	_                 = 51 // DEPRECATED: 51 was used for identity indexes before epochs
+	// nolint:unused
+	codePayloadGuarantees = 52 // index mapping block ID to payload guarantees
+	// nolint:unused
+	codePayloadSeals = 53 // index mapping block ID to payload seals
+	// nolint:unused
+	codeCollectionBlock = 54 // index mapping collection ID to block ID
+	// nolint:unused
+	codeOwnBlockReceipt = 55 // index mapping block ID to execution receipt ID for execution nodes
+	_                   = 56 // DEPRECATED: 56 was used for block->epoch status prior to Dynamic Protocol State in Mainnet25
+	// nolint:unused
+	codePayloadReceipts = 57 // index mapping block ID to payload receipts
+	// nolint:unused
+	codePayloadResults = 58 // index mapping block ID to payload results
+	// nolint:unused
+	codeAllBlockReceipts = 59 // index mapping of blockID to multiple receipts
+	// nolint:unused
+	codePayloadProtocolStateID = 60 // index mapping block ID to payload protocol state ID
+
+	// codes related to protocol level information
+	// nolint:unused
+	codeEpochSetup = 61 // EpochSetup service event, keyed by ID
+	// nolint:unused
+	codeEpochCommit = 62 // EpochCommit service event, keyed by ID
+	// nolint:unused
+	codeBeaconPrivateKey = 63 // BeaconPrivateKey, keyed by epoch counter
+	// nolint:unused
+	codeDKGStarted = 64 // flag that the DKG for an epoch has been started
+	// nolint:unused
+	codeDKGEnded = 65 // flag that the DKG for an epoch has ended (stores end state)
+	// nolint:unused
+	codeVersionBeacon = 67 // flag for storing version beacons
+	// nolint:unused
+	codeEpochProtocolState = 68
+	// nolint:unused
+	codeProtocolKVStore = 69
+
+	// code for ComputationResult upload status storage
+	// NOTE: for now only the GCP uploader is supported. When another uploader (e.g. AWS) needs to
+	// be supported, we will need to define a new code.
+	// nolint:unused
+	codeComputationResults = 66
+
+	// job queue consumers and producers
+	// nolint:unused
+	codeJobConsumerProcessed = 70
+	// nolint:unused
+	codeJobQueue = 71
+	// nolint:unused
+	codeJobQueuePointer = 72
+
+	// legacy codes (should be cleaned up)
+	codeChunkDataPack                = 100
+	codeCommit                       = 101
+	codeEvent                        = 102
+	codeExecutionStateInteractions   = 103
+	codeTransactionResult            = 104
+	codeFinalizedCluster             = 105
+	codeServiceEvent                 = 106
+	codeTransactionResultIndex       = 107
+	codeLightTransactionResult       = 108
+	codeLightTransactionResultIndex  = 109
+	codeIndexCollection              = 200
+	codeIndexExecutionResultByBlock  = 202
+	codeIndexCollectionByTransaction = 203
+	codeIndexResultApprovalByChunk   = 204
+
+	// TEMPORARY codes
+	// nolint:unused
+	blockedNodeIDs = 205 // manual override for adding node IDs to list of ejected nodes, applies to networking layer only
+
+	// internal failure information that should be preserved across restarts
+	// nolint:unused
+	codeExecutionFork = 254
+	// nolint:unused
+	codeEpochEmergencyFallbackTriggered = 255
+)
+
+func makePrefix(code byte, keys ...interface{}) []byte {
+	prefix := make([]byte, 1)
+	prefix[0] = code
+	for _, key := range keys {
+		prefix = append(prefix, EncodeKeyPart(key)...)
+	}
+	return prefix
+}
diff --git a/storage/operations.go b/storage/operations.go
index c261d4ba28c..95e25fcfa9d 100644
--- a/storage/operations.go
+++ b/storage/operations.go
@@ -94,6 +94,26 @@ type ReaderBatchWriter interface {
 	AddCallback(func(error))
 }
 
+// DB is an interface for a database store that provides a reader and a writer.
+type DB interface {
+	// Reader returns a database-backed reader which reads the latest
+	// committed global database state.
+	Reader() Reader
+
+	// WithReaderBatchWriter creates a batch writer and allows the caller to perform
+	// atomic batch updates to the database.
+	// Any error returned is considered fatal, and the batch is not committed.
+	WithReaderBatchWriter(func(ReaderBatchWriter) error) error
+}
+
+// OnlyWriter is an adapter to convert a function that takes a Writer
+// to a function that takes a ReaderBatchWriter.
+func OnlyWriter(fn func(Writer) error) func(ReaderBatchWriter) error {
+	return func(rw ReaderBatchWriter) error {
+		return fn(rw.Writer())
+	}
+}
+
 // OnCommitSucceed adds a callback to execute after the batch has been successfully committed.
 func OnCommitSucceed(b ReaderBatchWriter, onSuccessFn func()) {
 	b.AddCallback(func(err error) {
diff --git a/storage/badger/approvals.go b/storage/store/approvals.go
similarity index 57%
rename from storage/badger/approvals.go
rename to storage/store/approvals.go
index eb3cf4ae820..4268944adf8 100644
--- a/storage/badger/approvals.go
+++ b/storage/store/approvals.go
@@ -1,35 +1,33 @@
-package badger
+package store
 
 import (
 	"errors"
 	"fmt"
-
-	"github.com/dgraph-io/badger/v2"
+	"sync"
 
 	"github.com/onflow/flow-go/model/flow"
 	"github.com/onflow/flow-go/module"
 	"github.com/onflow/flow-go/module/metrics"
 	"github.com/onflow/flow-go/storage"
-	"github.com/onflow/flow-go/storage/badger/operation"
-	"github.com/onflow/flow-go/storage/badger/transaction"
+	"github.com/onflow/flow-go/storage/operation"
 )
 
 // ResultApprovals implements persistent storage for result approvals.
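+// ResultApprovals is implemented against the backend-agnostic storage.DB interface,
+// so the same logic runs on either Badger or Pebble. A construction sketch (the
+// *badger.DB variable `bdb` is illustrative; the adapter is part of this change):
+//
+//	approvals := NewResultApprovals(metrics.NewNoopCollector(), badgerimpl.ToDB(bdb))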
 type ResultApprovals struct {
-	db    *badger.DB
-	cache *Cache[flow.Identifier, *flow.ResultApproval]
+	db       storage.DB
+	cache    *Cache[flow.Identifier, *flow.ResultApproval]
+	indexing *sync.Mutex // preventing concurrent indexing of approvals
 }
 
-func NewResultApprovals(collector module.CacheMetrics, db *badger.DB) *ResultApprovals {
-
-	store := func(key flow.Identifier, val *flow.ResultApproval) func(*transaction.Tx) error {
-		return transaction.WithTx(operation.SkipDuplicates(operation.InsertResultApproval(val)))
+func NewResultApprovals(collector module.CacheMetrics, db storage.DB) *ResultApprovals {
+	store := func(key flow.Identifier, val *flow.ResultApproval) func(rw storage.ReaderBatchWriter) error {
+		return storage.OnlyWriter(operation.InsertResultApproval(val))
 	}
 
-	retrieve := func(approvalID flow.Identifier) func(tx *badger.Txn) (*flow.ResultApproval, error) {
+	retrieve := func(approvalID flow.Identifier) func(r storage.Reader) (*flow.ResultApproval, error) {
 		var approval flow.ResultApproval
-		return func(tx *badger.Txn) (*flow.ResultApproval, error) {
-			err := operation.RetrieveResultApproval(approvalID, &approval)(tx)
+		return func(r storage.Reader) (*flow.ResultApproval, error) {
+			err := operation.RetrieveResultApproval(approvalID, &approval)(r)
 			return &approval, err
 		}
 	}
@@ -40,18 +38,19 @@ func NewResultApprovals(collector module.CacheMetrics, db *badger.DB) *ResultApp
 		withLimit[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100),
 		withStore[flow.Identifier, *flow.ResultApproval](store),
 		withRetrieve[flow.Identifier, *flow.ResultApproval](retrieve)),
+		indexing: new(sync.Mutex),
 	}
 
 	return res
 }
 
-func (r *ResultApprovals) store(approval *flow.ResultApproval) func(*transaction.Tx) error {
+func (r *ResultApprovals) store(approval *flow.ResultApproval) func(storage.ReaderBatchWriter) error {
 	return r.cache.PutTx(approval.ID(), approval)
 }
 
-func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*flow.ResultApproval, error) {
-	return func(tx *badger.Txn) (*flow.ResultApproval, error) {
-		val, err := r.cache.Get(approvalID)(tx)
+func (r *ResultApprovals) byID(approvalID flow.Identifier) func(storage.Reader) (*flow.ResultApproval, error) {
+	return func(reader storage.Reader) (*flow.ResultApproval, error) {
+		val, err := r.cache.Get(approvalID)(reader)
 		if err != nil {
 			return nil, err
 		}
@@ -59,40 +58,37 @@ func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*f
 	}
 }
 
-func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(*badger.Txn) (*flow.ResultApproval, error) {
-	return func(tx *badger.Txn) (*flow.ResultApproval, error) {
+func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(storage.Reader) (*flow.ResultApproval, error) {
+	return func(reader storage.Reader) (*flow.ResultApproval, error) {
 		var approvalID flow.Identifier
-		err := operation.LookupResultApproval(resultID, chunkIndex, &approvalID)(tx)
+		err := operation.LookupResultApproval(resultID, chunkIndex, &approvalID)(reader)
 		if err != nil {
 			return nil, fmt.Errorf("could not lookup result approval ID: %w", err)
 		}
-		return r.byID(approvalID)(tx)
+		return r.byID(approvalID)(reader)
 	}
 }
 
-func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error {
-	return func(tx *badger.Txn) error {
-		err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(tx)
-		if err == nil {
-			return nil
-		}
+// CAUTION: Caller must acquire `indexing` lock.
+func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.ReaderBatchWriter) error {
+	return func(rw storage.ReaderBatchWriter) error {
+		var storedApprovalID flow.Identifier
+		err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(rw.GlobalReader())
+		if err != nil {
+			if !errors.Is(err, storage.ErrNotFound) {
+				return fmt.Errorf("could not lookup result approval ID: %w", err)
+			}
 
-		if !errors.Is(err, storage.ErrAlreadyExists) {
-			return err
+			// no approval found, index the approval
+
+			return operation.UnsafeIndexResultApproval(resultID, chunkIndex, approvalID)(rw.Writer())
 		}
 
-		// When trying to index an approval for a result, and there is already
-		// an approval for the result, double check if the indexed approval is
-		// the same.
+		// An approval is already indexed; double-check that it is the same one.
 		// We don't allow indexing multiple approvals per chunk because the
 		// store is only used within Verification nodes, and it is impossible
 		// for a Verification node to compute different approvals for the same
 		// chunk.
-		var storedApprovalID flow.Identifier
-		err = operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(tx)
-		if err != nil {
-			return fmt.Errorf("there is an approval stored already, but cannot retrieve it: %w", err)
-		}
 
 		if storedApprovalID != approvalID {
 			return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w",
@@ -105,14 +101,22 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app
 
 // Store stores a ResultApproval
 func (r *ResultApprovals) Store(approval *flow.ResultApproval) error {
-	return operation.RetryOnConflictTx(r.db, transaction.Update, r.store(approval))
+	return r.db.WithReaderBatchWriter(r.store(approval))
 }
 
 // Index indexes a ResultApproval by chunk (ResultID + chunk index).
 // operation is idempotent (repeated calls with the same value are equivalent to
 // just calling the method once; still the method succeeds on each call).
 func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error {
-	err := operation.RetryOnConflict(r.db.Update, r.index(resultID, chunkIndex, approvalID))
+	// Acquire the lock to prevent dirty reads when checking for conflicting approvals.
+	// How it works: a caller can only acquire the lock after any previous index
+	// operation has been committed to the database. Since indexing is the only
+	// operation that writes these keys, no conflicting write can land between the
+	// conflict check in `index` and its write while the lock is held.
+	r.indexing.Lock()
+	defer r.indexing.Unlock()
+
+	err := r.db.WithReaderBatchWriter(r.index(resultID, chunkIndex, approvalID))
 	if err != nil {
 		return fmt.Errorf("could not index result approval: %w", err)
 	}
@@ -121,16 +125,12 @@ func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, app
 
 // ByID retrieves a ResultApproval by its ID
 func (r *ResultApprovals) ByID(approvalID flow.Identifier) (*flow.ResultApproval, error) {
-	tx := r.db.NewTransaction(false)
-	defer tx.Discard()
-	return r.byID(approvalID)(tx)
+	return r.byID(approvalID)(r.db.Reader())
}
 
 // ByChunk retrieves a ResultApproval by result ID and chunk index. The
 // ResultApprovals store is only used within a verification node, where it is
 // assumed that there is never more than one approval per chunk.
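+//
+// Expected errors during normal operations:
+//   - `storage.ErrNotFound` if no approval is indexed for the given chunk
+//
+// A minimal usage sketch (identifiers are illustrative):
+//
+//	approval, err := approvals.ByChunk(resultID, chunkIndex)
+//	if errors.Is(err, storage.ErrNotFound) {
+//		// this verification node has not indexed an approval for the chunk yet
+//	}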
 func (r *ResultApprovals) ByChunk(resultID flow.Identifier, chunkIndex uint64) (*flow.ResultApproval, error) {
-	tx := r.db.NewTransaction(false)
-	defer tx.Discard()
-	return r.byChunk(resultID, chunkIndex)(tx)
+	return r.byChunk(resultID, chunkIndex)(r.db.Reader())
 }
diff --git a/storage/badger/approvals_test.go b/storage/store/approvals_test.go
similarity index 50%
rename from storage/badger/approvals_test.go
rename to storage/store/approvals_test.go
index 1b13a49ae59..0862e7e1537 100644
--- a/storage/badger/approvals_test.go
+++ b/storage/store/approvals_test.go
@@ -1,22 +1,23 @@
-package badger_test
+package store_test
 
 import (
 	"errors"
+	"sync"
 	"testing"
 
-	"github.com/dgraph-io/badger/v2"
 	"github.com/stretchr/testify/require"
 
 	"github.com/onflow/flow-go/module/metrics"
 	"github.com/onflow/flow-go/storage"
-	bstorage "github.com/onflow/flow-go/storage/badger"
+	"github.com/onflow/flow-go/storage/operation/dbtest"
+	"github.com/onflow/flow-go/storage/store"
 	"github.com/onflow/flow-go/utils/unittest"
 )
 
 func TestApprovalStoreAndRetrieve(t *testing.T) {
-	unittest.RunWithBadgerDB(t, func(db *badger.DB) {
+	dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
 		metrics := metrics.NewNoopCollector()
-		store := bstorage.NewResultApprovals(metrics, db)
+		store := store.NewResultApprovals(metrics, db)
 
 		approval := unittest.ResultApprovalFixture()
 		err := store.Store(approval)
@@ -36,9 +37,9 @@ func TestApprovalStoreAndRetrieve(t *testing.T) {
 }
 
 func TestApprovalStoreTwice(t *testing.T) {
-	unittest.RunWithBadgerDB(t, func(db *badger.DB) {
+	dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
 		metrics := metrics.NewNoopCollector()
-		store := bstorage.NewResultApprovals(metrics, db)
+		store := store.NewResultApprovals(metrics, db)
 
 		approval := unittest.ResultApprovalFixture()
 		err := store.Store(approval)
@@ -56,9 +57,9 @@ func TestApprovalStoreTwice(t *testing.T) {
 }
 
 func TestApprovalStoreTwoDifferentApprovalsShouldFail(t *testing.T) {
-	unittest.RunWithBadgerDB(t, func(db *badger.DB) {
+	dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
 		metrics := metrics.NewNoopCollector()
-		store := bstorage.NewResultApprovals(metrics, db)
+		store := store.NewResultApprovals(metrics, db)
 
 		approval1 := unittest.ResultApprovalFixture()
 		approval2 := unittest.ResultApprovalFixture()
@@ -79,3 +80,52 @@ func TestApprovalStoreTwoDifferentApprovalsShouldFail(t *testing.T) {
 		require.True(t, errors.Is(err, storage.ErrDataMismatch))
 	})
 }
+
+// Verify that storing and indexing two conflicting approvals concurrently fails:
+// exactly one of them succeeds, while the other fails with storage.ErrDataMismatch.
+func TestApprovalStoreTwoDifferentApprovalsConcurrently(t *testing.T) {
+	dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
+		metrics := metrics.NewNoopCollector()
+		store := store.NewResultApprovals(metrics, db)
+
+		approval1 := unittest.ResultApprovalFixture()
+		approval2 := unittest.ResultApprovalFixture()
+
+		var wg sync.WaitGroup
+		wg.Add(2)
+
+		var firstIndexErr, secondIndexErr error
+
+		// First goroutine stores and indexes the first approval.
+		go func() {
+			defer wg.Done()
+
+			err := store.Store(approval1)
+			require.NoError(t, err)
+
+			firstIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval1.ID())
+		}()
+
+		// Second goroutine stores and tries to index the second approval for the same chunk.
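+		// Because Index serializes on the store's internal `indexing` mutex, exactly
+		// one of the two concurrent Index calls commits its entry; the other then
+		// observes the committed entry, sees a conflicting approval ID, and returns
+		// storage.ErrDataMismatch.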
+		go func() {
+			defer wg.Done()
+
+			err := store.Store(approval2)
+			require.NoError(t, err)
+
+			secondIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID())
+		}()
+
+		// Wait for both goroutines to finish
+		wg.Wait()
+
+		// Check that one of the Index operations succeeded and the other failed
+		if firstIndexErr == nil {
+			require.Error(t, secondIndexErr)
+			require.True(t, errors.Is(secondIndexErr, storage.ErrDataMismatch))
+		} else {
+			require.NoError(t, secondIndexErr)
+			require.True(t, errors.Is(firstIndexErr, storage.ErrDataMismatch))
+		}
+	})
+}
diff --git a/storage/store/cache.go b/storage/store/cache.go
new file mode 100644
index 00000000000..45dbc8deb95
--- /dev/null
+++ b/storage/store/cache.go
@@ -0,0 +1,157 @@
+package store
+
+import (
+	"errors"
+	"fmt"
+
+	lru "github.com/hashicorp/golang-lru/v2"
+
+	"github.com/onflow/flow-go/module"
+	"github.com/onflow/flow-go/storage"
+)
+
+// nolint:unused
+func withLimit[K comparable, V any](limit uint) func(*Cache[K, V]) {
+	return func(c *Cache[K, V]) {
+		c.limit = limit
+	}
+}
+
+type storeFunc[K comparable, V any] func(key K, val V) func(storage.ReaderBatchWriter) error
+
+// nolint:unused
+func withStore[K comparable, V any](store storeFunc[K, V]) func(*Cache[K, V]) {
+	return func(c *Cache[K, V]) {
+		c.store = store
+	}
+}
+
+// nolint:unused
+func noStore[K comparable, V any](_ K, _ V) func(storage.ReaderBatchWriter) error {
+	return func(tx storage.ReaderBatchWriter) error {
+		return fmt.Errorf("no store function for cache put available")
+	}
+}
+
+// nolint:unused
+func noopStore[K comparable, V any](_ K, _ V) func(storage.ReaderBatchWriter) error {
+	return func(tx storage.ReaderBatchWriter) error {
+		return nil
+	}
+}
+
+type retrieveFunc[K comparable, V any] func(key K) func(storage.Reader) (V, error)
+
+// nolint:unused
+func withRetrieve[K comparable, V any](retrieve retrieveFunc[K, V]) func(*Cache[K, V]) {
+	return func(c *Cache[K, V]) {
+		c.retrieve = retrieve
+	}
+}
+
+// nolint:unused
+func noRetrieve[K comparable, V any](_ K) func(storage.Reader) (V, error) {
+	return func(tx storage.Reader) (V, error) {
+		var nullV V
+		return nullV, fmt.Errorf("no retrieve function for cache get available")
+	}
+}
+
+type Cache[K comparable, V any] struct {
+	metrics module.CacheMetrics
+	// nolint:unused
+	limit    uint
+	store    storeFunc[K, V]
+	retrieve retrieveFunc[K, V]
+	resource string
+	cache    *lru.Cache[K, V]
+}
+
+// nolint:unused
+func newCache[K comparable, V any](collector module.CacheMetrics, resourceName string, options ...func(*Cache[K, V])) *Cache[K, V] {
+	c := Cache[K, V]{
+		metrics:  collector,
+		limit:    1000,
+		store:    noStore[K, V],
+		retrieve: noRetrieve[K, V],
+		resource: resourceName,
+	}
+	for _, option := range options {
+		option(&c)
+	}
+	c.cache, _ = lru.New[K, V](int(c.limit))
+	c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
+	return &c
+}
+
+// IsCached returns true if the key exists in the cache.
+// It DOES NOT check whether the key exists in the underlying data store.
+func (c *Cache[K, V]) IsCached(key K) bool {
+	return c.cache.Contains(key)
+}
+
+// Get will try to retrieve the resource from the cache first, and then from the
+// injected retrieve function. Expected error returns during normal operations:
+//   - `storage.ErrNotFound` if key is unknown.
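+//
+// A minimal usage sketch (assuming a storage.DB named `db`; identifiers are illustrative):
+//
+//	approval, err := cache.Get(approvalID)(db.Reader())
+//	if errors.Is(err, storage.ErrNotFound) {
+//		// the key is neither cached nor present in the database
+//	}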
+func (c *Cache[K, V]) Get(key K) func(storage.Reader) (V, error) {
+	return func(r storage.Reader) (V, error) {
+
+		// check if we have it in the cache
+		resource, cached := c.cache.Get(key)
+		if cached {
+			c.metrics.CacheHit(c.resource)
+			return resource, nil
+		}
+
+		// get it from the database
+		resource, err := c.retrieve(key)(r)
+		if err != nil {
+			if errors.Is(err, storage.ErrNotFound) {
+				c.metrics.CacheNotFound(c.resource)
+			}
+			var nullV V
+			return nullV, fmt.Errorf("could not retrieve resource: %w", err)
+		}
+
+		c.metrics.CacheMiss(c.resource)
+
+		// cache the resource and eject least recently used one if we reached limit
+		evicted := c.cache.Add(key, resource)
+		if !evicted {
+			c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
+		}
+
+		return resource, nil
+	}
+}
+
+func (c *Cache[K, V]) Remove(key K) {
+	c.cache.Remove(key)
+}
+
+// Insert will add a resource directly to the cache with the given ID
+func (c *Cache[K, V]) Insert(key K, resource V) {
+	// cache the resource and eject least recently used one if we reached limit
+	evicted := c.cache.Add(key, resource)
+	if !evicted {
+		c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
+	}
+}
+
+// PutTx returns an operation which stores the resource under the given key and,
+// upon successful commit of the enclosing batch, also inserts it into the cache.
+func (c *Cache[K, V]) PutTx(key K, resource V) func(storage.ReaderBatchWriter) error {
+	storeOps := c.store(key, resource) // assemble DB operations to store resource (no execution)
+
+	return func(rw storage.ReaderBatchWriter) error {
+		storage.OnCommitSucceed(rw, func() {
+			c.Insert(key, resource)
+		})
+
+		err := storeOps(rw) // execute operations to store resource
+		if err != nil {
+			return fmt.Errorf("could not store resource: %w", err)
+		}
+
+		return nil
+	}
+}
diff --git a/storage/badger/cache_test.go b/storage/store/cache_test.go
similarity index 98%
rename from storage/badger/cache_test.go
rename to storage/store/cache_test.go
index 76ea7ce18bc..d14de66c47b 100644
--- a/storage/badger/cache_test.go
+++ b/storage/store/cache_test.go
@@ -1,4 +1,4 @@
-package badger
+package store
 
 import (
 	"testing"
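The tests in this change exercise both backends through the new dbtest.RunWithDB helper.
For reference, a minimal sketch of a storage test built on the same helper (the test name
and assertions are illustrative, not part of this change):

	func TestApprovalsOnBothBackends(t *testing.T) {
		dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
			approvals := store.NewResultApprovals(metrics.NewNoopCollector(), db)

			approval := unittest.ResultApprovalFixture()
			require.NoError(t, approvals.Store(approval))
			require.NoError(t, approvals.Index(approval.Body.ExecutionResultID, approval.Body.ChunkIndex, approval.ID()))

			// the same assertions run once against Badger and once against Pebble
			byChunk, err := approvals.ByChunk(approval.Body.ExecutionResultID, approval.Body.ChunkIndex)
			require.NoError(t, err)
			require.Equal(t, approval, byChunk)
		})
	}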