Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v0.33 - Add pebble-based storage layer files #6207

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions cmd/util/cmd/common/storage_pebble.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package common

import (
"github.com/cockroachdb/pebble"

"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage"
storagepebble "github.com/onflow/flow-go/storage/pebble"
)

func InitStoragePebble(datadir string) (*pebble.DB, error) {
return pebble.Open(datadir, nil)
}

func InitStoragesPebble(db *pebble.DB) *storage.All {
metrics := &metrics.NoopCollector{}

return storagepebble.InitAll(metrics, db)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/state/protocol/mock"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/badger/transaction"
"github.com/onflow/flow-go/utils/unittest"
)

Expand Down Expand Up @@ -49,7 +50,7 @@ func TestFindBlockTransactions(t *testing.T) {

// prepare dependencies
storages := common.InitStorages(db)
payloads, collections := storages.Payloads, storages.Collections
blocks, payloads, collections := storages.Blocks, storages.Payloads, storages.Collections
snap4 := &mock.Snapshot{}
snap4.On("Head").Return(b1.Header, nil)
snap5 := &mock.Snapshot{}
Expand All @@ -59,8 +60,8 @@ func TestFindBlockTransactions(t *testing.T) {
state.On("AtHeight", uint64(5)).Return(snap5, nil)

// store into database
require.NoError(t, payloads.Store(b1.ID(), b1.Payload))
require.NoError(t, payloads.Store(b2.ID(), b2.Payload))
require.NoError(t, transaction.Update(db, blocks.StoreTx(&b1)))
require.NoError(t, transaction.Update(db, blocks.StoreTx(&b2)))

require.NoError(t, collections.Store(&col1.Collection))
require.NoError(t, collections.Store(&col2.Collection))
Expand Down
7 changes: 6 additions & 1 deletion cmd/util/cmd/update-commitment/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,15 @@ func updateCommitment(datadir, blockIDStr, commitStr string, force bool) error {

log.Info().Msgf("storing new commitment: %x", commit)

err = commits.Store(blockID, commit)
writeBatch = storagebadger.NewBatch(db)
err = commits.BatchStore(blockID, commit, writeBatch)
if err != nil {
return fmt.Errorf("could not store commit: %v", err)
}
err = writeBatch.Flush()
if err != nil {
return fmt.Errorf("could not flush write batch: %v", err)
}

log.Info().Msgf("commitment successfully stored for block %s", blockIDStr)

Expand Down
57 changes: 57 additions & 0 deletions consensus/hotstuff/persister/persister_pebble.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package persister

import (
"github.com/cockroachdb/pebble"

"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage/pebble/operation"
)

// PersisterPebble can persist relevant information for hotstuff.
// PersisterPebble depends on protocol.State root snapshot bootstrapping to set initial values for
// SafetyData and LivenessData. These values must be initialized before first use of Persister.
type PersisterPebble struct {
db *pebble.DB
chainID flow.ChainID
}

var _ hotstuff.Persister = (*PersisterPebble)(nil)

// New creates a new Persister using the injected data base to persist
// relevant hotstuff data.
func NewPersisterPebble(db *pebble.DB, chainID flow.ChainID) *PersisterPebble {
p := &PersisterPebble{
db: db,
chainID: chainID,
}
return p
}

// GetSafetyData will retrieve last persisted safety data.
// During normal operations, no errors are expected.
func (p *PersisterPebble) GetSafetyData() (*hotstuff.SafetyData, error) {
var safetyData hotstuff.SafetyData
err := operation.RetrieveSafetyData(p.chainID, &safetyData)(p.db)
return &safetyData, err
}

// GetLivenessData will retrieve last persisted liveness data.
// During normal operations, no errors are expected.
func (p *PersisterPebble) GetLivenessData() (*hotstuff.LivenessData, error) {
var livenessData hotstuff.LivenessData
err := operation.RetrieveLivenessData(p.chainID, &livenessData)(p.db)
return &livenessData, err
}

// PutSafetyData persists the last safety data.
// During normal operations, no errors are expected.
func (p *PersisterPebble) PutSafetyData(safetyData *hotstuff.SafetyData) error {
return operation.UpdateSafetyData(p.chainID, safetyData)(p.db)
}

// PutLivenessData persists the last liveness data.
// During normal operations, no errors are expected.
func (p *PersisterPebble) PutLivenessData(livenessData *hotstuff.LivenessData) error {
return operation.UpdateLivenessData(p.chainID, livenessData)(p.db)
}
17 changes: 9 additions & 8 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/badger/operation"
"github.com/onflow/flow-go/storage/badger/transaction"
"github.com/onflow/flow-go/storage/util"
"github.com/onflow/flow-go/utils/unittest"
"github.com/onflow/flow-go/utils/unittest/mocks"
Expand Down Expand Up @@ -382,8 +383,8 @@ func (suite *Suite) TestGetBlockByIDAndHeight() {
block2 := unittest.BlockFixture()
block2.Header.Height = 2

require.NoError(suite.T(), all.Blocks.Store(&block1))
require.NoError(suite.T(), all.Blocks.Store(&block2))
require.NoError(suite.T(), transaction.Update(db, all.Blocks.StoreTx(&block1)))
require.NoError(suite.T(), transaction.Update(db, all.Blocks.StoreTx(&block2)))

// the follower logic should update height index on the block storage when a block is finalized
err := db.Update(operation.IndexBlockHeight(block2.Header.Height, block2.ID()))
Expand Down Expand Up @@ -678,7 +679,7 @@ func (suite *Suite) TestGetSealedTransaction() {
require.NoError(suite.T(), err)

// 1. Assume that follower engine updated the block storage and the protocol state. The block is reported as sealed
err = all.Blocks.Store(block)
err = transaction.Update(db, all.Blocks.StoreTx(block))
require.NoError(suite.T(), err)
suite.sealedBlock = block.Header

Expand Down Expand Up @@ -748,9 +749,9 @@ func (suite *Suite) TestGetTransactionResult() {
// specifically for this test we will consider that sealed block is far behind finalized, so we get EXECUTED status
suite.sealedSnapshot.On("Head").Return(sealedBlock, nil)

err := all.Blocks.Store(block)
err := transaction.Update(db, all.Blocks.StoreTx(block))
require.NoError(suite.T(), err)
err = all.Blocks.Store(blockNegative)
err = transaction.Update(db, all.Blocks.StoreTx(blockNegative))
require.NoError(suite.T(), err)

suite.state.On("AtBlockID", blockId).Return(suite.sealedSnapshot)
Expand Down Expand Up @@ -1051,7 +1052,7 @@ func (suite *Suite) TestExecuteScript() {

// create a block and a seal pointing to that block
lastBlock := unittest.BlockWithParentFixture(prevBlock.Header)
err = all.Blocks.Store(lastBlock)
err = transaction.Update(db, all.Blocks.StoreTx(lastBlock))
require.NoError(suite.T(), err)
err = db.Update(operation.IndexBlockHeight(lastBlock.Header.Height, lastBlock.ID()))
require.NoError(suite.T(), err)
Expand All @@ -1065,7 +1066,7 @@ func (suite *Suite) TestExecuteScript() {
require.NoError(suite.T(), err)
}

err = all.Blocks.Store(prevBlock)
err = transaction.Update(db, all.Blocks.StoreTx(prevBlock))
require.NoError(suite.T(), err)
err = db.Update(operation.IndexBlockHeight(prevBlock.Header.Height, prevBlock.ID()))
require.NoError(suite.T(), err)
Expand Down Expand Up @@ -1188,7 +1189,7 @@ func (suite *Suite) TestLastFinalizedBlockHeightResult() {
newFinalizedBlock := unittest.BlockWithParentFixture(block.Header)

// store new block
require.NoError(suite.T(), all.Blocks.Store(block))
require.NoError(suite.T(), transaction.Update(db, all.Blocks.StoreTx(block)))

assertFinalizedBlockHeader := func(resp *accessproto.BlockHeaderResponse, err error) {
require.NoError(suite.T(), err)
Expand Down
Loading
Loading