From 789d2bb0cfb0b4e452cbe355c0de50909f1a75f7 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 15 Aug 2024 16:30:42 -0700 Subject: [PATCH 1/3] refactor indexing new block --- cmd/collection/main.go | 2 + cmd/consensus/main.go | 4 ++ cmd/execution_builder.go | 1 + cmd/observer/node_builder/observer_builder.go | 2 + .../read-light-block/read_light_block_test.go | 3 +- cmd/verification_builder.go | 2 + consensus/integration/nodes_test.go | 5 +- .../collection/epochmgr/factories/builder.go | 2 + .../epochmgr/factories/cluster_state.go | 5 +- engine/collection/epochmgr/factories/epoch.go | 8 ++- engine/testutil/nodes.go | 3 + module/builder/collection/builder_pebble.go | 6 +- .../builder/collection/builder_pebble_test.go | 45 +++++++------ module/builder/consensus/builder_pebble.go | 1 + .../builder/consensus/builder_pebble_test.go | 5 ++ .../collection/finalizer_pebble_test.go | 4 +- state/cluster/pebble/mutator.go | 20 +++--- state/cluster/pebble/mutator_test.go | 4 +- state/cluster/pebble/snapshot_test.go | 9 ++- state/cluster/pebble/state.go | 3 +- state/protocol/pebble/mutator.go | 34 +++++----- state/protocol/pebble/mutator_test.go | 5 ++ state/protocol/util/testing_pebble.go | 15 +++-- storage/blocks.go | 7 ++ storage/cluster_blocks.go | 6 ++ storage/pebble/cluster_blocks_test.go | 3 +- storage/pebble/procedure/children.go | 27 +++++++- storage/pebble/procedure/children_test.go | 19 ++++-- storage/pebble/procedure/cluster.go | 17 ++++- storage/pebble/procedure/cluster_test.go | 67 +++++++++++++++++++ 30 files changed, 255 insertions(+), 79 deletions(-) create mode 100644 storage/pebble/procedure/cluster_test.go diff --git a/cmd/collection/main.go b/cmd/collection/main.go index d0b43d4626e..83cfe10e936 100644 --- a/cmd/collection/main.go +++ b/cmd/collection/main.go @@ -52,6 +52,7 @@ import ( "github.com/onflow/flow-go/state/protocol/events/gadgets" pebbleState "github.com/onflow/flow-go/state/protocol/pebble" "github.com/onflow/flow-go/storage/pebble" + "github.com/onflow/flow-go/storage/pebble/procedure" "github.com/onflow/flow-go/utils/grpcutils" ) @@ -208,6 +209,7 @@ func main() { state, node.Storage.Index, node.Storage.Payloads, + procedure.NewBlockIndexer(), blocktimer.DefaultBlockTimer, ) return err diff --git a/cmd/consensus/main.go b/cmd/consensus/main.go index e853f007f24..8eb79420d4e 100644 --- a/cmd/consensus/main.go +++ b/cmd/consensus/main.go @@ -67,6 +67,7 @@ import ( pebbleState "github.com/onflow/flow-go/state/protocol/pebble" "github.com/onflow/flow-go/storage" bstorage "github.com/onflow/flow-go/storage/pebble" + "github.com/onflow/flow-go/storage/pebble/procedure" "github.com/onflow/flow-go/utils/io" ) @@ -132,6 +133,7 @@ func main() { getSealingConfigs module.SealingConfigsGetter ) var deprecatedFlagBlockRateDelay time.Duration + blockIndexer := procedure.NewBlockIndexer() nodeBuilder := cmd.FlowNode(flow.RoleConsensus.String()) nodeBuilder.ExtraFlags(func(flags *pflag.FlagSet) { @@ -285,6 +287,7 @@ func main() { state, node.Storage.Index, node.Storage.Payloads, + blockIndexer, blockTimer, receiptValidator, sealValidator, @@ -730,6 +733,7 @@ func main() { node.Storage.Seals, node.Storage.Index, node.Storage.Blocks, + blockIndexer, node.Storage.Results, node.Storage.Receipts, guarantees, diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index dfab5d5c68d..6e7b00e3dba 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -255,6 +255,7 @@ func (exeNode *ExecutionNode) LoadMutableFollowerState(node *NodeConfig) error { bState, node.Storage.Index, node.Storage.Payloads, + procedure.NewBlockIndexer(), blocktimer.DefaultBlockTimer, ) return err diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 0227a27a4aa..f2b44d03c3e 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -102,6 +102,7 @@ import ( "github.com/onflow/flow-go/storage" bstorage "github.com/onflow/flow-go/storage/badger" pstorage "github.com/onflow/flow-go/storage/pebble" + "github.com/onflow/flow-go/storage/pebble/procedure" "github.com/onflow/flow-go/utils/grpcutils" "github.com/onflow/flow-go/utils/io" ) @@ -352,6 +353,7 @@ func (builder *ObserverServiceBuilder) buildFollowerState() *ObserverServiceBuil state, node.Storage.Index, node.Storage.Payloads, + procedure.NewBlockIndexer(), blocktimer.DefaultBlockTimer, ) builder.FollowerState = followerState diff --git a/cmd/util/cmd/read-light-block/read_light_block_test.go b/cmd/util/cmd/read-light-block/read_light_block_test.go index 5d82f328e28..f82aa1c6820 100644 --- a/cmd/util/cmd/read-light-block/read_light_block_test.go +++ b/cmd/util/cmd/read-light-block/read_light_block_test.go @@ -26,9 +26,10 @@ func TestReadClusterRange(t *testing.T) { err = operation.InsertClusterFinalizedHeight(parent.Header.ChainID, parent.Header.Height)(db) require.NoError(t, err) + blockIndexer := procedure.NewClusterBlockIndexer() // add blocks for _, block := range blocks { - err := operation.WithReaderBatchWriter(db, procedure.InsertClusterBlock(&block)) + err := operation.WithReaderBatchWriter(db, blockIndexer.InsertClusterBlock(&block)) require.NoError(t, err) err = operation.WithReaderBatchWriter(db, procedure.FinalizeClusterBlock(block.Header.ID())) diff --git a/cmd/verification_builder.go b/cmd/verification_builder.go index 6f7bd4ee397..f56b93c7690 100644 --- a/cmd/verification_builder.go +++ b/cmd/verification_builder.go @@ -37,6 +37,7 @@ import ( "github.com/onflow/flow-go/state/protocol/blocktimer" pebbleState "github.com/onflow/flow-go/state/protocol/pebble" "github.com/onflow/flow-go/storage/pebble" + "github.com/onflow/flow-go/storage/pebble/procedure" ) type VerificationConfig struct { @@ -125,6 +126,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() { state, node.Storage.Index, node.Storage.Payloads, + procedure.NewBlockIndexer(), blocktimer.DefaultBlockTimer, ) return err diff --git a/consensus/integration/nodes_test.go b/consensus/integration/nodes_test.go index 85a0ef655cd..b575c2d00ff 100644 --- a/consensus/integration/nodes_test.go +++ b/consensus/integration/nodes_test.go @@ -58,6 +58,7 @@ import ( "github.com/onflow/flow-go/state/protocol/util" storagemock "github.com/onflow/flow-go/storage/mock" storage "github.com/onflow/flow-go/storage/pebble" + "github.com/onflow/flow-go/storage/pebble/procedure" "github.com/onflow/flow-go/utils/unittest" ) @@ -403,6 +404,7 @@ func createNode( blockTimer, err := blocktimer.NewBlockTimer(1*time.Millisecond, 90*time.Second) require.NoError(t, err) + blockIndexer := procedure.NewBlockIndexer() fullState, err := bprotocol.NewFullConsensusState( log, @@ -411,6 +413,7 @@ func createNode( state, indexDB, payloadsDB, + blockIndexer, blockTimer, util.MockReceiptValidator(), util.MockSealValidator(sealsDB), @@ -458,7 +461,7 @@ func createNode( seals := stdmap.NewIncorporatedResultSeals(sealLimit) // initialize the block builder - build, err := builder.NewBuilderPebble(metricsCollector, db, fullState, headersDB, sealsDB, indexDB, blocksDB, resultsDB, receiptsDB, + build, err := builder.NewBuilderPebble(metricsCollector, db, fullState, headersDB, sealsDB, indexDB, blocksDB, blockIndexer, resultsDB, receiptsDB, guarantees, consensusMempools.NewIncorporatedResultSeals(seals, receiptsDB), receipts, tracer) require.NoError(t, err) diff --git a/engine/collection/epochmgr/factories/builder.go b/engine/collection/epochmgr/factories/builder.go index 77619fbb9bc..12cc2afda79 100644 --- a/engine/collection/epochmgr/factories/builder.go +++ b/engine/collection/epochmgr/factories/builder.go @@ -55,6 +55,7 @@ func (f *BuilderFactory) Create( clusterState clusterstate.State, clusterHeaders storage.Headers, clusterPayloads storage.ClusterPayloads, + blockIndexer storage.ClusterBlockIndexer, pool mempool.Transactions, epoch uint64, ) (module.Builder, *finalizer.FinalizerPebble, error) { @@ -67,6 +68,7 @@ func (f *BuilderFactory) Create( f.mainChainHeaders, clusterHeaders, clusterPayloads, + blockIndexer, pool, f.log, epoch, diff --git a/engine/collection/epochmgr/factories/cluster_state.go b/engine/collection/epochmgr/factories/cluster_state.go index 4380bc8c0c9..d346e8b16dc 100644 --- a/engine/collection/epochmgr/factories/cluster_state.go +++ b/engine/collection/epochmgr/factories/cluster_state.go @@ -7,6 +7,7 @@ import ( "github.com/onflow/flow-go/module" clusterkv "github.com/onflow/flow-go/state/cluster/pebble" + "github.com/onflow/flow-go/storage" bstorage "github.com/onflow/flow-go/storage/pebble" ) @@ -29,7 +30,7 @@ func NewClusterStateFactory( return factory, nil } -func (f *ClusterStateFactory) Create(stateRoot *clusterkv.StateRoot) ( +func (f *ClusterStateFactory) Create(stateRoot *clusterkv.StateRoot, blockIndexer storage.ClusterBlockIndexer) ( *clusterkv.MutableState, *bstorage.Headers, *bstorage.ClusterPayloads, @@ -58,7 +59,7 @@ func (f *ClusterStateFactory) Create(stateRoot *clusterkv.StateRoot) ( } } - mutableState, err := clusterkv.NewMutableState(clusterState, f.tracer, headers, payloads) + mutableState, err := clusterkv.NewMutableState(clusterState, f.tracer, headers, payloads, blockIndexer) if err != nil { return nil, nil, nil, nil, fmt.Errorf("could create mutable cluster state: %w", err) } diff --git a/engine/collection/epochmgr/factories/epoch.go b/engine/collection/epochmgr/factories/epoch.go index cdc684eb3b6..c4fc9910c3b 100644 --- a/engine/collection/epochmgr/factories/epoch.go +++ b/engine/collection/epochmgr/factories/epoch.go @@ -12,6 +12,7 @@ import ( "github.com/onflow/flow-go/state/cluster/pebble" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/pebble/procedure" ) type EpochComponentsFactory struct { @@ -114,8 +115,11 @@ func (factory *EpochComponentsFactory) Create( err = fmt.Errorf("could not create valid state root: %w", err) return } + + blockIndexer := procedure.NewClusterBlockIndexer() + var mutableState *pebble.MutableState - mutableState, headers, payloads, blocks, err = factory.state.Create(stateRoot) + mutableState, headers, payloads, blocks, err = factory.state.Create(stateRoot, blockIndexer) state = mutableState if err != nil { err = fmt.Errorf("could not create cluster state: %w", err) @@ -125,7 +129,7 @@ func (factory *EpochComponentsFactory) Create( // get the transaction pool for the epoch pool := factory.pools.ForEpoch(epochCounter) - builder, finalizer, err := factory.builder.Create(state, headers, payloads, pool, epochCounter) + builder, finalizer, err := factory.builder.Create(state, headers, payloads, blockIndexer, pool, epochCounter) if err != nil { err = fmt.Errorf("could not create builder/finalizer: %w", err) return diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index d6860b65cf1..b859e5f8fc8 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -107,6 +107,7 @@ import ( "github.com/onflow/flow-go/state/protocol/util" storage "github.com/onflow/flow-go/storage/pebble" storagepebble "github.com/onflow/flow-go/storage/pebble" + "github.com/onflow/flow-go/storage/pebble/procedure" "github.com/onflow/flow-go/utils/unittest" ) @@ -271,6 +272,7 @@ func CompleteStateFixture( state, s.Index, s.Payloads, + procedure.NewBlockIndexer(), util.MockBlockTimer(), util.MockReceiptValidator(), util.MockSealValidator(s.Seals), @@ -586,6 +588,7 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit protoState.State, node.Index, node.Payloads, + procedure.NewBlockIndexer(), blocktimer.DefaultBlockTimer, ) require.NoError(t, err) diff --git a/module/builder/collection/builder_pebble.go b/module/builder/collection/builder_pebble.go index 72d814fb8dc..f14a0026e79 100644 --- a/module/builder/collection/builder_pebble.go +++ b/module/builder/collection/builder_pebble.go @@ -20,7 +20,6 @@ import ( "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/pebble/operation" - "github.com/onflow/flow-go/storage/pebble/procedure" "github.com/onflow/flow-go/utils/logging" ) @@ -37,6 +36,7 @@ type BuilderPebble struct { protoState protocol.State clusterState clusterstate.State payloads storage.ClusterPayloads + blockIndexer storage.ClusterBlockIndexer transactions mempool.Transactions tracer module.Tracer config Config @@ -56,6 +56,7 @@ func NewBuilderPebble( mainHeaders storage.Headers, clusterHeaders storage.Headers, payloads storage.ClusterPayloads, + blockIndexer storage.ClusterBlockIndexer, transactions mempool.Transactions, log zerolog.Logger, epochCounter uint64, @@ -69,6 +70,7 @@ func NewBuilderPebble( mainHeaders: mainHeaders, clusterHeaders: clusterHeaders, payloads: payloads, + blockIndexer: blockIndexer, transactions: transactions, config: DefaultConfig(), log: log.With().Str("component", "cluster_builder").Logger(), @@ -193,7 +195,7 @@ func (b *BuilderPebble) BuildOn(parentID flow.Identifier, setter func(*flow.Head // STEP 4: insert the cluster block to the database. span, _ = b.tracer.StartSpanFromContext(ctx, trace.COLBuildOnDBInsert) - err = operation.WithReaderBatchWriter(b.db, procedure.InsertClusterBlock(&proposal)) + err = operation.WithReaderBatchWriter(b.db, b.blockIndexer.InsertClusterBlock(&proposal)) span.End() if err != nil { return nil, fmt.Errorf("could not insert built block: %w", err) diff --git a/module/builder/collection/builder_pebble_test.go b/module/builder/collection/builder_pebble_test.go index bcca8390d5a..56e2a39f066 100644 --- a/module/builder/collection/builder_pebble_test.go +++ b/module/builder/collection/builder_pebble_test.go @@ -43,9 +43,10 @@ type BuilderPebbleSuite struct { chainID flow.ChainID epochCounter uint64 - headers storage.Headers - payloads storage.ClusterPayloads - blocks storage.Blocks + headers storage.Headers + payloads storage.ClusterPayloads + blocks storage.Blocks + blockIndexer storage.ClusterBlockIndexer state cluster.MutableState @@ -77,6 +78,7 @@ func (suite *BuilderPebbleSuite) SetupTest() { suite.headers = all.Headers suite.blocks = all.Blocks suite.payloads = bstorage.NewClusterPayloads(metrics, suite.db) + suite.blockIndexer = procedure.NewClusterBlockIndexer() // just bootstrap with a genesis block, we'll use this as reference root, result, seal := unittest.BootstrapFixture(unittest.IdentityListFixture(5, unittest.WithAllRoles())) @@ -93,7 +95,7 @@ func (suite *BuilderPebbleSuite) SetupTest() { clusterState, err := clusterkv.Bootstrap(suite.db, clusterStateRoot) suite.Require().NoError(err) - suite.state, err = clusterkv.NewMutableState(clusterState, tracer, suite.headers, suite.payloads) + suite.state, err = clusterkv.NewMutableState(clusterState, tracer, suite.headers, suite.payloads, suite.blockIndexer) suite.Require().NoError(err) state, err := ppebble.Bootstrap( @@ -119,6 +121,7 @@ func (suite *BuilderPebbleSuite) SetupTest() { state, all.Index, all.Payloads, + procedure.NewBlockIndexer(), util.MockBlockTimer(), ) require.NoError(suite.T(), err) @@ -134,7 +137,7 @@ func (suite *BuilderPebbleSuite) SetupTest() { suite.Assert().True(added) } - suite.builder, _ = builder.NewBuilderPebble(suite.db, tracer, suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.pool, unittest.Logger(), suite.epochCounter) + suite.builder, _ = builder.NewBuilderPebble(suite.db, tracer, suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.blockIndexer, suite.pool, unittest.Logger(), suite.epochCounter) } // runs after each test finishes @@ -146,7 +149,7 @@ func (suite *BuilderPebbleSuite) TearDownTest() { } func (suite *BuilderPebbleSuite) InsertBlock(block model.Block) { - err := operation.WithReaderBatchWriter(suite.db, procedure.InsertClusterBlock(&block)) + err := operation.WithReaderBatchWriter(suite.db, suite.blockIndexer.InsertClusterBlock(&block)) suite.Assert().NoError(err) } @@ -489,7 +492,7 @@ func (suite *BuilderPebbleSuite) TestBuildOn_LargeHistory() { // use a mempool with 2000 transactions, one per block suite.pool = herocache.NewTransactions(2000, unittest.Logger(), metrics.NewNoopCollector()) - suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.pool, unittest.Logger(), suite.epochCounter, builder.WithMaxCollectionSize(10000)) + suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.blockIndexer, suite.pool, unittest.Logger(), suite.epochCounter, builder.WithMaxCollectionSize(10000)) // get a valid reference block ID final, err := suite.protoState.Final().Head() @@ -569,7 +572,7 @@ func (suite *BuilderPebbleSuite) TestBuildOn_LargeHistory() { func (suite *BuilderPebbleSuite) TestBuildOn_MaxCollectionSize() { // set the max collection size to 1 - suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.pool, unittest.Logger(), suite.epochCounter, builder.WithMaxCollectionSize(1)) + suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.blockIndexer, suite.pool, unittest.Logger(), suite.epochCounter, builder.WithMaxCollectionSize(1)) // build a block header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter) @@ -587,7 +590,7 @@ func (suite *BuilderPebbleSuite) TestBuildOn_MaxCollectionSize() { func (suite *BuilderPebbleSuite) TestBuildOn_MaxCollectionByteSize() { // set the max collection byte size to 400 (each tx is about 150 bytes) - suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.pool, unittest.Logger(), suite.epochCounter, builder.WithMaxCollectionByteSize(400)) + suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.blockIndexer, suite.pool, unittest.Logger(), suite.epochCounter, builder.WithMaxCollectionByteSize(400)) // build a block header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter) @@ -605,7 +608,7 @@ func (suite *BuilderPebbleSuite) TestBuildOn_MaxCollectionByteSize() { func (suite *BuilderPebbleSuite) TestBuildOn_MaxCollectionTotalGas() { // set the max gas to 20,000 - suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.pool, unittest.Logger(), suite.epochCounter, builder.WithMaxCollectionTotalGas(20000)) + suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.blockIndexer, suite.pool, unittest.Logger(), suite.epochCounter, builder.WithMaxCollectionTotalGas(20000)) // build a block header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter) @@ -642,7 +645,7 @@ func (suite *BuilderPebbleSuite) TestBuildOn_ExpiredTransaction() { // reset the pool and builder suite.pool = herocache.NewTransactions(10, unittest.Logger(), metrics.NewNoopCollector()) - suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.pool, unittest.Logger(), suite.epochCounter) + suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.blockIndexer, suite.pool, unittest.Logger(), suite.epochCounter) // insert a transaction referring genesis (now expired) tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { @@ -684,7 +687,7 @@ func (suite *BuilderPebbleSuite) TestBuildOn_EmptyMempool() { // start with an empty mempool suite.pool = herocache.NewTransactions(1000, unittest.Logger(), metrics.NewNoopCollector()) - suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.pool, unittest.Logger(), suite.epochCounter) + suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.blockIndexer, suite.pool, unittest.Logger(), suite.epochCounter) header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter) suite.Require().NoError(err) @@ -711,7 +714,7 @@ func (suite *BuilderPebbleSuite) TestBuildOn_NoRateLimiting() { suite.ClearPool() // create builder with no rate limit and max 10 tx/collection - suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.pool, unittest.Logger(), suite.epochCounter, + suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.blockIndexer, suite.pool, unittest.Logger(), suite.epochCounter, builder.WithMaxCollectionSize(10), builder.WithMaxPayerTransactionRate(0), ) @@ -752,7 +755,7 @@ func (suite *BuilderPebbleSuite) TestBuildOn_RateLimitNonPayer() { suite.ClearPool() // create builder with 5 tx/payer and max 10 tx/collection - suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.pool, unittest.Logger(), suite.epochCounter, + suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.blockIndexer, suite.pool, unittest.Logger(), suite.epochCounter, builder.WithMaxCollectionSize(10), builder.WithMaxPayerTransactionRate(5), ) @@ -796,7 +799,7 @@ func (suite *BuilderPebbleSuite) TestBuildOn_HighRateLimit() { suite.ClearPool() // create builder with 5 tx/payer and max 10 tx/collection - suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.pool, unittest.Logger(), suite.epochCounter, + suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.blockIndexer, suite.pool, unittest.Logger(), suite.epochCounter, builder.WithMaxCollectionSize(10), builder.WithMaxPayerTransactionRate(5), ) @@ -834,7 +837,7 @@ func (suite *BuilderPebbleSuite) TestBuildOn_LowRateLimit() { suite.ClearPool() // create builder with .5 tx/payer and max 10 tx/collection - suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.pool, unittest.Logger(), suite.epochCounter, + suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.blockIndexer, suite.pool, unittest.Logger(), suite.epochCounter, builder.WithMaxCollectionSize(10), builder.WithMaxPayerTransactionRate(.5), ) @@ -876,7 +879,7 @@ func (suite *BuilderPebbleSuite) TestBuildOn_UnlimitedPayer() { // create builder with 5 tx/payer and max 10 tx/collection // configure an unlimited payer payer := unittest.RandomAddressFixture() - suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.pool, unittest.Logger(), suite.epochCounter, + suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.blockIndexer, suite.pool, unittest.Logger(), suite.epochCounter, builder.WithMaxCollectionSize(10), builder.WithMaxPayerTransactionRate(5), builder.WithUnlimitedPayers(payer), @@ -917,7 +920,7 @@ func (suite *BuilderPebbleSuite) TestBuildOn_RateLimitDryRun() { // create builder with 5 tx/payer and max 10 tx/collection // configure an unlimited payer payer := unittest.RandomAddressFixture() - suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.pool, unittest.Logger(), suite.epochCounter, + suite.builder, _ = builder.NewBuilderPebble(suite.db, trace.NewNoopTracer(), suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.blockIndexer, suite.pool, unittest.Logger(), suite.epochCounter, builder.WithMaxCollectionSize(10), builder.WithMaxPayerTransactionRate(5), builder.WithRateLimitDryRun(true), @@ -993,7 +996,7 @@ func benchmarkBuildOnPebble(b *testing.B, size int) { state, err := clusterkv.Bootstrap(suite.db, stateRoot) assert.NoError(b, err) - suite.state, err = clusterkv.NewMutableState(state, tracer, suite.headers, suite.payloads) + suite.state, err = clusterkv.NewMutableState(state, tracer, suite.headers, suite.payloads, suite.blockIndexer) assert.NoError(b, err) // add some transactions to transaction pool @@ -1004,14 +1007,14 @@ func benchmarkBuildOnPebble(b *testing.B, size int) { } // create the builder - suite.builder, _ = builder.NewBuilderPebble(suite.db, tracer, suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.pool, unittest.Logger(), suite.epochCounter) + suite.builder, _ = builder.NewBuilderPebble(suite.db, tracer, suite.protoState, suite.state, suite.headers, suite.headers, suite.payloads, suite.blockIndexer, suite.pool, unittest.Logger(), suite.epochCounter) } // create a block history to test performance against final := suite.genesis for i := 0; i < size; i++ { block := unittest.ClusterBlockWithParent(final) - err := operation.WithReaderBatchWriter(suite.db, procedure.InsertClusterBlock(&block)) + err := operation.WithReaderBatchWriter(suite.db, suite.blockIndexer.InsertClusterBlock(&block)) require.NoError(b, err) // finalize the block 80% of the time, resulting in a fork-rate of 20% diff --git a/module/builder/consensus/builder_pebble.go b/module/builder/consensus/builder_pebble.go index 925c567c2f9..da4e289bebb 100644 --- a/module/builder/consensus/builder_pebble.go +++ b/module/builder/consensus/builder_pebble.go @@ -49,6 +49,7 @@ func NewBuilderPebble( seals storage.Seals, index storage.Index, blocks storage.Blocks, + blockIndex storage.BlockIndexer, resultsDB storage.ExecutionResults, receiptsDB storage.ExecutionReceipts, guarPool mempool.Guarantees, diff --git a/module/builder/consensus/builder_pebble_test.go b/module/builder/consensus/builder_pebble_test.go index 27896de32e5..10e812e6feb 100644 --- a/module/builder/consensus/builder_pebble_test.go +++ b/module/builder/consensus/builder_pebble_test.go @@ -22,6 +22,7 @@ import ( storerr "github.com/onflow/flow-go/storage" storage "github.com/onflow/flow-go/storage/mock" "github.com/onflow/flow-go/storage/pebble/operation" + "github.com/onflow/flow-go/storage/pebble/procedure" "github.com/onflow/flow-go/utils/unittest" ) @@ -58,6 +59,7 @@ type BuilderSuitePebble struct { index map[flow.Identifier]*flow.Index blocks map[flow.Identifier]*flow.Block blockChildren map[flow.Identifier][]flow.Identifier // ids of children blocks + blockIndexer storerr.BlockIndexer lastSeal *flow.Seal @@ -206,6 +208,7 @@ func (bs *BuilderSuitePebble) SetupTest() { bs.index = make(map[flow.Identifier]*flow.Index) bs.blocks = make(map[flow.Identifier]*flow.Block) bs.blockChildren = make(map[flow.Identifier][]flow.Identifier) + bs.blockIndexer = procedure.NewBlockIndexer() // initialize behaviour tracking bs.assembled = nil @@ -418,6 +421,7 @@ func (bs *BuilderSuitePebble) SetupTest() { bs.sealDB, bs.indexDB, bs.blockDB, + bs.blockIndexer, bs.resultDB, bs.receiptsDB, bs.guarPool, @@ -1420,6 +1424,7 @@ func (bs *BuilderSuitePebble) TestIntegration_RepopulateExecutionTreeAtStartup() bs.sealDB, bs.indexDB, bs.blockDB, + bs.blockIndexer, bs.resultDB, bs.receiptsDB, bs.guarPool, diff --git a/module/finalizer/collection/finalizer_pebble_test.go b/module/finalizer/collection/finalizer_pebble_test.go index d9472aeea58..e2801181e0b 100644 --- a/module/finalizer/collection/finalizer_pebble_test.go +++ b/module/finalizer/collection/finalizer_pebble_test.go @@ -55,9 +55,11 @@ func TestFinalizerPebble(t *testing.T) { require.NoError(t, err) } + blockIndexer := procedure.NewClusterBlockIndexer() + // a helper function to insert a block insert := func(block model.Block) { - err := operation.WithReaderBatchWriter(db, procedure.InsertClusterBlock(&block)) + err := operation.WithReaderBatchWriter(db, blockIndexer.InsertClusterBlock(&block)) assert.Nil(t, err) } diff --git a/state/cluster/pebble/mutator.go b/state/cluster/pebble/mutator.go index 11b768dd946..2cd667ac3b8 100644 --- a/state/cluster/pebble/mutator.go +++ b/state/cluster/pebble/mutator.go @@ -23,19 +23,21 @@ import ( type MutableState struct { *State - tracer module.Tracer - headers storage.Headers - payloads storage.ClusterPayloads + tracer module.Tracer + headers storage.Headers + payloads storage.ClusterPayloads + blockIndexer storage.ClusterBlockIndexer } var _ clusterstate.MutableState = (*MutableState)(nil) -func NewMutableState(state *State, tracer module.Tracer, headers storage.Headers, payloads storage.ClusterPayloads) (*MutableState, error) { +func NewMutableState(state *State, tracer module.Tracer, headers storage.Headers, payloads storage.ClusterPayloads, clusterBlockIndexer storage.ClusterBlockIndexer) (*MutableState, error) { mutableState := &MutableState{ - State: state, - tracer: tracer, - headers: headers, - payloads: payloads, + State: state, + tracer: tracer, + headers: headers, + payloads: payloads, + blockIndexer: clusterBlockIndexer, } return mutableState, nil } @@ -138,7 +140,7 @@ func (m *MutableState) Extend(candidate *cluster.Block) error { } span, _ = m.tracer.StartSpanFromContext(ctx, trace.COLClusterStateMutatorExtendDBInsert) - err = operation.WithReaderBatchWriter(m.State.db, procedure.InsertClusterBlock(candidate)) + err = operation.WithReaderBatchWriter(m.State.db, m.blockIndexer.InsertClusterBlock(candidate)) span.End() if err != nil { return fmt.Errorf("could not insert cluster block: %w", err) diff --git a/state/cluster/pebble/mutator_test.go b/state/cluster/pebble/mutator_test.go index e159ba0f5a7..bf93ce66447 100644 --- a/state/cluster/pebble/mutator_test.go +++ b/state/cluster/pebble/mutator_test.go @@ -91,14 +91,14 @@ func (suite *MutatorSuite) SetupTest() { rootSnapshot, ) require.NoError(suite.T(), err) - suite.protoState, err = ppebble.NewFollowerState(log, tracer, events.NewNoop(), state, all.Index, all.Payloads, protocolutil.MockBlockTimer()) + suite.protoState, err = ppebble.NewFollowerState(log, tracer, events.NewNoop(), state, all.Index, all.Payloads, procedure.NewBlockIndexer(), protocolutil.MockBlockTimer()) require.NoError(suite.T(), err) clusterStateRoot, err := NewStateRoot(suite.genesis, unittest.QuorumCertificateFixture(), suite.epochCounter) suite.NoError(err) clusterState, err := Bootstrap(suite.db, clusterStateRoot) suite.Assert().Nil(err) - suite.state, err = NewMutableState(clusterState, tracer, all.Headers, colPayloads) + suite.state, err = NewMutableState(clusterState, tracer, all.Headers, colPayloads, procedure.NewClusterBlockIndexer()) suite.Assert().Nil(err) } diff --git a/state/cluster/pebble/snapshot_test.go b/state/cluster/pebble/snapshot_test.go index ea599a26d80..3e06dea93c1 100644 --- a/state/cluster/pebble/snapshot_test.go +++ b/state/cluster/pebble/snapshot_test.go @@ -16,6 +16,7 @@ import ( "github.com/onflow/flow-go/state/cluster" "github.com/onflow/flow-go/state/protocol" ppebble "github.com/onflow/flow-go/state/protocol/pebble" + protcolstorage "github.com/onflow/flow-go/storage" storage "github.com/onflow/flow-go/storage/pebble" "github.com/onflow/flow-go/storage/pebble/operation" "github.com/onflow/flow-go/storage/pebble/procedure" @@ -32,7 +33,8 @@ type SnapshotSuite struct { chainID flow.ChainID epochCounter uint64 - protoState protocol.State + protoState protocol.State + blockIndexer protcolstorage.ClusterBlockIndexer state cluster.MutableState } @@ -46,6 +48,7 @@ func (suite *SnapshotSuite) SetupTest() { suite.dbdir = unittest.TempDir(suite.T()) suite.db = unittest.PebbleDB(suite.T(), suite.dbdir) + suite.blockIndexer = procedure.NewClusterBlockIndexer() metrics := metrics.NewNoopCollector() tracer := trace.NewNoopTracer() @@ -76,7 +79,7 @@ func (suite *SnapshotSuite) SetupTest() { suite.Require().NoError(err) clusterState, err := Bootstrap(suite.db, clusterStateRoot) suite.Require().NoError(err) - suite.state, err = NewMutableState(clusterState, tracer, all.Headers, colPayloads) + suite.state, err = NewMutableState(clusterState, tracer, all.Headers, colPayloads, suite.blockIndexer) suite.Require().NoError(err) } @@ -123,7 +126,7 @@ func (suite *SnapshotSuite) Block() model.Block { } func (suite *SnapshotSuite) InsertBlock(block model.Block) { - err := operation.WithReaderBatchWriter(suite.db, procedure.InsertClusterBlock(&block)) + err := operation.WithReaderBatchWriter(suite.db, suite.blockIndexer.InsertClusterBlock(&block)) suite.Assert().Nil(err) } diff --git a/state/cluster/pebble/state.go b/state/cluster/pebble/state.go index 71d94ee02a2..db2e776c97c 100644 --- a/state/cluster/pebble/state.go +++ b/state/cluster/pebble/state.go @@ -36,12 +36,13 @@ func Bootstrap(db *pebble.DB, stateRoot *StateRoot) (*State, error) { genesis := stateRoot.Block() rootQC := stateRoot.QC() + indexer := procedure.NewClusterBlockIndexer() // bootstrap cluster state err = operation.WithReaderBatchWriter(state.db, func(tx storage.PebbleReaderBatchWriter) error { _, w := tx.ReaderWriter() chainID := genesis.Header.ChainID // insert the block - err := procedure.InsertClusterBlock(genesis)(tx) + err := indexer.InsertClusterBlock(genesis)(tx) if err != nil { return fmt.Errorf("could not insert genesis block: %w", err) } diff --git a/state/protocol/pebble/mutator.go b/state/protocol/pebble/mutator.go index 654258d8552..5eb732c99a7 100644 --- a/state/protocol/pebble/mutator.go +++ b/state/protocol/pebble/mutator.go @@ -17,7 +17,6 @@ import ( "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/pebble/operation" - "github.com/onflow/flow-go/storage/pebble/procedure" ) // FollowerState implements a lighter version of a mutable protocol state. @@ -33,12 +32,13 @@ import ( type FollowerState struct { *State - index storage.Index - payloads storage.Payloads - tracer module.Tracer - logger zerolog.Logger - consumer protocol.Consumer - blockTimer protocol.BlockTimer + index storage.Index + payloads storage.Payloads + blockIndexer storage.BlockIndexer + tracer module.Tracer + logger zerolog.Logger + consumer protocol.Consumer + blockTimer protocol.BlockTimer } var _ protocol.FollowerState = (*FollowerState)(nil) @@ -62,16 +62,18 @@ func NewFollowerState( state *State, index storage.Index, payloads storage.Payloads, + blockIndexer storage.BlockIndexer, blockTimer protocol.BlockTimer, ) (*FollowerState, error) { followerState := &FollowerState{ - State: state, - index: index, - payloads: payloads, - tracer: tracer, - logger: logger, - consumer: consumer, - blockTimer: blockTimer, + State: state, + index: index, + payloads: payloads, + blockIndexer: blockIndexer, + tracer: tracer, + logger: logger, + consumer: consumer, + blockTimer: blockTimer, } return followerState, nil } @@ -87,6 +89,7 @@ func NewFullConsensusState( state *State, index storage.Index, payloads storage.Payloads, + blockIndexer storage.BlockIndexer, blockTimer protocol.BlockTimer, receiptValidator module.ReceiptValidator, sealValidator module.SealValidator, @@ -98,6 +101,7 @@ func NewFullConsensusState( state, index, payloads, + blockIndexer, blockTimer, ) if err != nil { @@ -560,7 +564,7 @@ func (m *FollowerState) insert(ctx context.Context, candidate *flow.Block, certi } // index the child block for recovery - err = procedure.IndexNewBlock(blockID, candidate.Header.ParentID)(tx) + err = m.blockIndexer.IndexNewBlock(blockID, candidate.Header.ParentID)(tx) if err != nil { return fmt.Errorf("could not index new block: %w", err) } diff --git a/state/protocol/pebble/mutator_test.go b/state/protocol/pebble/mutator_test.go index 84ef432a617..a4d84cdfd45 100644 --- a/state/protocol/pebble/mutator_test.go +++ b/state/protocol/pebble/mutator_test.go @@ -33,6 +33,7 @@ import ( stoerr "github.com/onflow/flow-go/storage" bstorage "github.com/onflow/flow-go/storage/pebble" "github.com/onflow/flow-go/storage/pebble/operation" + "github.com/onflow/flow-go/storage/pebble/procedure" "github.com/onflow/flow-go/storage/testingutils" "github.com/onflow/flow-go/utils/unittest" ) @@ -119,6 +120,7 @@ func TestExtendValid(t *testing.T) { state, all.Index, all.Payloads, + procedure.NewBlockIndexer(), util.MockBlockTimer(), util.MockReceiptValidator(), util.MockSealValidator(all.Seals), @@ -836,6 +838,7 @@ func TestExtendEpochTransitionValid(t *testing.T) { protoState, all.Index, all.Payloads, + procedure.NewBlockIndexer(), util.MockBlockTimer(), receiptValidator, sealValidator, @@ -1975,6 +1978,7 @@ func TestExtendInvalidSealsInBlock(t *testing.T) { state, all.Index, all.Payloads, + procedure.NewBlockIndexer(), util.MockBlockTimer(), util.MockReceiptValidator(), sealValidator, @@ -2474,6 +2478,7 @@ func TestHeaderInvalidTimestamp(t *testing.T) { state, all.Index, all.Payloads, + procedure.NewBlockIndexer(), blockTimer, util.MockReceiptValidator(), util.MockSealValidator(all.Seals), diff --git a/state/protocol/util/testing_pebble.go b/state/protocol/util/testing_pebble.go index ad27f3f5476..399c0e20a3a 100644 --- a/state/protocol/util/testing_pebble.go +++ b/state/protocol/util/testing_pebble.go @@ -14,6 +14,7 @@ import ( "github.com/onflow/flow-go/state/protocol/events" pbadger "github.com/onflow/flow-go/state/protocol/pebble" "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/pebble/procedure" "github.com/onflow/flow-go/storage/testingutils" "github.com/onflow/flow-go/utils/unittest" ) @@ -66,7 +67,7 @@ func RunWithPebbleFullProtocolState(t testing.TB, rootSnapshot protocol.Snapshot receiptValidator := MockReceiptValidator() sealValidator := MockSealValidator(all.Seals) mockTimer := MockBlockTimer() - fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer, receiptValidator, sealValidator) + fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, procedure.NewBlockIndexer(), mockTimer, receiptValidator, sealValidator) require.NoError(t, err) f(db, fullState) }) @@ -96,7 +97,7 @@ func RunWithPebbleFullProtocolStateAndMetrics(t testing.TB, rootSnapshot protoco receiptValidator := MockReceiptValidator() sealValidator := MockSealValidator(all.Seals) mockTimer := MockBlockTimer() - fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer, receiptValidator, sealValidator) + fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, procedure.NewBlockIndexer(), mockTimer, receiptValidator, sealValidator) require.NoError(t, err) f(db, fullState) }) @@ -126,7 +127,7 @@ func RunWithPebbleFullProtocolStateAndValidator(t testing.TB, rootSnapshot proto require.NoError(t, err) sealValidator := MockSealValidator(all.Seals) mockTimer := MockBlockTimer() - fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer, validator, sealValidator) + fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, procedure.NewBlockIndexer(), mockTimer, validator, sealValidator) require.NoError(t, err) f(db, fullState) }) @@ -155,7 +156,7 @@ func RunWithPebbleFollowerProtocolState(t testing.TB, rootSnapshot protocol.Snap ) require.NoError(t, err) mockTimer := MockBlockTimer() - followerState, err := pbadger.NewFollowerState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer) + followerState, err := pbadger.NewFollowerState(log, tracer, consumer, state, all.Index, all.Payloads, procedure.NewBlockIndexer(), mockTimer) require.NoError(t, err) f(db, followerState) }) @@ -185,7 +186,7 @@ func RunWithPebbleFullProtocolStateAndConsumer(t testing.TB, rootSnapshot protoc receiptValidator := MockReceiptValidator() sealValidator := MockSealValidator(all.Seals) mockTimer := MockBlockTimer() - fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer, receiptValidator, sealValidator) + fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, procedure.NewBlockIndexer(), mockTimer, receiptValidator, sealValidator) require.NoError(t, err) f(db, fullState) }) @@ -214,7 +215,7 @@ func RunWithPebbleFullProtocolStateAndMetricsAndConsumer(t testing.TB, rootSnaps receiptValidator := MockReceiptValidator() sealValidator := MockSealValidator(all.Seals) mockTimer := MockBlockTimer() - fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer, receiptValidator, sealValidator) + fullState, err := pbadger.NewFullConsensusState(log, tracer, consumer, state, all.Index, all.Payloads, procedure.NewBlockIndexer(), mockTimer, receiptValidator, sealValidator) require.NoError(t, err) f(db, fullState) }) @@ -243,7 +244,7 @@ func RunWithPebbleFollowerProtocolStateAndHeaders(t testing.TB, rootSnapshot pro ) require.NoError(t, err) mockTimer := MockBlockTimer() - followerState, err := pbadger.NewFollowerState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer) + followerState, err := pbadger.NewFollowerState(log, tracer, consumer, state, all.Index, all.Payloads, procedure.NewBlockIndexer(), mockTimer) require.NoError(t, err) f(db, followerState, all.Headers, all.Index) }) diff --git a/storage/blocks.go b/storage/blocks.go index cdcb9a0f6b3..cde63d80f8f 100644 --- a/storage/blocks.go +++ b/storage/blocks.go @@ -42,3 +42,10 @@ type Blocks interface { // GetLastFullBlockHeight retrieves the FullBlockHeight GetLastFullBlockHeight() (height uint64, err error) } + +// BlockIndexer is an interface for indexing new blocks. +type BlockIndexer interface { + // IndexNewBlock will add parent-child index for the new block. + // When calling by multiple goroutines, it should be thread-safe. + IndexNewBlock(blockID flow.Identifier, parentID flow.Identifier) func(PebbleReaderBatchWriter) error +} diff --git a/storage/cluster_blocks.go b/storage/cluster_blocks.go index ca5a3466b87..52914b142b2 100644 --- a/storage/cluster_blocks.go +++ b/storage/cluster_blocks.go @@ -17,3 +17,9 @@ type ClusterBlocks interface { // finalized blocks. ByHeight(height uint64) (*cluster.Block, error) } + +type ClusterBlockIndexer interface { + // InsertClusterBlock inserts a cluster consensus block, updating all associated indexes. + // When calling by multiple goroutines, it should be thread-safe. + InsertClusterBlock(block *cluster.Block) func(PebbleReaderBatchWriter) error +} diff --git a/storage/pebble/cluster_blocks_test.go b/storage/pebble/cluster_blocks_test.go index 39914266269..fdb398c469c 100644 --- a/storage/pebble/cluster_blocks_test.go +++ b/storage/pebble/cluster_blocks_test.go @@ -24,9 +24,10 @@ func TestClusterBlocksByHeight(t *testing.T) { err = operation.InsertClusterFinalizedHeight(parent.Header.ChainID, parent.Header.Height)(db) require.NoError(t, err) + blockIndexer := procedure.NewClusterBlockIndexer() // store a chain of blocks for _, block := range blocks { - err := operation.WithReaderBatchWriter(db, procedure.InsertClusterBlock(&block)) + err := operation.WithReaderBatchWriter(db, blockIndexer.InsertClusterBlock(&block)) require.NoError(t, err) err = operation.WithReaderBatchWriter(db, procedure.FinalizeClusterBlock(block.Header.ID())) diff --git a/storage/pebble/procedure/children.go b/storage/pebble/procedure/children.go index a63db0bee0f..dd3a30e184f 100644 --- a/storage/pebble/procedure/children.go +++ b/storage/pebble/procedure/children.go @@ -3,6 +3,7 @@ package procedure import ( "errors" "fmt" + "sync" "github.com/cockroachdb/pebble" @@ -11,11 +12,30 @@ import ( "github.com/onflow/flow-go/storage/pebble/operation" ) +type BlockIndexer struct { + // indexing is a mutex to avoid dirty reads when calling RetrieveBlockChildren + indexing *sync.Mutex +} + +func NewBlockIndexer() *BlockIndexer { + return &BlockIndexer{ + indexing: new(sync.Mutex), + } +} + +var _ storage.BlockIndexer = (*BlockIndexer)(nil) + +func (bi *BlockIndexer) IndexNewBlock(blockID flow.Identifier, parentID flow.Identifier) func(storage.PebbleReaderBatchWriter) error { + return IndexNewBlock(bi.indexing, blockID, parentID) +} + // IndexNewBlock will add parent-child index for the new block. // - Each block has a parent, we use this parent-child relationship to build a reverse index // - for looking up children blocks for a given block. This is useful for forks recovery // where we want to find all the pending children blocks for the lastest finalized block. // +// # It's concurrent safe to call this function by multiple goroutines, as it will acquire a lock +// // When adding parent-child index for a new block, we will add two indexes: // 1. since it's a new block, the new block should have no child, so adding an empty // index for the new block. Note: It's impossible there is a block whose parent is the @@ -24,7 +44,7 @@ import ( // there are two special cases for (2): // - if the parent block is zero, then we don't need to add this index. // - if the parent block doesn't exist, then we will insert the child index instead of updating -func IndexNewBlock(blockID flow.Identifier, parentID flow.Identifier) func(storage.PebbleReaderBatchWriter) error { +func IndexNewBlock(indexing *sync.Mutex, blockID flow.Identifier, parentID flow.Identifier) func(storage.PebbleReaderBatchWriter) error { return func(rw storage.PebbleReaderBatchWriter) error { r, tx := rw.ReaderWriter() @@ -42,6 +62,10 @@ func IndexNewBlock(blockID flow.Identifier, parentID flow.Identifier) func(stora return nil } + // acquiring a lock to avoid dirty reads when calling RetrieveBlockChildren + indexing.Lock() + defer indexing.Unlock() + // if the parent block is not zero, depending on whether the parent block has // children or not, we will either update the index or insert the index: // when parent block doesn't exist, we will insert the block children. @@ -65,7 +89,6 @@ func IndexNewBlock(blockID flow.Identifier, parentID flow.Identifier) func(stora // adding the new block to be another child of the parent childrenIDs = append(childrenIDs, blockID) - // TODO: use transaction to avoid race condition return operation.InsertBlockChildren(parentID, childrenIDs)(tx) } diff --git a/storage/pebble/procedure/children_test.go b/storage/pebble/procedure/children_test.go index 639d7a90569..ea44d5e13c6 100644 --- a/storage/pebble/procedure/children_test.go +++ b/storage/pebble/procedure/children_test.go @@ -17,12 +17,13 @@ import ( // after indexing a block by its parent, it should be able to retrieve the child block by the parentID func TestIndexAndLookupChild(t *testing.T) { unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + indexer := procedure.NewBlockIndexer() parentID := unittest.IdentifierFixture() childID := unittest.IdentifierFixture() rw := operation.NewPebbleReaderBatchWriter(db) - err := procedure.IndexNewBlock(childID, parentID)(rw) + err := indexer.IndexNewBlock(childID, parentID)(rw) require.NoError(t, err) require.NoError(t, rw.Commit()) @@ -41,6 +42,7 @@ func TestIndexAndLookupChild(t *testing.T) { // was indexed. func TestIndexTwiceAndRetrieve(t *testing.T) { unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + indexer := procedure.NewBlockIndexer() parentID := unittest.IdentifierFixture() child1ID := unittest.IdentifierFixture() @@ -48,13 +50,13 @@ func TestIndexTwiceAndRetrieve(t *testing.T) { rw := operation.NewPebbleReaderBatchWriter(db) // index the first child - err := procedure.IndexNewBlock(child1ID, parentID)(rw) + err := indexer.IndexNewBlock(child1ID, parentID)(rw) require.NoError(t, err) require.NoError(t, rw.Commit()) // index the second child rw = operation.NewPebbleReaderBatchWriter(db) - err = procedure.IndexNewBlock(child2ID, parentID)(rw) + err = indexer.IndexNewBlock(child2ID, parentID)(rw) require.NoError(t, err) require.NoError(t, rw.Commit()) @@ -70,10 +72,12 @@ func TestIndexTwiceAndRetrieve(t *testing.T) { func TestIndexZeroParent(t *testing.T) { unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + indexer := procedure.NewBlockIndexer() + childID := unittest.IdentifierFixture() rw := operation.NewPebbleReaderBatchWriter(db) - err := procedure.IndexNewBlock(childID, flow.ZeroID)(rw) + err := indexer.IndexNewBlock(childID, flow.ZeroID)(rw) require.NoError(t, err) require.NoError(t, rw.Commit()) @@ -87,6 +91,7 @@ func TestIndexZeroParent(t *testing.T) { // lookup block children will only return direct childrens func TestDirectChildren(t *testing.T) { unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + indexer := procedure.NewBlockIndexer() b1 := unittest.IdentifierFixture() b2 := unittest.IdentifierFixture() @@ -94,17 +99,17 @@ func TestDirectChildren(t *testing.T) { b4 := unittest.IdentifierFixture() rw := operation.NewPebbleReaderBatchWriter(db) - err := procedure.IndexNewBlock(b2, b1)(rw) + err := indexer.IndexNewBlock(b2, b1)(rw) require.NoError(t, err) require.NoError(t, rw.Commit()) rw = operation.NewPebbleReaderBatchWriter(db) - err = procedure.IndexNewBlock(b3, b2)(rw) + err = indexer.IndexNewBlock(b3, b2)(rw) require.NoError(t, err) require.NoError(t, rw.Commit()) rw = operation.NewPebbleReaderBatchWriter(db) - err = procedure.IndexNewBlock(b4, b3)(rw) + err = indexer.IndexNewBlock(b4, b3)(rw) require.NoError(t, err) require.NoError(t, rw.Commit()) diff --git a/storage/pebble/procedure/cluster.go b/storage/pebble/procedure/cluster.go index a9fed44fd42..29fc9dfb681 100644 --- a/storage/pebble/procedure/cluster.go +++ b/storage/pebble/procedure/cluster.go @@ -2,6 +2,7 @@ package procedure import ( "fmt" + "sync" "github.com/cockroachdb/pebble" @@ -13,9 +14,21 @@ import ( // This file implements storage functions for blocks in cluster consensus. +type ClusterBlockIndexer struct { + indexing *sync.Mutex +} + +var _ storage.ClusterBlockIndexer = (*ClusterBlockIndexer)(nil) + +func NewClusterBlockIndexer() *ClusterBlockIndexer { + return &ClusterBlockIndexer{ + indexing: new(sync.Mutex), + } +} + // InsertClusterBlock inserts a cluster consensus block, updating all // associated indexes. -func InsertClusterBlock(block *cluster.Block) func(storage.PebbleReaderBatchWriter) error { +func (i *ClusterBlockIndexer) InsertClusterBlock(block *cluster.Block) func(storage.PebbleReaderBatchWriter) error { return func(tx storage.PebbleReaderBatchWriter) error { _, w := tx.ReaderWriter() @@ -38,7 +51,7 @@ func InsertClusterBlock(block *cluster.Block) func(storage.PebbleReaderBatchWrit } // index the child block for recovery - err = IndexNewBlock(blockID, block.Header.ParentID)(tx) + err = IndexNewBlock(i.indexing, blockID, block.Header.ParentID)(tx) if err != nil { return fmt.Errorf("could not index new block: %w", err) } diff --git a/storage/pebble/procedure/cluster_test.go b/storage/pebble/procedure/cluster_test.go new file mode 100644 index 00000000000..1eb361a59eb --- /dev/null +++ b/storage/pebble/procedure/cluster_test.go @@ -0,0 +1,67 @@ +package procedure + +import ( + "testing" + + "github.com/cockroachdb/pebble" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/cluster" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage/pebble/operation" + "github.com/onflow/flow-go/utils/unittest" +) + +func TestInsertRetrieveClusterBlock(t *testing.T) { + unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + blockIndexer := NewClusterBlockIndexer() + block := unittest.ClusterBlockFixture() + + err := operation.WithReaderBatchWriter(db, blockIndexer.InsertClusterBlock(&block)) + require.NoError(t, err) + + var retrieved cluster.Block + err = RetrieveClusterBlock(block.Header.ID(), &retrieved)(db) + require.NoError(t, err) + + require.Equal(t, block, retrieved) + }) +} + +func TestFinalizeClusterBlock(t *testing.T) { + unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + blockIndexer := NewClusterBlockIndexer() + parent := unittest.ClusterBlockFixture() + + block := unittest.ClusterBlockWithParent(&parent) + + err := operation.WithReaderBatchWriter(db, blockIndexer.InsertClusterBlock(&block)) + require.NoError(t, err) + + // prepare previous finalized height + rw := operation.NewPebbleReaderBatchWriter(db) + _, w := rw.ReaderWriter() + + err = operation.IndexClusterBlockHeight(block.Header.ChainID, parent.Header.Height, parent.ID())(w) + require.NoError(t, err) + + err = operation.InsertClusterFinalizedHeight(block.Header.ChainID, parent.Header.Height)(w) + require.NoError(t, err) + + require.NoError(t, rw.Commit()) + + // finalize the block + err = operation.WithReaderBatchWriter(db, FinalizeClusterBlock(block.Header.ID())) + require.NoError(t, err) + + var boundary uint64 + err = operation.RetrieveClusterFinalizedHeight(block.Header.ChainID, &boundary)(db) + require.NoError(t, err) + require.Equal(t, block.Header.Height, boundary) + + var headID flow.Identifier + err = operation.LookupClusterBlockHeight(block.Header.ChainID, boundary, &headID)(db) + require.NoError(t, err) + require.Equal(t, block.ID(), headID) + }) +} From 148c8776e5a32fc2c8bd2930cc91065720047cdd Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 15 Aug 2024 17:38:35 -0700 Subject: [PATCH 2/3] refactor new follower state --- cmd/collection/main.go | 2 -- cmd/execution_builder.go | 1 - cmd/observer/node_builder/observer_builder.go | 2 -- cmd/verification_builder.go | 2 -- engine/testutil/nodes.go | 1 - .../builder/collection/builder_pebble_test.go | 1 - state/cluster/pebble/mutator_test.go | 2 +- state/protocol/pebble/mutator.go | 25 ++++++++----------- state/protocol/util/testing_pebble.go | 4 +-- 9 files changed, 14 insertions(+), 26 deletions(-) diff --git a/cmd/collection/main.go b/cmd/collection/main.go index 83cfe10e936..d0b43d4626e 100644 --- a/cmd/collection/main.go +++ b/cmd/collection/main.go @@ -52,7 +52,6 @@ import ( "github.com/onflow/flow-go/state/protocol/events/gadgets" pebbleState "github.com/onflow/flow-go/state/protocol/pebble" "github.com/onflow/flow-go/storage/pebble" - "github.com/onflow/flow-go/storage/pebble/procedure" "github.com/onflow/flow-go/utils/grpcutils" ) @@ -209,7 +208,6 @@ func main() { state, node.Storage.Index, node.Storage.Payloads, - procedure.NewBlockIndexer(), blocktimer.DefaultBlockTimer, ) return err diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 6e7b00e3dba..dfab5d5c68d 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -255,7 +255,6 @@ func (exeNode *ExecutionNode) LoadMutableFollowerState(node *NodeConfig) error { bState, node.Storage.Index, node.Storage.Payloads, - procedure.NewBlockIndexer(), blocktimer.DefaultBlockTimer, ) return err diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index f2b44d03c3e..0227a27a4aa 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -102,7 +102,6 @@ import ( "github.com/onflow/flow-go/storage" bstorage "github.com/onflow/flow-go/storage/badger" pstorage "github.com/onflow/flow-go/storage/pebble" - "github.com/onflow/flow-go/storage/pebble/procedure" "github.com/onflow/flow-go/utils/grpcutils" "github.com/onflow/flow-go/utils/io" ) @@ -353,7 +352,6 @@ func (builder *ObserverServiceBuilder) buildFollowerState() *ObserverServiceBuil state, node.Storage.Index, node.Storage.Payloads, - procedure.NewBlockIndexer(), blocktimer.DefaultBlockTimer, ) builder.FollowerState = followerState diff --git a/cmd/verification_builder.go b/cmd/verification_builder.go index f56b93c7690..6f7bd4ee397 100644 --- a/cmd/verification_builder.go +++ b/cmd/verification_builder.go @@ -37,7 +37,6 @@ import ( "github.com/onflow/flow-go/state/protocol/blocktimer" pebbleState "github.com/onflow/flow-go/state/protocol/pebble" "github.com/onflow/flow-go/storage/pebble" - "github.com/onflow/flow-go/storage/pebble/procedure" ) type VerificationConfig struct { @@ -126,7 +125,6 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() { state, node.Storage.Index, node.Storage.Payloads, - procedure.NewBlockIndexer(), blocktimer.DefaultBlockTimer, ) return err diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index b859e5f8fc8..76ecc49b2ee 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -588,7 +588,6 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit protoState.State, node.Index, node.Payloads, - procedure.NewBlockIndexer(), blocktimer.DefaultBlockTimer, ) require.NoError(t, err) diff --git a/module/builder/collection/builder_pebble_test.go b/module/builder/collection/builder_pebble_test.go index 56e2a39f066..097148d2ddd 100644 --- a/module/builder/collection/builder_pebble_test.go +++ b/module/builder/collection/builder_pebble_test.go @@ -121,7 +121,6 @@ func (suite *BuilderPebbleSuite) SetupTest() { state, all.Index, all.Payloads, - procedure.NewBlockIndexer(), util.MockBlockTimer(), ) require.NoError(suite.T(), err) diff --git a/state/cluster/pebble/mutator_test.go b/state/cluster/pebble/mutator_test.go index bf93ce66447..ead9fe10acc 100644 --- a/state/cluster/pebble/mutator_test.go +++ b/state/cluster/pebble/mutator_test.go @@ -91,7 +91,7 @@ func (suite *MutatorSuite) SetupTest() { rootSnapshot, ) require.NoError(suite.T(), err) - suite.protoState, err = ppebble.NewFollowerState(log, tracer, events.NewNoop(), state, all.Index, all.Payloads, procedure.NewBlockIndexer(), protocolutil.MockBlockTimer()) + suite.protoState, err = ppebble.NewFollowerState(log, tracer, events.NewNoop(), state, all.Index, all.Payloads, protocolutil.MockBlockTimer()) require.NoError(suite.T(), err) clusterStateRoot, err := NewStateRoot(suite.genesis, unittest.QuorumCertificateFixture(), suite.epochCounter) diff --git a/state/protocol/pebble/mutator.go b/state/protocol/pebble/mutator.go index 5eb732c99a7..634713bb29e 100644 --- a/state/protocol/pebble/mutator.go +++ b/state/protocol/pebble/mutator.go @@ -17,6 +17,7 @@ import ( "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/pebble/operation" + "github.com/onflow/flow-go/storage/pebble/procedure" ) // FollowerState implements a lighter version of a mutable protocol state. @@ -62,14 +63,13 @@ func NewFollowerState( state *State, index storage.Index, payloads storage.Payloads, - blockIndexer storage.BlockIndexer, blockTimer protocol.BlockTimer, ) (*FollowerState, error) { followerState := &FollowerState{ State: state, index: index, payloads: payloads, - blockIndexer: blockIndexer, + blockIndexer: procedure.NewBlockIndexer(), tracer: tracer, logger: logger, consumer: consumer, @@ -94,18 +94,15 @@ func NewFullConsensusState( receiptValidator module.ReceiptValidator, sealValidator module.SealValidator, ) (*ParticipantState, error) { - followerState, err := NewFollowerState( - logger, - tracer, - consumer, - state, - index, - payloads, - blockIndexer, - blockTimer, - ) - if err != nil { - return nil, fmt.Errorf("initialization of Mutable Follower State failed: %w", err) + followerState := &FollowerState{ + logger: logger, + tracer: tracer, + consumer: consumer, + State: state, + index: index, + payloads: payloads, + blockIndexer: blockIndexer, + blockTimer: blockTimer, } return &ParticipantState{ FollowerState: followerState, diff --git a/state/protocol/util/testing_pebble.go b/state/protocol/util/testing_pebble.go index 399c0e20a3a..e6da0dc6cdb 100644 --- a/state/protocol/util/testing_pebble.go +++ b/state/protocol/util/testing_pebble.go @@ -156,7 +156,7 @@ func RunWithPebbleFollowerProtocolState(t testing.TB, rootSnapshot protocol.Snap ) require.NoError(t, err) mockTimer := MockBlockTimer() - followerState, err := pbadger.NewFollowerState(log, tracer, consumer, state, all.Index, all.Payloads, procedure.NewBlockIndexer(), mockTimer) + followerState, err := pbadger.NewFollowerState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer) require.NoError(t, err) f(db, followerState) }) @@ -244,7 +244,7 @@ func RunWithPebbleFollowerProtocolStateAndHeaders(t testing.TB, rootSnapshot pro ) require.NoError(t, err) mockTimer := MockBlockTimer() - followerState, err := pbadger.NewFollowerState(log, tracer, consumer, state, all.Index, all.Payloads, procedure.NewBlockIndexer(), mockTimer) + followerState, err := pbadger.NewFollowerState(log, tracer, consumer, state, all.Index, all.Payloads, mockTimer) require.NoError(t, err) f(db, followerState, all.Headers, all.Index) }) From 77b64c0f38644eda6abb593025e410ec8c2f8a6e Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 16 Aug 2024 12:01:25 -0700 Subject: [PATCH 3/3] not releasing the lock until the pebblw batch update is committed --- storage/batch.go | 2 +- storage/pebble/cluster_payloads.go | 6 ++-- storage/pebble/operation/common.go | 18 +++++----- storage/pebble/procedure/children.go | 6 +++- storage/pebble/procedure/children_test.go | 42 +++++++++++++++++++++++ storage/pebble/value_cache.go | 6 ++-- 6 files changed, 66 insertions(+), 14 deletions(-) diff --git a/storage/batch.go b/storage/batch.go index e157cc3e31c..8c27d751958 100644 --- a/storage/batch.go +++ b/storage/batch.go @@ -32,7 +32,7 @@ type BatchStorage interface { type PebbleReaderBatchWriter interface { ReaderWriter() (pebble.Reader, pebble.Writer) IndexedBatch() *pebble.Batch - AddCallback(func()) + AddCallback(func(error)) } func OnlyWriter(fn func(pebble.Writer) error) func(PebbleReaderBatchWriter) error { diff --git a/storage/pebble/cluster_payloads.go b/storage/pebble/cluster_payloads.go index 76a57364eb6..009259055c8 100644 --- a/storage/pebble/cluster_payloads.go +++ b/storage/pebble/cluster_payloads.go @@ -43,8 +43,10 @@ func (cp *ClusterPayloads) storeTx(blockID flow.Identifier, payload *cluster.Pay return func(tx storage.PebbleReaderBatchWriter) error { _, w := tx.ReaderWriter() - tx.AddCallback(func() { - cp.cache.Insert(blockID, payload) + tx.AddCallback(func(err error) { + if err != nil { + cp.cache.Insert(blockID, payload) + } }) return procedure.InsertClusterPayload(blockID, payload)(w) diff --git a/storage/pebble/operation/common.go b/storage/pebble/operation/common.go index 48e0861c418..5cac30a922d 100644 --- a/storage/pebble/operation/common.go +++ b/storage/pebble/operation/common.go @@ -15,7 +15,7 @@ import ( type ReaderBatchWriter struct { db *pebble.DB batch *pebble.Batch - callbacks []func() + callbacks []func(error) } var _ storage.PebbleReaderBatchWriter = (*ReaderBatchWriter)(nil) @@ -29,10 +29,16 @@ func (b *ReaderBatchWriter) IndexedBatch() *pebble.Batch { } func (b *ReaderBatchWriter) Commit() error { - return b.batch.Commit(nil) + err := b.batch.Commit(nil) + + for _, callback := range b.callbacks { + callback(err) + } + + return err } -func (b *ReaderBatchWriter) AddCallback(callback func()) { +func (b *ReaderBatchWriter) AddCallback(callback func(error)) { b.callbacks = append(b.callbacks, callback) } @@ -40,7 +46,7 @@ func NewPebbleReaderBatchWriterWithBatch(db *pebble.DB, batch *pebble.Batch) *Re return &ReaderBatchWriter{ db: db, batch: batch, - callbacks: make([]func(), 0), + callbacks: make([]func(error), 0), } } @@ -62,10 +68,6 @@ func WithReaderBatchWriter(db *pebble.DB, fn func(storage.PebbleReaderBatchWrite return err } - for _, callback := range batch.callbacks { - callback() - } - return nil } diff --git a/storage/pebble/procedure/children.go b/storage/pebble/procedure/children.go index dd3a30e184f..cc92fe8b3ba 100644 --- a/storage/pebble/procedure/children.go +++ b/storage/pebble/procedure/children.go @@ -64,7 +64,11 @@ func IndexNewBlock(indexing *sync.Mutex, blockID flow.Identifier, parentID flow. // acquiring a lock to avoid dirty reads when calling RetrieveBlockChildren indexing.Lock() - defer indexing.Unlock() + rw.AddCallback(func(error) { + // the lock is not released until the batch update is committed. + // the lock will be released regardless the commit is successful or not. + indexing.Unlock() + }) // if the parent block is not zero, depending on whether the parent block has // children or not, we will either update the index or insert the index: diff --git a/storage/pebble/procedure/children_test.go b/storage/pebble/procedure/children_test.go index ea44d5e13c6..3bc3a2e0bc2 100644 --- a/storage/pebble/procedure/children_test.go +++ b/storage/pebble/procedure/children_test.go @@ -2,6 +2,7 @@ package procedure_test import ( "errors" + "sync" "testing" "github.com/cockroachdb/pebble" @@ -133,3 +134,44 @@ func TestDirectChildren(t *testing.T) { require.Nil(t, retrievedIDs) }) } + +func TestIndexConcurrent(t *testing.T) { + unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + indexer := procedure.NewBlockIndexer() + + parentID := unittest.IdentifierFixture() + child1ID := unittest.IdentifierFixture() + child2ID := unittest.IdentifierFixture() + + var wg sync.WaitGroup + wg.Add(2) + + // index the first child concurrently + go func() { + defer wg.Done() + rw := operation.NewPebbleReaderBatchWriter(db) + err := indexer.IndexNewBlock(child1ID, parentID)(rw) + require.NoError(t, err) + require.NoError(t, rw.Commit()) + }() + + // index the second child concurrently + go func() { + defer wg.Done() + rw := operation.NewPebbleReaderBatchWriter(db) + err := indexer.IndexNewBlock(child2ID, parentID)(rw) + require.NoError(t, err) + require.NoError(t, rw.Commit()) + }() + + // Wait for both indexing operations to complete + wg.Wait() + + // Verify that both children were correctly indexed + var retrievedIDs flow.IdentifierList + err := procedure.LookupBlockChildren(parentID, &retrievedIDs)(db) + require.NoError(t, err) + + require.ElementsMatch(t, flow.IdentifierList{child1ID, child2ID}, retrievedIDs) + }) +} diff --git a/storage/pebble/value_cache.go b/storage/pebble/value_cache.go index 05165c1663f..c19585f6165 100644 --- a/storage/pebble/value_cache.go +++ b/storage/pebble/value_cache.go @@ -140,8 +140,10 @@ func (c *Cache[K, V]) PutPebble(key K, resource V) func(storage.PebbleReaderBatc storeOps := c.store(key, resource) // assemble DB operations to store resource (no execution) return func(rw storage.PebbleReaderBatchWriter) error { - rw.AddCallback(func() { - c.Insert(key, resource) + rw.AddCallback(func(err error) { + if err != nil { + c.Insert(key, resource) + } }) err := storeOps(rw)