Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CI Build Version] Refactor Protocol State with Pebble-based Storage #6198

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions admin/commands/storage/read_range_cluster_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"fmt"

"github.com/dgraph-io/badger/v2"
"github.com/cockroachdb/pebble"
"github.com/rs/zerolog/log"

"github.com/onflow/flow-go/admin"
"github.com/onflow/flow-go/admin/commands"
"github.com/onflow/flow-go/cmd/util/cmd/read-light-block"
"github.com/onflow/flow-go/model/flow"
storage "github.com/onflow/flow-go/storage/badger"
storage "github.com/onflow/flow-go/storage/pebble"
)

var _ commands.AdminCommand = (*ReadRangeClusterBlocksCommand)(nil)
Expand All @@ -21,12 +21,12 @@ var _ commands.AdminCommand = (*ReadRangeClusterBlocksCommand)(nil)
const Max_Range_Cluster_Block_Limit = uint64(10001)

type ReadRangeClusterBlocksCommand struct {
db *badger.DB
db *pebble.DB
headers *storage.Headers
payloads *storage.ClusterPayloads
}

func NewReadRangeClusterBlocksCommand(db *badger.DB, headers *storage.Headers, payloads *storage.ClusterPayloads) commands.AdminCommand {
func NewReadRangeClusterBlocksCommand(db *pebble.DB, headers *storage.Headers, payloads *storage.ClusterPayloads) commands.AdminCommand {
return &ReadRangeClusterBlocksCommand{
db: db,
headers: headers,
Expand Down
32 changes: 16 additions & 16 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ import (
"github.com/onflow/flow-go/network/underlay"
"github.com/onflow/flow-go/network/validator"
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
pebbleState "github.com/onflow/flow-go/state/protocol/pebble"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
pStorage "github.com/onflow/flow-go/storage/pebble"
pstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/utils/grpcutils"
)

Expand Down Expand Up @@ -250,7 +250,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
scriptExecutorConfig: query.NewDefaultConfig(),
scriptExecMinBlock: 0,
scriptExecMaxBlock: math.MaxUint64,
registerCacheType: pStorage.CacheTypeTwoQueue.String(),
registerCacheType: pstorage.CacheTypeTwoQueue.String(),
registerCacheSize: 0,
programCacheSize: 0,
}
Expand Down Expand Up @@ -320,12 +320,12 @@ func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilde
builder.Module("mutable follower state", func(node *cmd.NodeConfig) error {
// For now, we only support state implementations from package badger.
// If we ever support different implementations, the following can be replaced by a type-aware factory
state, ok := node.State.(*badgerState.State)
state, ok := node.State.(*pebbleState.State)
if !ok {
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
}

followerState, err := badgerState.NewFollowerState(
followerState, err := pebbleState.NewFollowerState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
Expand Down Expand Up @@ -383,7 +383,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerCore() *FlowAccessNodeBuilder
builder.Component("follower core", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// create a finalizer that will handle updating the protocol
// state when the follower detects newly finalized blocks
final := finalizer.NewFinalizer(node.DB, node.Storage.Headers, builder.FollowerState, node.Tracer)
final := finalizer.NewFinalizerPebble(node.DB, node.Storage.Headers, builder.FollowerState, node.Tracer)

packer := signature.NewConsensusSigDataPacker(builder.Committee)
// initialize the verifier for the protocol consensus
Expand Down Expand Up @@ -725,27 +725,27 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}).
Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
indexedBlockHeight = pstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).
Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
builder.Storage.LightTransactionResults = pstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, pstorage.DefaultCacheSize)
return nil
}).
DependableComponent("execution data indexer", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// Note: using a DependableComponent here to ensure that the indexer does not block
// other components from starting while bootstrapping the register db since it may
// take hours to complete.

pdb, err := pStorage.OpenRegisterPebbleDB(builder.registersDBPath)
pdb, err := pstorage.OpenRegisterPebbleDB(builder.registersDBPath)
if err != nil {
return nil, fmt.Errorf("could not open registers db: %w", err)
}
builder.ShutdownFunc(func() error {
return pdb.Close()
})

bootstrapped, err := pStorage.IsBootstrapped(pdb)
bootstrapped, err := pstorage.IsBootstrapped(pdb)
if err != nil {
return nil, fmt.Errorf("could not check if registers db is bootstrapped: %w", err)
}
Expand Down Expand Up @@ -777,7 +777,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}

rootHash := ledger.RootHash(builder.RootSeal.FinalState)
bootstrap, err := pStorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger)
bootstrap, err := pstorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger)
if err != nil {
return nil, fmt.Errorf("could not create registers bootstrap: %w", err)
}
Expand All @@ -790,18 +790,18 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}
}

registers, err := pStorage.NewRegisters(pdb)
registers, err := pstorage.NewRegisters(pdb)
if err != nil {
return nil, fmt.Errorf("could not create registers storage: %w", err)
}

if builder.registerCacheSize > 0 {
cacheType, err := pStorage.ParseCacheType(builder.registerCacheType)
cacheType, err := pstorage.ParseCacheType(builder.registerCacheType)
if err != nil {
return nil, fmt.Errorf("could not parse register cache type: %w", err)
}
cacheMetrics := metrics.NewCacheCollector(builder.RootChainID)
registersCache, err := pStorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics)
registersCache, err := pstorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics)
if err != nil {
return nil, fmt.Errorf("could not create registers cache: %w", err)
}
Expand Down Expand Up @@ -1406,7 +1406,7 @@ func (builder *FlowAccessNodeBuilder) Initialize() error {

builder.EnqueueTracer()
builder.PreInit(cmd.DynamicStartPreInit)
builder.ValidateRootSnapshot(badgerState.ValidRootSnapshotContainsEntityExpiryRange)
builder.ValidateRootSnapshot(pebbleState.ValidRootSnapshotContainsEntityExpiryRange)

return nil
}
Expand Down Expand Up @@ -1596,7 +1596,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil
}).
Module("events storage", func(node *cmd.NodeConfig) error {
builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB)
builder.Storage.Events = pstorage.NewEvents(node.Metrics.Cache, node.DB)
return nil
}).
Module("events index", func(node *cmd.NodeConfig) error {
Expand Down
20 changes: 10 additions & 10 deletions cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ import (
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/state/protocol/events/gadgets"
"github.com/onflow/flow-go/storage/badger"
pebbleState "github.com/onflow/flow-go/state/protocol/pebble"
"github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/utils/grpcutils"
)

Expand Down Expand Up @@ -182,10 +182,10 @@ func main() {
nodeBuilder.
PreInit(cmd.DynamicStartPreInit).
AdminCommand("read-range-cluster-blocks", func(conf *cmd.NodeConfig) commands.AdminCommand {
clusterPayloads := badger.NewClusterPayloads(&metrics.NoopCollector{}, conf.DB)
headers, ok := conf.Storage.Headers.(*badger.Headers)
clusterPayloads := pebble.NewClusterPayloads(&metrics.NoopCollector{}, conf.DB)
headers, ok := conf.Storage.Headers.(*pebble.Headers)
if !ok {
panic("fail to initialize admin tool, conf.Storage.Headers can not be casted as badger headers")
panic("fail to initialize admin tool, conf.Storage.Headers can not be casted as pebble headers")
}
return storageCommands.NewReadRangeClusterBlocksCommand(conf.DB, headers, clusterPayloads)
}).
Expand All @@ -195,13 +195,13 @@ func main() {
return nil
}).
Module("mutable follower state", func(node *cmd.NodeConfig) error {
// For now, we only support state implementations from package badger.
// For now, we only support state implementations from package pebble.
// If we ever support different implementations, the following can be replaced by a type-aware factory
state, ok := node.State.(*badgerState.State)
state, ok := node.State.(*pebbleState.State)
if !ok {
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
return fmt.Errorf("only implementations of type pebble.State are currently supported but read-only state has type %T", node.State)
}
followerState, err = badgerState.NewFollowerState(
followerState, err = pebbleState.NewFollowerState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
Expand Down Expand Up @@ -287,7 +287,7 @@ func main() {
Component("follower core", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// create a finalizer for updating the protocol
// state when the follower detects newly finalized blocks
finalizer := confinalizer.NewFinalizer(node.DB, node.Storage.Headers, followerState, node.Tracer)
finalizer := confinalizer.NewFinalizerPebble(node.DB, node.Storage.Headers, followerState, node.Tracer)
finalized, pending, err := recovery.FindLatest(node.State, node.Storage.Headers)
if err != nil {
return nil, fmt.Errorf("could not find latest finalized block and pending blocks to recover consensus follower: %w", err)
Expand Down
22 changes: 11 additions & 11 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ import (
"github.com/onflow/flow-go/module/validation"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/state/protocol/events/gadgets"
pebbleState "github.com/onflow/flow-go/state/protocol/pebble"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
bstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/utils/io"
)

Expand Down Expand Up @@ -209,7 +209,7 @@ func main() {

nodeBuilder.
PreInit(cmd.DynamicStartPreInit).
ValidateRootSnapshot(badgerState.ValidRootSnapshotContainsEntityExpiryRange).
ValidateRootSnapshot(pebbleState.ValidRootSnapshotContainsEntityExpiryRange).
Module("consensus node metrics", func(node *cmd.NodeConfig) error {
conMetrics = metrics.NewConsensusCollector(node.Tracer, node.MetricsRegisterer)
return nil
Expand Down Expand Up @@ -244,11 +244,11 @@ func main() {
return err
}).
Module("mutable follower state", func(node *cmd.NodeConfig) error {
// For now, we only support state implementations from package badger.
// For now, we only support state implementations from package pebble.
// If we ever support different implementations, the following can be replaced by a type-aware factory
state, ok := node.State.(*badgerState.State)
state, ok := node.State.(*pebbleState.State)
if !ok {
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
return fmt.Errorf("only implementations of type pebble.State are currently supported but read-only state has type %T", node.State)
}

chunkAssigner, err = chmodule.NewChunkAssigner(chunkAlpha, node.State)
Expand Down Expand Up @@ -278,7 +278,7 @@ func main() {
return err
}

mutableState, err = badgerState.NewFullConsensusState(
mutableState, err = pebbleState.NewFullConsensusState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
Expand Down Expand Up @@ -559,12 +559,12 @@ func main() {
}).
Component("hotstuff modules", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize the block finalizer
finalize := finalizer.NewFinalizer(
finalize := finalizer.NewFinalizerPebble(
node.DB,
node.Storage.Headers,
mutableState,
node.Tracer,
finalizer.WithCleanup(finalizer.CleanupMempools(
finalizer.WithCleanupPebble(finalizer.CleanupMempools(
node.Metrics.Mempool,
conMetrics,
node.Storage.Payloads,
Expand Down Expand Up @@ -605,7 +605,7 @@ func main() {
notifier.AddFollowerConsumer(followerDistributor)

// initialize the persister
persist := persister.New(node.DB, node.RootChainID)
persist := persister.NewPersisterPebble(node.DB, node.RootChainID)

finalizedBlock, err := node.State.Final().Head()
if err != nil {
Expand Down Expand Up @@ -722,7 +722,7 @@ func main() {
Component("consensus participant", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize the block builder
var build module.Builder
build, err = builder.NewBuilder(
build, err = builder.NewBuilderPebble(
node.Metrics.Mempool,
node.DB,
mutableState,
Expand Down
4 changes: 2 additions & 2 deletions cmd/dynamic_startup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/state/protocol"
badgerstate "github.com/onflow/flow-go/state/protocol/badger"
pebblestate "github.com/onflow/flow-go/state/protocol/pebble"
utilsio "github.com/onflow/flow-go/utils/io"

"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -152,7 +152,7 @@ func DynamicStartPreInit(nodeConfig *NodeConfig) error {
log := nodeConfig.Logger.With().Str("component", "dynamic-startup").Logger()

// skip dynamic startup if the protocol state is bootstrapped
isBootstrapped, err := badgerstate.IsBootstrapped(nodeConfig.DB)
isBootstrapped, err := pebblestate.IsBootstrapped(nodeConfig.DB)
if err != nil {
return fmt.Errorf("could not check if state is boostrapped: %w", err)
}
Expand Down
Loading
Loading