Skip to content

Commit

Permalink
Merge pull request #6373 from onflow/leo/v0.33-pebble-storage-finalizing
Browse files Browse the repository at this point in the history
[Pebble Refactor] Making finalization concurrent safe
  • Loading branch information
zhangchiqing authored Sep 13, 2024
2 parents 8eb0f74 + afd8526 commit 29d78d8
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 24 deletions.
7 changes: 7 additions & 0 deletions module/finalizer/collection/finalizer_pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package collection

import (
"fmt"
"sync"

"github.com/cockroachdb/pebble"

Expand All @@ -25,6 +26,7 @@ type FinalizerPebble struct {
transactions mempool.Transactions
prov network.Engine
metrics module.CollectionMetrics
finalizing *sync.Mutex
}

// NewFinalizerPebble creates a new finalizer for collection nodes.
Expand All @@ -39,6 +41,7 @@ func NewFinalizerPebble(
transactions: transactions,
prov: prov,
metrics: metrics,
finalizing: new(sync.Mutex),
}
return f
}
Expand All @@ -62,6 +65,10 @@ func (f *FinalizerPebble) MakeFinal(blockID flow.Identifier) error {
return fmt.Errorf("could not retrieve header: %w", err)
}

// we need to ensure that only one block is finalized at a time
f.finalizing.Lock()
defer f.finalizing.Unlock()

// retrieve the current finalized cluster state boundary
var boundary uint64
err = operation.RetrieveClusterFinalizedHeight(header.ChainID, &boundary)(f.db)
Expand Down
72 changes: 68 additions & 4 deletions module/finalizer/collection/finalizer_pebble_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package collection_test

import (
"sync"
"testing"

"github.com/cockroachdb/pebble"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/model/cluster"
model "github.com/onflow/flow-go/model/cluster"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module/finalizer/collection"
"github.com/onflow/flow-go/module/mempool/herocache"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network/mocknetwork"
cluster "github.com/onflow/flow-go/state/cluster/pebble"
clusterpebble "github.com/onflow/flow-go/state/cluster/pebble"
"github.com/onflow/flow-go/storage/pebble/operation"
"github.com/onflow/flow-go/storage/pebble/procedure"
"github.com/onflow/flow-go/utils/unittest"
Expand All @@ -30,7 +32,7 @@ func TestFinalizerPebble(t *testing.T) {

metrics := metrics.NewNoopCollector()

var state *cluster.State
var state *clusterpebble.State

pool := herocache.NewTransactions(1000, unittest.Logger(), metrics)

Expand All @@ -47,9 +49,9 @@ func TestFinalizerPebble(t *testing.T) {

// a helper function to bootstrap with the genesis block
bootstrap := func() {
stateRoot, err := cluster.NewStateRoot(genesis, unittest.QuorumCertificateFixture(), 0)
stateRoot, err := clusterpebble.NewStateRoot(genesis, unittest.QuorumCertificateFixture(), 0)
require.NoError(t, err)
state, err = cluster.Bootstrap(db, stateRoot)
state, err = clusterpebble.Bootstrap(db, stateRoot)
require.NoError(t, err)
err = operation.InsertHeader(refBlock.ID(), refBlock)(db)
require.NoError(t, err)
Expand Down Expand Up @@ -360,7 +362,69 @@ func TestFinalizerPebble(t *testing.T) {
},
})
})

// verify two guarantees:
// 1. concurrently finalizing blocks at different heights eventually arrives at
// the highest finalized height
// 2. in each goroutine, querying the finalized head after MakeFinal has succeeded
// never observes a finalized height lower than the height that goroutine finalized
t.Run("concurrency safety", func(t *testing.T) {
	bootstrap()
	defer cleanup()

	// prepare a chain of n cluster blocks, each one extending the previous
	n := 100
	clusterBlocks := make([]*cluster.Block, 0, n)

	parent := genesis

	for i := 0; i < n; i++ {
		block := unittest.ClusterBlockWithParent(parent)
		block.Payload.ReferenceBlockID = parent.ID()
		block.Header.PayloadHash = block.Payload.Hash()
		clusterBlocks = append(clusterBlocks, &block)
		parent = &block
	}

	// insert blocks
	for _, block := range clusterBlocks {
		insert(*block)
	}

	prov := new(mocknetwork.Engine)
	prov.On("SubmitLocal", mock.Anything)
	finalizer := collection.NewFinalizerPebble(db, pool, prov, metrics)

	// outcome captures what a single finalizing goroutine observed. The
	// require helpers end up calling t.FailNow, which must only be invoked
	// from the goroutine running the test, so the goroutines only record
	// their results and every assertion is made after wg.Wait().
	type outcome struct {
		finalizeErr error
		headErr     error
		head        *flow.Header
	}
	// each goroutine writes only to its own slot, so no extra locking is needed
	outcomes := make([]outcome, len(clusterBlocks))

	// concurrently finalizing all blocks
	var wg sync.WaitGroup
	wg.Add(len(clusterBlocks))

	for i, block := range clusterBlocks {
		go func(i int, block *cluster.Block) {
			defer wg.Done()

			res := &outcomes[i]
			res.finalizeErr = finalizer.MakeFinal(block.ID())
			if res.finalizeErr != nil {
				return
			}

			// query the finalized head again after MakeFinal has succeeded;
			// it is compared against this block's height below
			res.head, res.headErr = state.Final().Head()
		}(i, block)
	}

	wg.Wait()

	for i, block := range clusterBlocks {
		res := outcomes[i]
		require.NoError(t, res.finalizeErr, "MakeFinal failed for block index %d", i)
		require.NoError(t, res.headErr, "could not read finalized head for block index %d", i)

		// the finalized height seen after MakeFinal must not be lower than
		// the height of the block that was just finalized
		require.GreaterOrEqual(t, res.head.Height, block.Header.Height)
	}

	// Check that the final height is the highest among all blocks
	final, err := state.Final().Head()
	require.NoError(t, err)
	require.Equal(t, clusterBlocks[len(clusterBlocks)-1].Header.Height, final.Height)
})
})

}

// assertClusterBlocksIndexedByReferenceHeightPebble checks the given cluster blocks have
Expand Down
18 changes: 13 additions & 5 deletions module/finalizer/consensus/finalizer_pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consensus
import (
"context"
"fmt"
"sync"

"github.com/cockroachdb/pebble"

Expand All @@ -22,6 +23,8 @@ type FinalizerPebble struct {
state protocol.FollowerState
cleanup CleanupFunc
tracer module.Tracer

finalizing *sync.Mutex
}

// NewFinalizerPebble creates a new finalizer for the temporary state.
Expand All @@ -31,11 +34,12 @@ func NewFinalizerPebble(db *pebble.DB,
tracer module.Tracer,
options ...func(*FinalizerPebble)) *FinalizerPebble {
f := &FinalizerPebble{
db: db,
state: state,
headers: headers,
cleanup: CleanupNothing(),
tracer: tracer,
db: db,
state: state,
headers: headers,
cleanup: CleanupNothing(),
tracer: tracer,
finalizing: new(sync.Mutex),
}
for _, option := range options {
option(f)
Expand All @@ -56,6 +60,10 @@ func (f *FinalizerPebble) MakeFinal(blockID flow.Identifier) error {
span, ctx := f.tracer.StartBlockSpan(context.Background(), blockID, trace.CONFinalizerFinalizeBlock)
defer span.End()

// We want to ensure that only one block is being finalized at a time.
f.finalizing.Lock()
defer f.finalizing.Unlock()

// STEP ONE: This is an idempotent operation. In case we are trying to
// finalize a block that is already below finalized height, we want to do
// one of two things: if it conflicts with the block already finalized at
Expand Down
126 changes: 111 additions & 15 deletions module/finalizer/consensus/finalizer_pebble_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package consensus

import (
"context"
"fmt"
"math/rand"
"sync"
"testing"

"github.com/cockroachdb/pebble"
Expand All @@ -13,6 +16,7 @@ import (
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/module/trace"
mockprot "github.com/onflow/flow-go/state/protocol/mock"
protocolstorage "github.com/onflow/flow-go/storage"
mockstor "github.com/onflow/flow-go/storage/mock"
storage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/storage/pebble/operation"
Expand Down Expand Up @@ -90,11 +94,12 @@ func TestMakeFinalValidChainPebble(t *testing.T) {
// initialize the finalizer with the dependencies and make the call
metrics := metrics.NewNoopCollector()
fin := FinalizerPebble{
db: db,
headers: storage.NewHeaders(metrics, db),
state: state,
tracer: trace.NewNoopTracer(),
cleanup: LogCleanup(&list),
db: db,
headers: storage.NewHeaders(metrics, db),
state: state,
tracer: trace.NewNoopTracer(),
cleanup: LogCleanup(&list),
finalizing: new(sync.Mutex),
}
err = fin.MakeFinal(lastID)
require.NoError(t, err)
Expand Down Expand Up @@ -146,11 +151,12 @@ func TestMakeFinalInvalidHeightPebble(t *testing.T) {
// initialize the finalizer with the dependencies and make the call
metrics := metrics.NewNoopCollector()
fin := FinalizerPebble{
db: db,
headers: storage.NewHeaders(metrics, db),
state: state,
tracer: trace.NewNoopTracer(),
cleanup: LogCleanup(&list),
db: db,
headers: storage.NewHeaders(metrics, db),
state: state,
tracer: trace.NewNoopTracer(),
cleanup: LogCleanup(&list),
finalizing: new(sync.Mutex),
}
err = fin.MakeFinal(pending.ID())
require.Error(t, err)
Expand Down Expand Up @@ -194,11 +200,12 @@ func TestMakeFinalDuplicatePebble(t *testing.T) {
// initialize the finalizer with the dependencies and make the call
metrics := metrics.NewNoopCollector()
fin := FinalizerPebble{
db: db,
headers: storage.NewHeaders(metrics, db),
state: state,
tracer: trace.NewNoopTracer(),
cleanup: LogCleanup(&list),
db: db,
headers: storage.NewHeaders(metrics, db),
state: state,
tracer: trace.NewNoopTracer(),
cleanup: LogCleanup(&list),
finalizing: new(sync.Mutex),
}
err = fin.MakeFinal(final.ID())
require.NoError(t, err)
Expand All @@ -210,3 +217,92 @@ func TestMakeFinalDuplicatePebble(t *testing.T) {
// make sure no cleanup was done
assert.Empty(t, list)
}

// TestMakeFinalConcurrencySafe creates a chain of 10 blocks on top of a genesis
// header, calls MakeFinal(1), MakeFinal(2), ..., MakeFinal(10) concurrently, and
// expects the height of block 10 to be the finalized height in the end.
func TestMakeFinalConcurrencySafe(t *testing.T) {
	genesis := unittest.BlockHeaderFixture()
	blocks := unittest.ChainFixtureFrom(10, genesis)

	blockLookup := make(map[flow.Identifier]*flow.Block, len(blocks))
	for _, block := range blocks {
		blockLookup[block.Header.ID()] = block
	}

	var list []flow.Identifier

	unittest.RunWithPebbleDB(t, func(db *pebble.DB) {
		// create a mock protocol state whose Finalize indexes the block height
		// and updates the finalized height within one WithReaderBatchWriter call
		state := mockprot.NewFollowerState(t)
		state.On("Finalize", mock.Anything, mock.Anything).Return(
			func(ctx context.Context, blockID flow.Identifier) error {
				block, ok := blockLookup[blockID]
				if !ok {
					return fmt.Errorf("block %s not found", blockID)
				}

				header := block.Header

				return operation.WithReaderBatchWriter(db, func(rw protocolstorage.PebbleReaderBatchWriter) error {
					_, tx := rw.ReaderWriter()
					err := operation.IndexBlockHeight(header.Height, header.ID())(tx)
					if err != nil {
						return err
					}
					return operation.UpdateFinalizedHeight(header.Height)(tx)
				})
			})

		// insert the latest finalized height
		err := operation.InsertFinalizedHeight(genesis.Height)(db)
		require.NoError(t, err)

		// map the finalized height to the finalized block ID
		err = operation.IndexBlockHeight(genesis.Height, genesis.ID())(db)
		require.NoError(t, err)

		// insert the finalized block header into the DB
		err = operation.InsertHeader(genesis.ID(), genesis)(db)
		require.NoError(t, err)

		// insert all of the pending blocks into the DB
		for _, block := range blocks {
			header := block.Header
			err = operation.InsertHeader(header.ID(), header)(db)
			require.NoError(t, err)
		}

		// initialize the finalizer with the dependencies
		metrics := metrics.NewNoopCollector()
		fin := FinalizerPebble{
			db:         db,
			headers:    storage.NewHeaders(metrics, db),
			state:      state,
			tracer:     trace.NewNoopTracer(),
			cleanup:    LogCleanup(&list),
			finalizing: new(sync.Mutex),
		}

		// Concurrently finalize blocks[0] to blocks[9].
		// The require helpers end up calling t.FailNow, which must only be
		// invoked from the goroutine running the test. Each goroutine therefore
		// only records its error in its own slot, and the errors are checked
		// after wg.Wait() below.
		errs := make([]error, len(blocks))
		var wg sync.WaitGroup
		for i, block := range blocks {
			wg.Add(1)
			go func(i int, blockID flow.Identifier) {
				defer wg.Done()
				errs[i] = fin.MakeFinal(blockID)
			}(i, block.Header.ID())
		}

		// Wait for all finalization operations to complete
		wg.Wait()

		for i, err := range errs {
			require.NoError(t, err, "MakeFinal failed for block index %d", i)
		}

		var finalized uint64
		require.NoError(t, operation.RetrieveFinalizedHeight(&finalized)(db))

		require.Equal(t, blocks[len(blocks)-1].Header.Height, finalized)

		// make sure the mocked Finalize expectation was actually exercised
		state.AssertExpectations(t)
	})
}

0 comments on commit 29d78d8

Please sign in to comment.