1
1
package consensus
2
2
3
3
import (
4
+ "context"
5
+ "fmt"
4
6
"math/rand"
7
+ "sync"
5
8
"testing"
6
9
7
10
"github.com/cockroachdb/pebble"
@@ -13,6 +16,7 @@ import (
13
16
"github.com/onflow/flow-go/module/metrics"
14
17
"github.com/onflow/flow-go/module/trace"
15
18
mockprot "github.com/onflow/flow-go/state/protocol/mock"
19
+ protocolstorage "github.com/onflow/flow-go/storage"
16
20
mockstor "github.com/onflow/flow-go/storage/mock"
17
21
storage "github.com/onflow/flow-go/storage/pebble"
18
22
"github.com/onflow/flow-go/storage/pebble/operation"
@@ -90,11 +94,12 @@ func TestMakeFinalValidChainPebble(t *testing.T) {
90
94
// initialize the finalizer with the dependencies and make the call
91
95
metrics := metrics .NewNoopCollector ()
92
96
fin := FinalizerPebble {
93
- db : db ,
94
- headers : storage .NewHeaders (metrics , db ),
95
- state : state ,
96
- tracer : trace .NewNoopTracer (),
97
- cleanup : LogCleanup (& list ),
97
+ db : db ,
98
+ headers : storage .NewHeaders (metrics , db ),
99
+ state : state ,
100
+ tracer : trace .NewNoopTracer (),
101
+ cleanup : LogCleanup (& list ),
102
+ finalizing : new (sync.Mutex ),
98
103
}
99
104
err = fin .MakeFinal (lastID )
100
105
require .NoError (t , err )
@@ -146,11 +151,12 @@ func TestMakeFinalInvalidHeightPebble(t *testing.T) {
146
151
// initialize the finalizer with the dependencies and make the call
147
152
metrics := metrics .NewNoopCollector ()
148
153
fin := FinalizerPebble {
149
- db : db ,
150
- headers : storage .NewHeaders (metrics , db ),
151
- state : state ,
152
- tracer : trace .NewNoopTracer (),
153
- cleanup : LogCleanup (& list ),
154
+ db : db ,
155
+ headers : storage .NewHeaders (metrics , db ),
156
+ state : state ,
157
+ tracer : trace .NewNoopTracer (),
158
+ cleanup : LogCleanup (& list ),
159
+ finalizing : new (sync.Mutex ),
154
160
}
155
161
err = fin .MakeFinal (pending .ID ())
156
162
require .Error (t , err )
@@ -194,11 +200,12 @@ func TestMakeFinalDuplicatePebble(t *testing.T) {
194
200
// initialize the finalizer with the dependencies and make the call
195
201
metrics := metrics .NewNoopCollector ()
196
202
fin := FinalizerPebble {
197
- db : db ,
198
- headers : storage .NewHeaders (metrics , db ),
199
- state : state ,
200
- tracer : trace .NewNoopTracer (),
201
- cleanup : LogCleanup (& list ),
203
+ db : db ,
204
+ headers : storage .NewHeaders (metrics , db ),
205
+ state : state ,
206
+ tracer : trace .NewNoopTracer (),
207
+ cleanup : LogCleanup (& list ),
208
+ finalizing : new (sync.Mutex ),
202
209
}
203
210
err = fin .MakeFinal (final .ID ())
204
211
require .NoError (t , err )
@@ -210,3 +217,92 @@ func TestMakeFinalDuplicatePebble(t *testing.T) {
210
217
// make sure no cleanup was done
211
218
assert .Empty (t , list )
212
219
}
220
+
221
+ // create a chain of 10 blocks, calling MakeFinal(1), MakeFinal(2), ..., MakeFinal(10) concurrently
222
+ // expect 10 is finalized in the end
223
+ func TestMakeFinalConcurrencySafe (t * testing.T ) {
224
+ genesis := unittest .BlockHeaderFixture ()
225
+ blocks := unittest .ChainFixtureFrom (10 , genesis )
226
+
227
+ blockLookup := make (map [flow.Identifier ]* flow.Block )
228
+ for _ , block := range blocks {
229
+ blockLookup [block .Header .ID ()] = block
230
+ }
231
+
232
+ var list []flow.Identifier
233
+
234
+ unittest .RunWithPebbleDB (t , func (db * pebble.DB ) {
235
+ // create a mock protocol state to check finalize calls
236
+ state := mockprot .NewFollowerState (t )
237
+ state .On ("Finalize" , mock .Anything , mock .Anything ).Return (
238
+ func (ctx context.Context , blockID flow.Identifier ) error {
239
+ block , ok := blockLookup [blockID ]
240
+ if ! ok {
241
+ return fmt .Errorf ("block %s not found" , blockID )
242
+ }
243
+
244
+ header := block .Header
245
+
246
+ return operation .WithReaderBatchWriter (db , func (rw protocolstorage.PebbleReaderBatchWriter ) error {
247
+ _ , tx := rw .ReaderWriter ()
248
+ err := operation .IndexBlockHeight (header .Height , header .ID ())(tx )
249
+ if err != nil {
250
+ return err
251
+ }
252
+ return operation .UpdateFinalizedHeight (header .Height )(tx )
253
+ })
254
+ })
255
+
256
+ // insert the latest finalized height
257
+ err := operation .InsertFinalizedHeight (genesis .Height )(db )
258
+ require .NoError (t , err )
259
+
260
+ // map the finalized height to the finalized block ID
261
+ err = operation .IndexBlockHeight (genesis .Height , genesis .ID ())(db )
262
+ require .NoError (t , err )
263
+
264
+ // insert the finalized block header into the DB
265
+ err = operation .InsertHeader (genesis .ID (), genesis )(db )
266
+ require .NoError (t , err )
267
+
268
+ // insert all of the pending blocks into the DB
269
+ for _ , block := range blocks {
270
+ header := block .Header
271
+ err = operation .InsertHeader (header .ID (), header )(db )
272
+ require .NoError (t , err )
273
+ }
274
+
275
+ // initialize the finalizer with the dependencies and make the call
276
+ metrics := metrics .NewNoopCollector ()
277
+ fin := FinalizerPebble {
278
+ db : db ,
279
+ headers : storage .NewHeaders (metrics , db ),
280
+ state : state ,
281
+ tracer : trace .NewNoopTracer (),
282
+ cleanup : LogCleanup (& list ),
283
+ finalizing : new (sync.Mutex ),
284
+ }
285
+
286
+ // Concurrently finalize blocks[0] to blocks[9]
287
+ var wg sync.WaitGroup
288
+ for _ , block := range blocks {
289
+ wg .Add (1 )
290
+ go func (block * flow.Block ) {
291
+ defer wg .Done ()
292
+ err := fin .MakeFinal (block .Header .ID ())
293
+ require .NoError (t , err )
294
+ }(block )
295
+ }
296
+
297
+ // Wait for all finalization operations to complete
298
+ wg .Wait ()
299
+
300
+ var finalized uint64
301
+ require .NoError (t , operation .RetrieveFinalizedHeight (& finalized )(db ))
302
+
303
+ require .Equal (t , blocks [len (blocks )- 1 ].Header .Height , finalized )
304
+
305
+ // make sure that nothing was finalized
306
+ state .AssertExpectations (t )
307
+ })
308
+ }
0 commit comments