Skip to content

Commit

Permalink
add concurrency safety tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Aug 19, 2024
1 parent f9436d8 commit afd8526
Showing 1 changed file with 67 additions and 6 deletions.
73 changes: 67 additions & 6 deletions module/finalizer/collection/finalizer_pebble_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package collection_test

import (
"sync"
"testing"

"github.com/cockroachdb/pebble"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/model/cluster"
model "github.com/onflow/flow-go/model/cluster"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module/finalizer/collection"
"github.com/onflow/flow-go/module/mempool/herocache"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network/mocknetwork"
cluster "github.com/onflow/flow-go/state/cluster/pebble"
clusterpebble "github.com/onflow/flow-go/state/cluster/pebble"
"github.com/onflow/flow-go/storage/pebble/operation"
"github.com/onflow/flow-go/storage/pebble/procedure"
"github.com/onflow/flow-go/utils/unittest"
Expand All @@ -30,7 +32,7 @@ func TestFinalizerPebble(t *testing.T) {

metrics := metrics.NewNoopCollector()

var state *cluster.State
var state *clusterpebble.State

pool := herocache.NewTransactions(1000, unittest.Logger(), metrics)

Expand All @@ -47,9 +49,9 @@ func TestFinalizerPebble(t *testing.T) {

// a helper function to bootstrap with the genesis block
bootstrap := func() {
stateRoot, err := cluster.NewStateRoot(genesis, unittest.QuorumCertificateFixture(), 0)
stateRoot, err := clusterpebble.NewStateRoot(genesis, unittest.QuorumCertificateFixture(), 0)
require.NoError(t, err)
state, err = cluster.Bootstrap(db, stateRoot)
state, err = clusterpebble.Bootstrap(db, stateRoot)
require.NoError(t, err)
err = operation.InsertHeader(refBlock.ID(), refBlock)(db)
require.NoError(t, err)
Expand Down Expand Up @@ -360,10 +362,69 @@ func TestFinalizerPebble(t *testing.T) {
},
})
})
})

t.Run("concurrency safety", func(t *testing.T) {
// verify two guarantees:
// 1. concurrently finalizing blocks at different height would eventually arrive at
// highest finalized height
// 2. In each thread of updating finalized height, calling GetFinalziedHeight after MakeFinal
// is succeeded would not see a lower finalized height
t.Run("concurrency safety", func(t *testing.T) {
bootstrap()
defer cleanup()

// prepare blocks
n := 100
clusterBlocks := make([]*cluster.Block, 0, n)

parent := genesis

for i := 0; i < n; i++ {
block := unittest.ClusterBlockWithParent(parent)
block.Payload.ReferenceBlockID = parent.ID()
block.Header.PayloadHash = block.Payload.Hash()
clusterBlocks = append(clusterBlocks, &block)
parent = &block
}

// insert blocks
for _, block := range clusterBlocks {
insert(*block)
}

prov := new(mocknetwork.Engine)
prov.On("SubmitLocal", mock.Anything)
finalizer := collection.NewFinalizerPebble(db, pool, prov, metrics)

// concurrently finalizing all blocks
var wg sync.WaitGroup
wg.Add(len(clusterBlocks))

for _, block := range clusterBlocks {
go func(block *cluster.Block) {
defer wg.Done()

height := block.Header.Height
require.NoError(t, finalizer.MakeFinal(block.ID()))

// query the finalized height again after MakeFinal is succeeded
// to ensure the finalized height is not lower than the current height
final, err := state.Final().Head()
require.NoError(t, err)

require.GreaterOrEqual(t, final.Height, height)
}(block)
}

wg.Wait()

// Check that the final height is the highest among all blocks
final, err := state.Final().Head()
require.NoError(t, err)
require.Equal(t, clusterBlocks[len(clusterBlocks)-1].Header.Height, final.Height)

})
})

}

// assertClusterBlocksIndexedByReferenceHeightPebble checks the given cluster blocks have
Expand Down

0 comments on commit afd8526

Please sign in to comment.