diff --git a/module/finalizer/collection/finalizer_pebble.go b/module/finalizer/collection/finalizer_pebble.go index 487c78a3f62..91fc141efc5 100644 --- a/module/finalizer/collection/finalizer_pebble.go +++ b/module/finalizer/collection/finalizer_pebble.go @@ -2,6 +2,7 @@ package collection import ( "fmt" + "sync" "github.com/cockroachdb/pebble" @@ -25,6 +26,7 @@ type FinalizerPebble struct { transactions mempool.Transactions prov network.Engine metrics module.CollectionMetrics + finalizing *sync.Mutex } // NewFinalizerPebble creates a new finalizer for collection nodes. @@ -39,6 +41,7 @@ func NewFinalizerPebble( transactions: transactions, prov: prov, metrics: metrics, + finalizing: new(sync.Mutex), } return f } @@ -62,6 +65,10 @@ func (f *FinalizerPebble) MakeFinal(blockID flow.Identifier) error { return fmt.Errorf("could not retrieve header: %w", err) } + // we need to ensure that only one block is finalized at a time + f.finalizing.Lock() + defer f.finalizing.Unlock() + // retrieve the current finalized cluster state boundary var boundary uint64 err = operation.RetrieveClusterFinalizedHeight(header.ChainID, &boundary)(f.db) diff --git a/module/finalizer/collection/finalizer_pebble_test.go b/module/finalizer/collection/finalizer_pebble_test.go index d9472aeea58..1b9ef21eedb 100644 --- a/module/finalizer/collection/finalizer_pebble_test.go +++ b/module/finalizer/collection/finalizer_pebble_test.go @@ -1,6 +1,7 @@ package collection_test import ( + "sync" "testing" "github.com/cockroachdb/pebble" @@ -8,6 +9,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/model/cluster" model "github.com/onflow/flow-go/model/cluster" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/messages" @@ -15,7 +17,7 @@ import ( "github.com/onflow/flow-go/module/mempool/herocache" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/network/mocknetwork" - cluster "github.com/onflow/flow-go/state/cluster/pebble" + clusterpebble "github.com/onflow/flow-go/state/cluster/pebble" "github.com/onflow/flow-go/storage/pebble/operation" "github.com/onflow/flow-go/storage/pebble/procedure" "github.com/onflow/flow-go/utils/unittest" @@ -30,7 +32,7 @@ func TestFinalizerPebble(t *testing.T) { metrics := metrics.NewNoopCollector() - var state *cluster.State + var state *clusterpebble.State pool := herocache.NewTransactions(1000, unittest.Logger(), metrics) @@ -47,9 +49,9 @@ func TestFinalizerPebble(t *testing.T) { // a helper function to bootstrap with the genesis block bootstrap := func() { - stateRoot, err := cluster.NewStateRoot(genesis, unittest.QuorumCertificateFixture(), 0) + stateRoot, err := clusterpebble.NewStateRoot(genesis, unittest.QuorumCertificateFixture(), 0) require.NoError(t, err) - state, err = cluster.Bootstrap(db, stateRoot) + state, err = clusterpebble.Bootstrap(db, stateRoot) require.NoError(t, err) err = operation.InsertHeader(refBlock.ID(), refBlock)(db) require.NoError(t, err) @@ -360,7 +362,69 @@ func TestFinalizerPebble(t *testing.T) { }, }) }) + + // verify two guarantees: + // 1. concurrently finalizing blocks at different height would eventually arrive at + // highest finalized height + // 2. In each thread of updating finalized height, calling GetFinalziedHeight after MakeFinal + // is succeeded would not see a lower finalized height + t.Run("concurrency safety", func(t *testing.T) { + bootstrap() + defer cleanup() + + // prepare blocks + n := 100 + clusterBlocks := make([]*cluster.Block, 0, n) + + parent := genesis + + for i := 0; i < n; i++ { + block := unittest.ClusterBlockWithParent(parent) + block.Payload.ReferenceBlockID = parent.ID() + block.Header.PayloadHash = block.Payload.Hash() + clusterBlocks = append(clusterBlocks, &block) + parent = &block + } + + // insert blocks + for _, block := range clusterBlocks { + insert(*block) + } + + prov := new(mocknetwork.Engine) + prov.On("SubmitLocal", mock.Anything) + finalizer := collection.NewFinalizerPebble(db, pool, prov, metrics) + + // concurrently finalizing all blocks + var wg sync.WaitGroup + wg.Add(len(clusterBlocks)) + + for _, block := range clusterBlocks { + go func(block *cluster.Block) { + defer wg.Done() + + height := block.Header.Height + require.NoError(t, finalizer.MakeFinal(block.ID())) + + // query the finalized height again after MakeFinal is succeeded + // to ensure the finalized height is not lower than the current height + final, err := state.Final().Head() + require.NoError(t, err) + + require.GreaterOrEqual(t, final.Height, height) + }(block) + } + + wg.Wait() + + // Check that the final height is the highest among all blocks + final, err := state.Final().Head() + require.NoError(t, err) + require.Equal(t, clusterBlocks[len(clusterBlocks)-1].Header.Height, final.Height) + + }) }) + } // assertClusterBlocksIndexedByReferenceHeightPebble checks the given cluster blocks have diff --git a/module/finalizer/consensus/finalizer_pebble.go b/module/finalizer/consensus/finalizer_pebble.go index a5af5ae04e3..f2c0a28c0d1 100644 --- a/module/finalizer/consensus/finalizer_pebble.go +++ b/module/finalizer/consensus/finalizer_pebble.go @@ -3,6 +3,7 @@ package consensus import ( "context" "fmt" + "sync" "github.com/cockroachdb/pebble" @@ -22,6 +23,8 @@ type FinalizerPebble struct { state protocol.FollowerState cleanup CleanupFunc tracer module.Tracer + + finalizing *sync.Mutex } // NewFinalizerPebble creates a new finalizer for the temporary state. @@ -31,11 +34,12 @@ func NewFinalizerPebble(db *pebble.DB, tracer module.Tracer, options ...func(*FinalizerPebble)) *FinalizerPebble { f := &FinalizerPebble{ - db: db, - state: state, - headers: headers, - cleanup: CleanupNothing(), - tracer: tracer, + db: db, + state: state, + headers: headers, + cleanup: CleanupNothing(), + tracer: tracer, + finalizing: new(sync.Mutex), } for _, option := range options { option(f) @@ -56,6 +60,10 @@ func (f *FinalizerPebble) MakeFinal(blockID flow.Identifier) error { span, ctx := f.tracer.StartBlockSpan(context.Background(), blockID, trace.CONFinalizerFinalizeBlock) defer span.End() + // We want to ensure that only one block is being finalized at a time. + f.finalizing.Lock() + defer f.finalizing.Unlock() + // STEP ONE: This is an idempotent operation. In case we are trying to // finalize a block that is already below finalized height, we want to do // one of two things: if it conflicts with the block already finalized at diff --git a/module/finalizer/consensus/finalizer_pebble_test.go b/module/finalizer/consensus/finalizer_pebble_test.go index 183b150c21d..262727be4b1 100644 --- a/module/finalizer/consensus/finalizer_pebble_test.go +++ b/module/finalizer/consensus/finalizer_pebble_test.go @@ -1,7 +1,10 @@ package consensus import ( + "context" + "fmt" "math/rand" + "sync" "testing" "github.com/cockroachdb/pebble" @@ -13,6 +16,7 @@ import ( "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/module/trace" mockprot "github.com/onflow/flow-go/state/protocol/mock" + protocolstorage "github.com/onflow/flow-go/storage" mockstor "github.com/onflow/flow-go/storage/mock" storage "github.com/onflow/flow-go/storage/pebble" "github.com/onflow/flow-go/storage/pebble/operation" @@ -90,11 +94,12 @@ func TestMakeFinalValidChainPebble(t *testing.T) { // initialize the finalizer with the dependencies and make the call metrics := metrics.NewNoopCollector() fin := FinalizerPebble{ - db: db, - headers: storage.NewHeaders(metrics, db), - state: state, - tracer: trace.NewNoopTracer(), - cleanup: LogCleanup(&list), + db: db, + headers: storage.NewHeaders(metrics, db), + state: state, + tracer: trace.NewNoopTracer(), + cleanup: LogCleanup(&list), + finalizing: new(sync.Mutex), } err = fin.MakeFinal(lastID) require.NoError(t, err) @@ -146,11 +151,12 @@ func TestMakeFinalInvalidHeightPebble(t *testing.T) { // initialize the finalizer with the dependencies and make the call metrics := metrics.NewNoopCollector() fin := FinalizerPebble{ - db: db, - headers: storage.NewHeaders(metrics, db), - state: state, - tracer: trace.NewNoopTracer(), - cleanup: LogCleanup(&list), + db: db, + headers: storage.NewHeaders(metrics, db), + state: state, + tracer: trace.NewNoopTracer(), + cleanup: LogCleanup(&list), + finalizing: new(sync.Mutex), } err = fin.MakeFinal(pending.ID()) require.Error(t, err) @@ -194,11 +200,12 @@ func TestMakeFinalDuplicatePebble(t *testing.T) { // initialize the finalizer with the dependencies and make the call metrics := metrics.NewNoopCollector() fin := FinalizerPebble{ - db: db, - headers: storage.NewHeaders(metrics, db), - state: state, - tracer: trace.NewNoopTracer(), - cleanup: LogCleanup(&list), + db: db, + headers: storage.NewHeaders(metrics, db), + state: state, + tracer: trace.NewNoopTracer(), + cleanup: LogCleanup(&list), + finalizing: new(sync.Mutex), } err = fin.MakeFinal(final.ID()) require.NoError(t, err) @@ -210,3 +217,92 @@ func TestMakeFinalDuplicatePebble(t *testing.T) { // make sure no cleanup was done assert.Empty(t, list) } + +// create a chain of 10 blocks, calling MakeFinal(1), MakeFinal(2), ..., MakeFinal(10) concurrently +// expect 10 is finalized in the end +func TestMakeFinalConcurrencySafe(t *testing.T) { + genesis := unittest.BlockHeaderFixture() + blocks := unittest.ChainFixtureFrom(10, genesis) + + blockLookup := make(map[flow.Identifier]*flow.Block) + for _, block := range blocks { + blockLookup[block.Header.ID()] = block + } + + var list []flow.Identifier + + unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + // create a mock protocol state to check finalize calls + state := mockprot.NewFollowerState(t) + state.On("Finalize", mock.Anything, mock.Anything).Return( + func(ctx context.Context, blockID flow.Identifier) error { + block, ok := blockLookup[blockID] + if !ok { + return fmt.Errorf("block %s not found", blockID) + } + + header := block.Header + + return operation.WithReaderBatchWriter(db, func(rw protocolstorage.PebbleReaderBatchWriter) error { + _, tx := rw.ReaderWriter() + err := operation.IndexBlockHeight(header.Height, header.ID())(tx) + if err != nil { + return err + } + return operation.UpdateFinalizedHeight(header.Height)(tx) + }) + }) + + // insert the latest finalized height + err := operation.InsertFinalizedHeight(genesis.Height)(db) + require.NoError(t, err) + + // map the finalized height to the finalized block ID + err = operation.IndexBlockHeight(genesis.Height, genesis.ID())(db) + require.NoError(t, err) + + // insert the finalized block header into the DB + err = operation.InsertHeader(genesis.ID(), genesis)(db) + require.NoError(t, err) + + // insert all of the pending blocks into the DB + for _, block := range blocks { + header := block.Header + err = operation.InsertHeader(header.ID(), header)(db) + require.NoError(t, err) + } + + // initialize the finalizer with the dependencies and make the call + metrics := metrics.NewNoopCollector() + fin := FinalizerPebble{ + db: db, + headers: storage.NewHeaders(metrics, db), + state: state, + tracer: trace.NewNoopTracer(), + cleanup: LogCleanup(&list), + finalizing: new(sync.Mutex), + } + + // Concurrently finalize blocks[0] to blocks[9] + var wg sync.WaitGroup + for _, block := range blocks { + wg.Add(1) + go func(block *flow.Block) { + defer wg.Done() + err := fin.MakeFinal(block.Header.ID()) + require.NoError(t, err) + }(block) + } + + // Wait for all finalization operations to complete + wg.Wait() + + var finalized uint64 + require.NoError(t, operation.RetrieveFinalizedHeight(&finalized)(db)) + + require.Equal(t, blocks[len(blocks)-1].Header.Height, finalized) + + // make sure that nothing was finalized + state.AssertExpectations(t) + }) +}