Skip to content

Commit

Permalink
refactor approvals
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Sep 17, 2024
1 parent 87dc8e1 commit f297689
Show file tree
Hide file tree
Showing 13 changed files with 546 additions and 89 deletions.
4 changes: 3 additions & 1 deletion cmd/verification_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
"github.com/onflow/flow-go/storage/store"
)

type VerificationConfig struct {
Expand Down Expand Up @@ -201,7 +203,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
vmCtx := fvm.NewContext(fvmOptions...)

chunkVerifier := chunks.NewChunkVerifier(vm, vmCtx, node.Logger)
approvalStorage := badger.NewResultApprovals(node.Metrics.Cache, node.DB)
approvalStorage := store.NewResultApprovals(node.Metrics.Cache, badgerimpl.ToDB(node.DB))
verifierEng, err = verifier.New(
node.Logger,
collector,
Expand Down
4 changes: 3 additions & 1 deletion engine/testutil/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ import (
"github.com/onflow/flow-go/state/protocol/events/gadgets"
"github.com/onflow/flow-go/state/protocol/util"
storage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
storagepebble "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/storage/store"
"github.com/onflow/flow-go/utils/unittest"
)

Expand Down Expand Up @@ -1021,7 +1023,7 @@ func VerificationNode(t testing.TB,

chunkVerifier := chunks.NewChunkVerifier(vm, vmCtx, node.Log)

approvalStorage := storage.NewResultApprovals(node.Metrics, node.PublicDB)
approvalStorage := store.NewResultApprovals(node.Metrics, badgerimpl.ToDB(node.PublicDB))

node.VerifierEngine, err = verifier.New(node.Log,
collector,
Expand Down
31 changes: 0 additions & 31 deletions storage/badger/operation/approvals.go

This file was deleted.

34 changes: 34 additions & 0 deletions storage/operation/approvals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package operation

import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)

// InsertResultApproval inserts a ResultApproval by ID.
// The same key (`approval.ID()`) necessitates that the value (full `approval`) is
// also identical (otherwise, we would have a successful pre-image attack on our
// cryptographic hash function). Therefore, concurrent calls to this function are safe.
func InsertResultApproval(approval *flow.ResultApproval) func(storage.Writer) error {
return Upsert(makePrefix(codeResultApproval, approval.ID()), approval)
}

// RetrieveResultApproval retrieves an approval by ID.
func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(storage.Reader) error {
return Retrieve(makePrefix(codeResultApproval, approvalID), approval)
}

// UnsafeIndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID
// and chunk index. If a value for this key exists, a storage.ErrAlreadyExists
// error is returned. This operation is only used by the ResultApprovals store,
// which is only used within a Verification node, where it is assumed that there
// is only one approval per chunk.
// CAUTION: Use of this function must be synchronized by storage.ResultApprovals.
func UnsafeIndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error {
return Upsert(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
}

// LookupResultApproval finds a ResultApproval by result ID and chunk index.
func LookupResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) func(storage.Reader) error {
return Retrieve(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
}
23 changes: 23 additions & 0 deletions storage/operation/badgerimpl/dbstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package badgerimpl

import (
"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/storage"
)

func ToDB(db *badger.DB) storage.DB {
return &dbStore{db: db}
}

type dbStore struct {
db *badger.DB
}

func (b *dbStore) Reader() storage.Reader {
return dbReader{db: b.db}
}

func (b *dbStore) WithReaderBatchWriter(fn func(storage.ReaderBatchWriter) error) error {
return WithReaderBatchWriter(b.db, fn)
}
14 changes: 14 additions & 0 deletions storage/operation/dbtest/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,17 @@ func runWithPebble(fn func(storage.Reader, WithWriter)) func(*pebble.DB) {
fn(reader, withWriter)
}
}

func RunWithDB(t *testing.T, fn func(t *testing.T, store storage.DB)) {
t.Run("BadgerStorage", func(t *testing.T) {
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
fn(t, badgerimpl.ToDB(db))
})
})

t.Run("PebbleStorage", func(t *testing.T) {
unittest.RunWithPebbleDB(t, func(db *pebble.DB) {
fn(t, pebbleimpl.ToDB(db))
})
})
}
23 changes: 23 additions & 0 deletions storage/operation/pebbleimpl/dbstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package pebbleimpl

import (
"github.com/cockroachdb/pebble"

"github.com/onflow/flow-go/storage"
)

func ToDB(db *pebble.DB) storage.DB {
return &dbStore{db: db}
}

type dbStore struct {
db *pebble.DB
}

func (b *dbStore) Reader() storage.Reader {
return dbReader{db: b.db}
}

func (b *dbStore) WithReaderBatchWriter(fn func(storage.ReaderBatchWriter) error) error {
return WithReaderBatchWriter(b.db, fn)
}
163 changes: 163 additions & 0 deletions storage/operation/prefix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package operation

const (

// codes for special database markers
// codeMax = 1 // keeps track of the maximum key size
// nolint:unused
codeDBType = 2 // specifies a database type

// codes for views with special meaning
// nolint:unused
codeSafetyData = 10 // safety data for hotstuff state
// nolint:unused
codeLivenessData = 11 // liveness data for hotstuff state

// codes for fields associated with the root state
// nolint:unused
codeSporkID = 13
// nolint:unused
codeProtocolVersion = 14
// nolint:unused
codeEpochCommitSafetyThreshold = 15
// nolint:unused
codeSporkRootBlockHeight = 16

// code for heights with special meaning
// nolint:unused
codeFinalizedHeight = 20 // latest finalized block height
// nolint:unused
codeSealedHeight = 21 // latest sealed block height
// nolint:unused
codeClusterHeight = 22 // latest finalized height on cluster
// nolint:unused
codeExecutedBlock = 23 // latest executed block with max height
// nolint:unused
codeFinalizedRootHeight = 24 // the height of the highest finalized block contained in the root snapshot
// nolint:unused
codeLastCompleteBlockHeight = 25 // the height of the last block for which all collections were received
// nolint:unused
codeEpochFirstHeight = 26 // the height of the first block in a given epoch
// nolint:unused
codeSealedRootHeight = 27 // the height of the highest sealed block contained in the root snapshot

// codes for single entity storage
// nolint:unused
codeHeader = 30
_ = 31 // DEPRECATED: 31 was used for identities before epochs
codeGuarantee = 32
codeSeal = 33
codeTransaction = 34
codeCollection = 35
codeExecutionResult = 36
codeResultApproval = 37
codeChunk = 38
codeExecutionReceiptMeta = 39 // NOTE: prior to Mainnet25, this erroneously had the same value as codeExecutionResult (36)

// codes for indexing single identifier by identifier/integer
// nolint:unused
codeHeightToBlock = 40 // index mapping height to block ID
// nolint:unused
codeBlockIDToLatestSealID = 41 // index mapping a block its last payload seal
// nolint:unused
codeClusterBlockToRefBlock = 42 // index cluster block ID to reference block ID
// nolint:unused
codeRefHeightToClusterBlock = 43 // index reference block height to cluster block IDs
// nolint:unused
codeBlockIDToFinalizedSeal = 44 // index _finalized_ seal by sealed block ID
// nolint:unused
codeBlockIDToQuorumCertificate = 45 // index of quorum certificates by block ID
// nolint:unused
codeEpochProtocolStateByBlockID = 46 // index of epoch protocol state entry by block ID
// nolint:unused
codeProtocolKVStoreByBlockID = 47 // index of protocol KV store entry by block ID

// codes for indexing multiple identifiers by identifier
// nolint:unused
codeBlockChildren = 50 // index mapping block ID to children blocks
_ = 51 // DEPRECATED: 51 was used for identity indexes before epochs
// nolint:unused
codePayloadGuarantees = 52 // index mapping block ID to payload guarantees
// nolint:unused
codePayloadSeals = 53 // index mapping block ID to payload seals
// nolint:unused
codeCollectionBlock = 54 // index mapping collection ID to block ID
// nolint:unused
codeOwnBlockReceipt = 55 // index mapping block ID to execution receipt ID for execution nodes
_ = 56 // DEPRECATED: 56 was used for block->epoch status prior to Dynamic Protocol State in Mainnet25
// nolint:unused
codePayloadReceipts = 57 // index mapping block ID to payload receipts
// nolint:unused
codePayloadResults = 58 // index mapping block ID to payload results
// nolint:unused
codeAllBlockReceipts = 59 // index mapping of blockID to multiple receipts
// nolint:unused
codePayloadProtocolStateID = 60 // index mapping block ID to payload protocol state ID

// codes related to protocol level information
// nolint:unused
codeEpochSetup = 61 // EpochSetup service event, keyed by ID
// nolint:unused
codeEpochCommit = 62 // EpochCommit service event, keyed by ID
// nolint:unused
codeBeaconPrivateKey = 63 // BeaconPrivateKey, keyed by epoch counter
// nolint:unused
codeDKGStarted = 64 // flag that the DKG for an epoch has been started
// nolint:unused
codeDKGEnded = 65 // flag that the DKG for an epoch has ended (stores end state)
// nolint:unused
codeVersionBeacon = 67 // flag for storing version beacons
// nolint:unused
codeEpochProtocolState = 68
// nolint:unused
codeProtocolKVStore = 69

// code for ComputationResult upload status storage
// NOTE: for now only GCP uploader is supported. When other uploader (AWS e.g.) needs to
// be supported, we will need to define new code.
// nolint:unused
codeComputationResults = 66

// job queue consumers and producers
// nolint:unused
codeJobConsumerProcessed = 70
// nolint:unused
codeJobQueue = 71
// nolint:unused
codeJobQueuePointer = 72

// legacy codes (should be cleaned up)
codeChunkDataPack = 100
codeCommit = 101
codeEvent = 102
codeExecutionStateInteractions = 103
codeTransactionResult = 104
codeFinalizedCluster = 105
codeServiceEvent = 106
codeTransactionResultIndex = 107
codeLightTransactionResult = 108
codeLightTransactionResultIndex = 109
codeIndexCollection = 200
codeIndexExecutionResultByBlock = 202
codeIndexCollectionByTransaction = 203
codeIndexResultApprovalByChunk = 204

// TEMPORARY codes
// nolint:unused
blockedNodeIDs = 205 // manual override for adding node IDs to list of ejected nodes, applies to networking layer only

// internal failure information that should be preserved across restarts
// nolint:unused
codeExecutionFork = 254
// nolint:unused
codeEpochEmergencyFallbackTriggered = 255
)

func makePrefix(code byte, keys ...interface{}) []byte {
prefix := make([]byte, 1)
prefix[0] = code
for _, key := range keys {
prefix = append(prefix, EncodeKeyPart(key)...)
}
return prefix
}
20 changes: 20 additions & 0 deletions storage/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,26 @@ type ReaderBatchWriter interface {
AddCallback(func(error))
}

// DB is an interface for a database store that provides a reader and a writer.
type DB interface {
// Reader returns a database-backed reader which reads the latest
// committed global database state
Reader() Reader

// WithReaderBatchWriter creates a batch writer and allows the caller to perform
// atomic batch updates to the database.
// Any error returned are considered fatal and the batch is not committed.
WithReaderBatchWriter(func(ReaderBatchWriter) error) error
}

// OnlyWriter is an adapter to convert a function that takes a Writer
// to a function that takes a ReaderBatchWriter.
func OnlyWriter(fn func(Writer) error) func(ReaderBatchWriter) error {
return func(rw ReaderBatchWriter) error {
return fn(rw.Writer())
}
}

// OnCommitSucceed adds a callback to execute after the batch has been successfully committed.
func OnCommitSucceed(b ReaderBatchWriter, onSuccessFn func()) {
b.AddCallback(func(err error) {
Expand Down
Loading

0 comments on commit f297689

Please sign in to comment.