diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index b31ea749060..624dc4be67d 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -530,8 +530,8 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder { var ds datastore.Batching var bs network.BlobService - var processedBlockHeight storage.ConsumerProgress - var processedNotifications storage.ConsumerProgress + var processedBlockHeight storage.ConsumerProgressFactory + var processedNotifications storage.ConsumerProgressFactory var bsDependable *module.ProxiedReadyDoneAware var execDataDistributor *edrequester.ExecutionDataDistributor var execDataCacheBackend *herocache.BlockExecutionData @@ -586,9 +586,9 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. if executionDataDBMode == execution_data.ExecutionDataDBModeBadger { - processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) + processedBlockHeight = bstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) } else { - processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) + processedBlockHeight = pstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) } return nil }). 
@@ -596,9 +596,9 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. if executionDataDBMode == execution_data.ExecutionDataDBModeBadger { - processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification) + processedNotifications = bstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification) } else { - processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification) + processedNotifications = pstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification) } return nil }). @@ -826,7 +826,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess } if builder.executionDataIndexingEnabled { - var indexedBlockHeight storage.ConsumerProgress + var indexedBlockHeight storage.ConsumerProgressFactory builder. AdminCommand("execute-script", func(config *cmd.NodeConfig) commands.AdminCommand { @@ -834,7 +834,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess }). Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the MAIN db since that is where indexed execution data is stored. - indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight) + indexedBlockHeight = bstorage.NewConsumerProgressFactory(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight) return nil }). 
Module("transaction results storage", func(node *cmd.NodeConfig) error { @@ -1579,7 +1579,7 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() { } func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { - var processedBlockHeight storage.ConsumerProgress + var processedBlockHeight storage.ConsumerProgressFactory if builder.executionDataSyncEnabled { builder.BuildExecutionSyncComponents() @@ -1769,7 +1769,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { return nil }). Module("processed block height consumer progress", func(node *cmd.NodeConfig) error { - processedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight) + processedBlockHeight = bstorage.NewConsumerProgressFactory(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight) return nil }). Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error { @@ -1777,7 +1777,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { var err error lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressLastFullBlockHeight), + bstorage.NewConsumerProgressFactory(builder.DB, module.ConsumeProgressLastFullBlockHeight), rootBlockHeight, ) if err != nil { diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 80c702f2967..c278923d08f 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -1107,8 +1107,8 @@ func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) { func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverServiceBuilder { var ds datastore.Batching var bs network.BlobService - var processedBlockHeight storage.ConsumerProgress - var processedNotifications storage.ConsumerProgress + var processedBlockHeight 
storage.ConsumerProgressFactory + var processedNotifications storage.ConsumerProgressFactory var publicBsDependable *module.ProxiedReadyDoneAware var execDataDistributor *edrequester.ExecutionDataDistributor var execDataCacheBackend *herocache.BlockExecutionData @@ -1163,9 +1163,9 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. if executionDataDBMode == execution_data.ExecutionDataDBModeBadger { - processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) + processedBlockHeight = bstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) } else { - processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) + processedBlockHeight = pstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) } return nil }). @@ -1173,9 +1173,9 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. 
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger { - processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification) + processedNotifications = bstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification) } else { - processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification) + processedNotifications = pstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification) } return nil }). @@ -1361,11 +1361,11 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS return builder.ExecutionDataPruner, nil }) if builder.executionDataIndexingEnabled { - var indexedBlockHeight storage.ConsumerProgress + var indexedBlockHeight storage.ConsumerProgressFactory builder.Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the MAIN db since that is where indexed execution data is stored. 
- indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight) + indexedBlockHeight = bstorage.NewConsumerProgressFactory(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight) return nil }).Module("transaction results storage", func(node *cmd.NodeConfig) error { builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize) diff --git a/cmd/verification_builder.go b/cmd/verification_builder.go index 70c84617f02..078554bae86 100644 --- a/cmd/verification_builder.go +++ b/cmd/verification_builder.go @@ -88,11 +88,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() { var ( followerState protocol.FollowerState - chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine - chunkRequests *stdmap.ChunkRequests // used in requester engine - processedChunkIndex *badger.ConsumerProgress // used in chunk consumer - processedBlockHeight *badger.ConsumerProgress // used in block consumer - chunkQueue *badger.ChunksQueue // used in chunk consumer + chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine + chunkRequests *stdmap.ChunkRequests // used in requester engine + processedChunkIndex *badger.ConsumerProgressFactory // used in chunk consumer + processedBlockHeight *badger.ConsumerProgressFactory // used in block consumer + chunkQueue *badger.ChunksQueue // used in chunk consumer syncCore *chainsync.Core // used in follower engine assignerEngine *assigner.Engine // the assigner engine @@ -155,11 +155,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() { return nil }). Module("processed chunk index consumer progress", func(node *NodeConfig) error { - processedChunkIndex = badger.NewConsumerProgress(node.DB, module.ConsumeProgressVerificationChunkIndex) + processedChunkIndex = badger.NewConsumerProgressFactory(node.DB, module.ConsumeProgressVerificationChunkIndex) return nil }). 
Module("processed block height consumer progress", func(node *NodeConfig) error { - processedBlockHeight = badger.NewConsumerProgress(node.DB, module.ConsumeProgressVerificationBlockHeight) + processedBlockHeight = badger.NewConsumerProgressFactory(node.DB, module.ConsumeProgressVerificationBlockHeight) return nil }). Module("chunks queue", func(node *NodeConfig) error { diff --git a/engine/access/access_test.go b/engine/access/access_test.go index 4aa8fc047ca..799e1eb05f1 100644 --- a/engine/access/access_test.go +++ b/engine/access/access_test.go @@ -678,13 +678,13 @@ func (suite *Suite) TestGetSealedTransaction() { require.NoError(suite.T(), err) lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight), + bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressLastFullBlockHeight), suite.rootBlock.Height, ) require.NoError(suite.T(), err) // create the ingest engine - processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight) + processedHeight := bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressIngestionEngineBlockHeight) ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, all.Blocks, all.Headers, collections, transactions, results, receipts, collectionExecutedMetric, processedHeight, lastFullBlockHeight) @@ -840,9 +840,9 @@ func (suite *Suite) TestGetTransactionResult() { ) require.NoError(suite.T(), err) - processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight) + processedHeight := bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressIngestionEngineBlockHeight) lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight), + bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressLastFullBlockHeight), 
suite.rootBlock.Height, ) require.NoError(suite.T(), err) @@ -1070,9 +1070,9 @@ func (suite *Suite) TestExecuteScript() { suite.net.On("Register", channels.ReceiveReceipts, mock.Anything).Return(conduit, nil). Once() - processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight) + processedHeight := bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressIngestionEngineBlockHeight) lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight), + bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressLastFullBlockHeight), suite.rootBlock.Height, ) require.NoError(suite.T(), err) diff --git a/engine/access/ingestion/engine.go b/engine/access/ingestion/engine.go index b36e5598c59..b8d729d85a7 100644 --- a/engine/access/ingestion/engine.go +++ b/engine/access/ingestion/engine.go @@ -119,7 +119,7 @@ func New( executionResults storage.ExecutionResults, executionReceipts storage.ExecutionReceipts, collectionExecutedMetric module.CollectionExecutedMetric, - processedHeight storage.ConsumerProgress, + processedHeight storage.ConsumerProgressFactory, lastFullBlockHeight *counters.PersistentStrictMonotonicCounter, ) (*Engine, error) { executionReceiptsRawQueue, err := fifoqueue.NewFifoQueue(defaultQueueCapacity) diff --git a/engine/access/ingestion/engine_test.go b/engine/access/ingestion/engine_test.go index 5e00e720334..ed55d2e5caa 100644 --- a/engine/access/ingestion/engine_test.go +++ b/engine/access/ingestion/engine_test.go @@ -180,11 +180,11 @@ func (s *Suite) SetupTest() { // initIngestionEngine create new instance of ingestion engine and waits when it starts func (s *Suite) initIngestionEngine(ctx irrecoverable.SignalerContext) *Engine { - processedHeight := bstorage.NewConsumerProgress(s.db, module.ConsumeProgressIngestionEngineBlockHeight) + processedHeight := bstorage.NewConsumerProgressFactory(s.db, 
module.ConsumeProgressIngestionEngineBlockHeight) var err error s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight), + bstorage.NewConsumerProgressFactory(s.db, module.ConsumeProgressLastFullBlockHeight), s.finalizedBlock.Height, ) require.NoError(s.T(), err) diff --git a/engine/access/rpc/backend/backend_stream_transactions_test.go b/engine/access/rpc/backend/backend_stream_transactions_test.go index 78e7b1e676d..2cac20f94de 100644 --- a/engine/access/rpc/backend/backend_stream_transactions_test.go +++ b/engine/access/rpc/backend/backend_stream_transactions_test.go @@ -134,7 +134,7 @@ func (s *TransactionStatusSuite) SetupTest() { var err error s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight), + bstorage.NewConsumerProgressFactory(s.db, module.ConsumeProgressLastFullBlockHeight), s.rootBlock.Header.Height, ) require.NoError(s.T(), err) diff --git a/engine/access/rpc/backend/backend_test.go b/engine/access/rpc/backend/backend_test.go index 63bf2637e3e..4e1108e856a 100644 --- a/engine/access/rpc/backend/backend_test.go +++ b/engine/access/rpc/backend/backend_test.go @@ -124,7 +124,7 @@ func (suite *Suite) SetupTest() { suite.db, suite.dbDir = unittest.TempBadgerDB(suite.T()) suite.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(suite.db, module.ConsumeProgressLastFullBlockHeight), + bstorage.NewConsumerProgressFactory(suite.db, module.ConsumeProgressLastFullBlockHeight), 0, ) suite.Require().NoError(err) diff --git a/engine/testutil/mock/nodes.go b/engine/testutil/mock/nodes.go index d12a409cf57..e622d2c942d 100644 --- a/engine/testutil/mock/nodes.go +++ b/engine/testutil/mock/nodes.go @@ -281,12 +281,12 @@ type VerificationNode struct { Receipts storage.ExecutionReceipts // chunk consumer and processor 
for fetcher engine - ProcessedChunkIndex storage.ConsumerProgress + ProcessedChunkIndex storage.ConsumerProgressFactory ChunksQueue *bstorage.ChunksQueue ChunkConsumer *chunkconsumer.ChunkConsumer // block consumer for chunk consumer - ProcessedBlockHeight storage.ConsumerProgress + ProcessedBlockHeight storage.ConsumerProgressFactory BlockConsumer *blockconsumer.BlockConsumer VerifierEngine *verifier.Engine diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index b6d1037b500..9f14b1c8f50 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -994,7 +994,7 @@ func VerificationNode(t testing.TB, } if node.ProcessedChunkIndex == nil { - node.ProcessedChunkIndex = storage.NewConsumerProgress(node.PublicDB, module.ConsumeProgressVerificationChunkIndex) + node.ProcessedChunkIndex = storage.NewConsumerProgressFactory(node.PublicDB, module.ConsumeProgressVerificationChunkIndex) } if node.ChunksQueue == nil { @@ -1005,7 +1005,7 @@ func VerificationNode(t testing.TB, } if node.ProcessedBlockHeight == nil { - node.ProcessedBlockHeight = storage.NewConsumerProgress(node.PublicDB, module.ConsumeProgressVerificationBlockHeight) + node.ProcessedBlockHeight = storage.NewConsumerProgressFactory(node.PublicDB, module.ConsumeProgressVerificationBlockHeight) } if node.VerifierEngine == nil { diff --git a/engine/verification/assigner/blockconsumer/consumer.go b/engine/verification/assigner/blockconsumer/consumer.go index 7b6341be000..4e84cfaca63 100644 --- a/engine/verification/assigner/blockconsumer/consumer.go +++ b/engine/verification/assigner/blockconsumer/consumer.go @@ -42,7 +42,7 @@ func defaultProcessedIndex(state protocol.State) (uint64, error) { // index for initializing the processed index in storage. 
func NewBlockConsumer(log zerolog.Logger, metrics module.VerificationMetrics, - processedHeight storage.ConsumerProgress, + processedHeightFactory storage.ConsumerProgressFactory, blocks storage.Blocks, state protocol.State, blockProcessor assigner.FinalizedBlockProcessor, @@ -63,7 +63,12 @@ func NewBlockConsumer(log zerolog.Logger, return nil, 0, fmt.Errorf("could not read default processed index: %w", err) } - consumer, err := jobqueue.NewConsumer(lg, jobs, processedHeight, worker, maxProcessing, 0, defaultIndex) + processedHeight, err := processedHeightFactory.InitConsumer(defaultIndex) + if err != nil { + return nil, 0, fmt.Errorf("could not initialize processed height: %w", err) + } + + consumer, err := jobqueue.NewConsumer(lg, jobs, processedHeight, worker, maxProcessing, 0) if err != nil { return nil, 0, fmt.Errorf("could not create block consumer: %w", err) } diff --git a/engine/verification/assigner/blockconsumer/consumer_test.go b/engine/verification/assigner/blockconsumer/consumer_test.go index 26b784b03db..ca6114f762e 100644 --- a/engine/verification/assigner/blockconsumer/consumer_test.go +++ b/engine/verification/assigner/blockconsumer/consumer_test.go @@ -120,7 +120,7 @@ func withConsumer( unittest.RunWithBadgerDB(t, func(db *badger.DB) { maxProcessing := uint64(workerCount) - processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressVerificationBlockHeight) + processedHeight := bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressVerificationBlockHeight) collector := &metrics.NoopCollector{} tracer := trace.NewNoopTracer() log := unittest.Logger() diff --git a/engine/verification/fetcher/chunkconsumer/consumer.go b/engine/verification/fetcher/chunkconsumer/consumer.go index 703b51d5d7e..8cab172d85c 100644 --- a/engine/verification/fetcher/chunkconsumer/consumer.go +++ b/engine/verification/fetcher/chunkconsumer/consumer.go @@ -29,7 +29,7 @@ type ChunkConsumer struct { func NewChunkConsumer( log zerolog.Logger, metrics 
module.VerificationMetrics, - processedIndex storage.ConsumerProgress, // to persist the processed index + processedIndexFactory storage.ConsumerProgressFactory, // to persist the processed index chunksQueue storage.ChunksQueue, // to read jobs (chunks) from chunkProcessor fetcher.AssignedChunkProcessor, // to process jobs (chunks) maxProcessing uint64, // max number of jobs to be processed in parallel @@ -39,8 +39,13 @@ func NewChunkConsumer( jobs := &ChunkJobs{locators: chunksQueue} + processedIndex, err := processedIndexFactory.InitConsumer(DefaultJobIndex) + if err != nil { + return nil, fmt.Errorf("could not initialize processed index: %w", err) + } + lg := log.With().Str("module", "chunk_consumer").Logger() - consumer, err := jobqueue.NewConsumer(lg, jobs, processedIndex, worker, maxProcessing, 0, DefaultJobIndex) + consumer, err := jobqueue.NewConsumer(lg, jobs, processedIndex, worker, maxProcessing, 0) if err != nil { return nil, err } diff --git a/engine/verification/fetcher/chunkconsumer/consumer_test.go b/engine/verification/fetcher/chunkconsumer/consumer_test.go index 1aabce2bd14..2363506ed69 100644 --- a/engine/verification/fetcher/chunkconsumer/consumer_test.go +++ b/engine/verification/fetcher/chunkconsumer/consumer_test.go @@ -144,7 +144,7 @@ func WithConsumer( unittest.RunWithBadgerDB(t, func(db *badger.DB) { maxProcessing := uint64(3) - processedIndex := storage.NewConsumerProgress(db, module.ConsumeProgressVerificationChunkIndex) + processedIndexFactory := storage.NewConsumerProgressFactory(db, module.ConsumeProgressVerificationChunkIndex) chunksQueue := storage.NewChunkQueue(db) ok, err := chunksQueue.Init(chunkconsumer.DefaultJobIndex) require.NoError(t, err) @@ -158,7 +158,7 @@ func WithConsumer( consumer, err := chunkconsumer.NewChunkConsumer( unittest.Logger(), collector, - processedIndex, + processedIndexFactory, chunksQueue, engine, maxProcessing, diff --git a/module/counters/persistent_strict_monotonic_counter.go 
b/module/counters/persistent_strict_monotonic_counter.go index caaf7b45919..7af56c32778 100644 --- a/module/counters/persistent_strict_monotonic_counter.go +++ b/module/counters/persistent_strict_monotonic_counter.go @@ -26,7 +26,12 @@ type PersistentStrictMonotonicCounter struct { // otherwise the state may become inconsistent. // // No errors are expected during normal operation. -func NewPersistentStrictMonotonicCounter(consumerProgress storage.ConsumerProgress, defaultIndex uint64) (*PersistentStrictMonotonicCounter, error) { +func NewPersistentStrictMonotonicCounter(factory storage.ConsumerProgressFactory, defaultIndex uint64) (*PersistentStrictMonotonicCounter, error) { + consumerProgress, err := factory.InitConsumer(defaultIndex) + if err != nil { + return nil, fmt.Errorf("could not init consumer progress: %w", err) + } + m := &PersistentStrictMonotonicCounter{ consumerProgress: consumerProgress, } @@ -34,14 +39,7 @@ func NewPersistentStrictMonotonicCounter(consumerProgress storage.ConsumerProgre // sync with storage for the processed index to ensure the consistency value, err := m.consumerProgress.ProcessedIndex() if err != nil { - if !errors.Is(err, storage.ErrNotFound) { - return nil, fmt.Errorf("could not read consumer progress: %w", err) - } - err := m.consumerProgress.InitProcessedIndex(defaultIndex) - if err != nil { - return nil, fmt.Errorf("could not init consumer progress: %w", err) - } - value = defaultIndex + return nil, fmt.Errorf("could not read consumer progress: %w", err) } m.counter = NewMonotonousCounter(value) diff --git a/module/counters/persistent_strict_monotonic_counter_test.go b/module/counters/persistent_strict_monotonic_counter_test.go index 62a1adedf22..2665abf47ea 100644 --- a/module/counters/persistent_strict_monotonic_counter_test.go +++ b/module/counters/persistent_strict_monotonic_counter_test.go @@ -16,7 +16,7 @@ func TestMonotonicConsumer(t *testing.T) { unittest.RunWithBadgerDB(t, func(db *badger.DB) { var height1 = 
uint64(1234) persistentStrictMonotonicCounter, err := counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight), + bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressLastFullBlockHeight), height1, ) require.NoError(t, err) @@ -42,7 +42,7 @@ func TestMonotonicConsumer(t *testing.T) { // check that new persistent strict monotonic counter has the same value persistentStrictMonotonicCounter2, err := counters.NewPersistentStrictMonotonicCounter( - bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight), + bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressLastFullBlockHeight), height1, ) require.NoError(t, err) diff --git a/module/jobqueue/component_consumer.go b/module/jobqueue/component_consumer.go index 1b174e712ad..32b9cf7561f 100644 --- a/module/jobqueue/component_consumer.go +++ b/module/jobqueue/component_consumer.go @@ -27,7 +27,7 @@ type ComponentConsumer struct { func NewComponentConsumer( log zerolog.Logger, workSignal <-chan struct{}, - progress storage.ConsumerProgress, + progressFactory storage.ConsumerProgressFactory, jobs module.Jobs, defaultIndex uint64, processor JobProcessor, // method used to process jobs @@ -48,7 +48,12 @@ func NewComponentConsumer( maxProcessing, ) - consumer, err := NewConsumer(log, jobs, progress, worker, maxProcessing, maxSearchAhead, defaultIndex) + progress, err := progressFactory.InitConsumer(defaultIndex) + if err != nil { + return nil, err + } + + consumer, err := NewConsumer(log, jobs, progress, worker, maxProcessing, maxSearchAhead) if err != nil { return nil, err } diff --git a/module/jobqueue/component_consumer_test.go b/module/jobqueue/component_consumer_test.go index de9d13b5981..12109b5c85d 100644 --- a/module/jobqueue/component_consumer_test.go +++ b/module/jobqueue/component_consumer_test.go @@ -75,6 +75,14 @@ func generateTestData(jobCount uint64) map[uint64]TestJob { return jobData } +type 
mockConsumerFactory struct { + progress *storagemock.ConsumerProgress +} + +func (f *mockConsumerFactory) InitConsumer(defaultIndex uint64) (storage.ConsumerProgress, error) { + return f.progress, nil +} + func (suite *ComponentConsumerSuite) prepareTest( processor JobProcessor, preNotifier NotifyDone, @@ -89,10 +97,12 @@ func (suite *ComponentConsumerSuite) prepareTest( progress.On("ProcessedIndex").Return(suite.defaultIndex, nil) progress.On("SetProcessedIndex", mock.AnythingOfType("uint64")).Return(nil) + progressFactory := &mockConsumerFactory{progress} + consumer, err := NewComponentConsumer( zerolog.New(os.Stdout).With().Timestamp().Logger(), workSignal, - progress, + progressFactory, jobs, suite.defaultIndex, processor, diff --git a/module/jobqueue/consumer.go b/module/jobqueue/consumer.go index 17e309a929c..08ac526db38 100644 --- a/module/jobqueue/consumer.go +++ b/module/jobqueue/consumer.go @@ -54,10 +54,9 @@ func NewConsumer( worker Worker, maxProcessing uint64, maxSearchAhead uint64, - defaultIndex uint64, ) (*Consumer, error) { - processedIndex, err := readProcessedIndex(log, progress, defaultIndex) + processedIndex, err := progress.ProcessedIndex() if err != nil { return nil, fmt.Errorf("could not read processed index: %w", err) } @@ -84,33 +83,6 @@ func NewConsumer( }, nil } -func readProcessedIndex(log zerolog.Logger, progress storage.ConsumerProgress, defaultIndex uint64) (uint64, error) { - // on startup, sync with storage for the processed index - // to ensure the consistency - processedIndex, err := progress.ProcessedIndex() - if errors.Is(err, storage.ErrNotFound) { - err := progress.InitProcessedIndex(defaultIndex) - if errors.Is(err, storage.ErrAlreadyExists) { - return 0, fmt.Errorf("processed index has already been inited, no effect for the second time. default index: %v", - defaultIndex) - } - - if err != nil { - return 0, fmt.Errorf("could not init processed index: %w", err) - } - - log.Warn().Uint64("processed index", processedIndex). 
- Msg("processed index not found, initialized.") - return defaultIndex, nil - } - - if err != nil { - return 0, fmt.Errorf("could not read processed index: %w", err) - } - - return processedIndex, nil -} - // Start starts consuming the jobs from the job queue. func (c *Consumer) Start() error { c.mu.Lock() diff --git a/module/jobqueue/consumer_behavior_test.go b/module/jobqueue/consumer_behavior_test.go index 98fc7395377..a99db8f04dc 100644 --- a/module/jobqueue/consumer_behavior_test.go +++ b/module/jobqueue/consumer_behavior_test.go @@ -469,10 +469,11 @@ func testWorkOnNextAfterFastforward(t *testing.T) { // rebuild a consumer with the dependencies to simulate a restart // jobs need to be reused, since it stores all the jobs reWorker := newMockWorker() - reProgress := badger.NewConsumerProgress(db, ConsumerTag) - reConsumer := newTestConsumer(t, reProgress, j, reWorker, 0, DefaultIndex) + reProgress, err := badger.NewConsumerProgressFactory(db, ConsumerTag).InitConsumer(DefaultIndex) + require.NoError(t, err) + reConsumer := newTestConsumer(t, reProgress, j, reWorker, 0) - err := reConsumer.Start() + err = reConsumer.Start() require.NoError(t, err) time.Sleep(1 * time.Millisecond) @@ -560,8 +561,9 @@ func runWithSeatchAhead(t testing.TB, maxSearchAhead uint64, defaultIndex uint64 unittest.RunWithBadgerDB(t, func(db *badgerdb.DB) { jobs := jobqueue.NewMockJobs() worker := newMockWorker() - progress := badger.NewConsumerProgress(db, ConsumerTag) - consumer := newTestConsumer(t, progress, jobs, worker, maxSearchAhead, defaultIndex) + progress, err := badger.NewConsumerProgressFactory(db, ConsumerTag).InitConsumer(defaultIndex) + require.NoError(t, err) + consumer := newTestConsumer(t, progress, jobs, worker, maxSearchAhead) runTestWith(consumer, progress, worker, jobs, db) }) } @@ -572,10 +574,10 @@ func assertProcessed(t testing.TB, cp storage.ConsumerProgress, expectProcessed require.Equal(t, expectProcessed, processed) } -func newTestConsumer(t testing.TB, cp 
storage.ConsumerProgress, jobs module.Jobs, worker jobqueue.Worker, maxSearchAhead uint64, defaultIndex uint64) module.JobConsumer { +func newTestConsumer(t testing.TB, cp storage.ConsumerProgress, jobs module.Jobs, worker jobqueue.Worker, maxSearchAhead uint64) module.JobConsumer { log := unittest.Logger().With().Str("module", "consumer").Logger() maxProcessing := uint64(3) - c, err := jobqueue.NewConsumer(log, jobs, cp, worker, maxProcessing, maxSearchAhead, defaultIndex) + c, err := jobqueue.NewConsumer(log, jobs, cp, worker, maxProcessing, maxSearchAhead) require.NoError(t, err) return c } diff --git a/module/jobqueue/consumer_test.go b/module/jobqueue/consumer_test.go index 90db5332f79..cafeb18e126 100644 --- a/module/jobqueue/consumer_test.go +++ b/module/jobqueue/consumer_test.go @@ -161,10 +161,11 @@ func TestProcessedIndexDeletion(t *testing.T) { unittest.RunWithBadgerDB(t, func(db *badgerdb.DB) { log := unittest.Logger().With().Str("module", "consumer").Logger() jobs := NewMockJobs() - progress := badger.NewConsumerProgress(db, "consumer") + progress, err := badger.NewConsumerProgressFactory(db, "consumer").InitConsumer(0) + require.NoError(t, err) worker := newMockWorker() maxProcessing := uint64(3) - c, err := NewConsumer(log, jobs, progress, worker, maxProcessing, 0, 0) + c, err := NewConsumer(log, jobs, progress, worker, maxProcessing, 0) require.NoError(t, err) worker.WithConsumer(c) @@ -197,8 +198,7 @@ func TestCheckBeforeStartIsNoop(t *testing.T) { storedProcessedIndex := uint64(100) worker := newMockWorker() - progress := badger.NewConsumerProgress(db, "consumer") - err := progress.InitProcessedIndex(storedProcessedIndex) + progress, err := badger.NewConsumerProgressFactory(db, "consumer").InitConsumer(storedProcessedIndex) require.NoError(t, err) c, err := NewConsumer( @@ -208,7 +208,6 @@ func TestCheckBeforeStartIsNoop(t *testing.T) { worker, uint64(3), 0, - 10, ) require.NoError(t, err) worker.WithConsumer(c) diff --git 
a/module/state_synchronization/indexer/indexer.go b/module/state_synchronization/indexer/indexer.go index 2de7650a872..e4dffdaf4df 100644 --- a/module/state_synchronization/indexer/indexer.go +++ b/module/state_synchronization/indexer/indexer.go @@ -70,7 +70,7 @@ func NewIndexer( indexer *IndexerCore, executionCache *cache.ExecutionDataCache, executionDataLatestHeight func() (uint64, error), - processedHeight storage.ConsumerProgress, + processedHeightFactory storage.ConsumerProgressFactory, ) (*Indexer, error) { r := &Indexer{ log: log.With().Str("module", "execution_indexer").Logger(), @@ -89,7 +89,7 @@ func NewIndexer( jobConsumer, err := jobqueue.NewComponentConsumer( r.log, r.exeDataNotifier.Channel(), - processedHeight, + processedHeightFactory, r.exeDataReader, initHeight, r.processExecutionData, diff --git a/module/state_synchronization/indexer/indexer_test.go b/module/state_synchronization/indexer/indexer_test.go index e7ec3bc5055..ed12c9e9efe 100644 --- a/module/state_synchronization/indexer/indexer_test.go +++ b/module/state_synchronization/indexer/indexer_test.go @@ -17,6 +17,7 @@ import ( "github.com/onflow/flow-go/module/executiondatasync/execution_data/mock" "github.com/onflow/flow-go/module/irrecoverable" mempool "github.com/onflow/flow-go/module/mempool/mock" + "github.com/onflow/flow-go/storage" storagemock "github.com/onflow/flow-go/storage/mock" "github.com/onflow/flow-go/utils/unittest" ) @@ -82,7 +83,7 @@ func newIndexerTest(t *testing.T, availableBlocks int, lastIndexedIndex int) *in indexerCoreTest.indexer, exeCache, test.latestHeight, - progress, + &mockProgressFactory{progress}, ) require.NoError(t, err) @@ -121,6 +122,14 @@ func (w *indexerTest) run(ctx irrecoverable.SignalerContext, reachHeight uint64, unittest.RequireCloseBefore(w.t, w.worker.Done(), testTimeout, "timeout waiting for the consumer to be done") } +type mockProgressFactory struct { + mockProgress *mockProgress +} + +func (f *mockProgressFactory) InitConsumer(defaultIndex 
uint64) (storage.ConsumerProgress, error) { + return f.mockProgress, nil +} + type mockProgress struct { index *atomic.Uint64 doneIndex *atomic.Uint64 @@ -128,6 +137,8 @@ type mockProgress struct { doneChan chan struct{} } +var _ storage.ConsumerProgress = (*mockProgress)(nil) + func newMockProgress() *mockProgress { return &mockProgress{ index: atomic.NewUint64(0), @@ -150,11 +161,6 @@ func (w *mockProgress) SetProcessedIndex(index uint64) error { return nil } -func (w *mockProgress) InitProcessedIndex(index uint64) error { - w.index.Store(index) - return nil -} - // WaitForIndex will trigger a signal to the consumer, so they know the test reached a certain point func (w *mockProgress) WaitForIndex(n uint64) <-chan struct{} { w.doneIndex.Store(n) diff --git a/module/state_synchronization/requester/execution_data_requester.go b/module/state_synchronization/requester/execution_data_requester.go index 4ed489371dd..317248100ba 100644 --- a/module/state_synchronization/requester/execution_data_requester.go +++ b/module/state_synchronization/requester/execution_data_requester.go @@ -145,8 +145,8 @@ func New( edrMetrics module.ExecutionDataRequesterMetrics, downloader execution_data.Downloader, execDataCache *cache.ExecutionDataCache, - processedHeight storage.ConsumerProgress, - processedNotifications storage.ConsumerProgress, + processedHeightFactory storage.ConsumerProgressFactory, + processedNotificationsFactory storage.ConsumerProgressFactory, state protocol.State, headers storage.Headers, cfg ExecutionDataConfig, @@ -182,7 +182,7 @@ func New( blockConsumer, err := jobqueue.NewComponentConsumer( e.log.With().Str("module", "block_consumer").Logger(), e.finalizationNotifier.Channel(), // to listen to finalization events to find newly sealed blocks - processedHeight, // read and persist the downloaded height + processedHeightFactory, // read and persist the downloaded height sealedBlockReader, // read sealed blocks by height e.config.InitialBlockHeight, // initial 
"last processed" height for empty db e.processBlockJob, // process the sealed block job to download its execution data @@ -221,7 +221,8 @@ func New( // a block's execution data is downloaded and stored, and checks the `executionDataCache` to // find if the next un-processed consecutive height is available. // To know what's the height of the next un-processed consecutive height, it reads the latest - // consecutive height in `processedNotifications`. And it's persisted in storage to be crash-resistant. + // consecutive height in `processedNotifications` to be created by `processedNotificationsFactory. + // And it's persisted in storage to be crash-resistant. // When a new consecutive height is available, it calls `processNotificationJob` to notify all the // `e.consumers`. // Note: the `e.consumers` will be guaranteed to receive at least one `OnExecutionDataFetched` event @@ -229,7 +230,7 @@ func New( e.notificationConsumer, err = jobqueue.NewComponentConsumer( e.log.With().Str("module", "notification_consumer").Logger(), executionDataNotifier.Channel(), // listen for notifications from the block consumer - processedNotifications, // read and persist the notified height + processedNotificationsFactory, // read and persist the notified height e.executionDataReader, // read execution data by height e.config.InitialBlockHeight, // initial "last processed" height for empty db e.processNotificationJob, // process the job to send notifications for an execution data diff --git a/module/state_synchronization/requester/execution_data_requester_test.go b/module/state_synchronization/requester/execution_data_requester_test.go index deff90cb240..07ce7368080 100644 --- a/module/state_synchronization/requester/execution_data_requester_test.go +++ b/module/state_synchronization/requester/execution_data_requester_test.go @@ -412,8 +412,8 @@ func (suite *ExecutionDataRequesterSuite) prepareRequesterTest(cfg *fetchTestRun cache := cache.NewExecutionDataCache(suite.downloader, 
headers, seals, results, heroCache) followerDistributor := pubsub.NewFollowerDistributor() - processedHeight := bstorage.NewConsumerProgress(suite.db, module.ConsumeProgressExecutionDataRequesterBlockHeight) - processedNotification := bstorage.NewConsumerProgress(suite.db, module.ConsumeProgressExecutionDataRequesterNotification) + processedHeight := bstorage.NewConsumerProgressFactory(suite.db, module.ConsumeProgressExecutionDataRequesterBlockHeight) + processedNotification := bstorage.NewConsumerProgressFactory(suite.db, module.ConsumeProgressExecutionDataRequesterNotification) edr, err := requester.New( logger, diff --git a/storage/badger/chunks_queue.go b/storage/badger/chunks_queue.go index 430abe0241b..b02658020c4 100644 --- a/storage/badger/chunks_queue.go +++ b/storage/badger/chunks_queue.go @@ -3,6 +3,7 @@ package badger import ( "errors" "fmt" + "sync" "github.com/dgraph-io/badger/v2" @@ -16,7 +17,8 @@ import ( // Job consumers can read the locators as job from the queue by index. // Chunk locators stored in this queue are unique. type ChunksQueue struct { - db *badger.DB + db *badger.DB + storing sync.Mutex } const JobQueueChunksQueue = "JobQueueChunksQueue" @@ -32,7 +34,7 @@ func NewChunkQueue(db *badger.DB) *ChunksQueue { func (q *ChunksQueue) Init(defaultIndex uint64) (bool, error) { _, err := q.LatestIndex() if errors.Is(err, storage.ErrNotFound) { - err = q.db.Update(operation.InitJobLatestIndex(JobQueueChunksQueue, defaultIndex)) + err = operation.WithBatchWriter(q.db, operation.InitJobLatestIndex(JobQueueChunksQueue, defaultIndex)) if err != nil { return false, fmt.Errorf("could not init chunk locator queue with default index %v: %w", defaultIndex, err) } @@ -48,30 +50,48 @@ func (q *ChunksQueue) Init(defaultIndex uint64) (bool, error) { // StoreChunkLocator stores a new chunk locator that assigned to me to the job queue. // A true will be returned, if the locator was new. // A false will be returned, if the locator was duplicate. 
+// It's safe to store multiple locators concurrently. func (q *ChunksQueue) StoreChunkLocator(locator *chunks.Locator) (bool, error) { - err := operation.RetryOnConflict(q.db.Update, func(tx *badger.Txn) error { + q.storing.Lock() + defer q.storing.Unlock() + + var alreadyExist bool + err := operation.HasChunkLocator(locator.ID(), &alreadyExist)(operation.ToReader(q.db)) + if err != nil { + return false, fmt.Errorf("could not check if chunk locator exists: %w", err) + } + + // was trying to store a duplicate locator + if alreadyExist { + return false, nil + } + + err = operation.WithReaderBatchWriter(q.db, func(rw storage.BadgerReaderBatchWriter) error { + r := rw.GlobalReader() + w := rw.Writer() + // make sure the chunk locator is unique - err := operation.InsertChunkLocator(locator)(tx) + err := operation.InsertChunkLocator(locator)(w) if err != nil { return fmt.Errorf("failed to insert chunk locator: %w", err) } // read the latest index var latest uint64 - err = operation.RetrieveJobLatestIndex(JobQueueChunksQueue, &latest)(tx) + err = operation.RetrieveJobLatestIndex(JobQueueChunksQueue, &latest)(r) if err != nil { return fmt.Errorf("failed to retrieve job index for chunk locator queue: %w", err) } // insert to the next index next := latest + 1 - err = operation.InsertJobAtIndex(JobQueueChunksQueue, next, locator.ID())(tx) + err = operation.InsertJobAtIndex(JobQueueChunksQueue, next, locator.ID())(w) if err != nil { return fmt.Errorf("failed to set job index for chunk locator queue at index %v: %w", next, err) } // update the next index as the latest index - err = operation.SetJobLatestIndex(JobQueueChunksQueue, next)(tx) + err = operation.SetJobLatestIndex(JobQueueChunksQueue, next)(w) if err != nil { return fmt.Errorf("failed to update latest index %v: %w", next, err) } @@ -79,10 +99,6 @@ func (q *ChunksQueue) StoreChunkLocator(locator *chunks.Locator) (bool, error) { return nil }) - // was trying to store a duplicate locator - if errors.Is(err, 
storage.ErrAlreadyExists) { - return false, nil - } if err != nil { return false, fmt.Errorf("failed to store chunk locator: %w", err) } @@ -92,7 +108,7 @@ func (q *ChunksQueue) StoreChunkLocator(locator *chunks.Locator) (bool, error) { // LatestIndex returns the index of the latest chunk locator stored in the queue. func (q *ChunksQueue) LatestIndex() (uint64, error) { var latest uint64 - err := q.db.View(operation.RetrieveJobLatestIndex(JobQueueChunksQueue, &latest)) + err := operation.RetrieveJobLatestIndex(JobQueueChunksQueue, &latest)(operation.ToReader(q.db)) if err != nil { return 0, fmt.Errorf("could not retrieve latest index for chunks queue: %w", err) } @@ -102,13 +118,13 @@ func (q *ChunksQueue) LatestIndex() (uint64, error) { // AtIndex returns the chunk locator stored at the given index in the queue. func (q *ChunksQueue) AtIndex(index uint64) (*chunks.Locator, error) { var locatorID flow.Identifier - err := q.db.View(operation.RetrieveJobAtIndex(JobQueueChunksQueue, index, &locatorID)) + err := operation.RetrieveJobAtIndex(JobQueueChunksQueue, index, &locatorID)(operation.ToReader(q.db)) if err != nil { return nil, fmt.Errorf("could not retrieve chunk locator in queue: %w", err) } var locator chunks.Locator - err = q.db.View(operation.RetrieveChunkLocator(locatorID, &locator)) + err = operation.RetrieveChunkLocator(locatorID, &locator)(operation.ToReader(q.db)) if err != nil { return nil, fmt.Errorf("could not retrieve locator for chunk id %v: %w", locatorID, err) } diff --git a/storage/badger/consumer_progress.go b/storage/badger/consumer_progress.go index 52855dd60b1..d33cf35b63d 100644 --- a/storage/badger/consumer_progress.go +++ b/storage/badger/consumer_progress.go @@ -5,15 +5,60 @@ import ( "github.com/dgraph-io/badger/v2" + "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/badger/operation" ) +// ConsumerProgressFactory is a factory to create consumer progress +// It ensures the consumer progress is initialized before 
being used +type ConsumerProgressFactory struct { + db *badger.DB + consumer string +} + +var _ storage.ConsumerProgressFactory = (*ConsumerProgressFactory)(nil) + +func NewConsumerProgressFactory(db *badger.DB, consumer string) *ConsumerProgressFactory { + return &ConsumerProgressFactory{ + db: db, + consumer: consumer, + } +} + +// InitConsumer inserts the default processed index to the storage layer to initialize the +// consumer if it has not been initialized before, and returns the initialized consumer progress. +// It should not be called concurrently +func (cpf *ConsumerProgressFactory) InitConsumer(defaultIndex uint64) (storage.ConsumerProgress, error) { + consumer := newConsumerProgress(cpf.db, cpf.consumer) + + _, err := consumer.ProcessedIndex() + if err == nil { + // the consumer progress factory has been initialized + return consumer, nil + } + + if !storage.IsNotFound(err) { + return nil, fmt.Errorf("could not check if consumer progress is initted for consumer %v: %w", + consumer, err) + } + + // never initialized, initialize now + err = operation.WithBatchWriter(cpf.db, operation.SetProcessedIndex(cpf.consumer, defaultIndex)) + if err != nil { + return nil, fmt.Errorf("could not init consumer progress for consumer %v: %w", cpf.consumer, err) + } + + return consumer, nil +} + type ConsumerProgress struct { db *badger.DB consumer string // to distinguish the consume progress between different consumers } -func NewConsumerProgress(db *badger.DB, consumer string) *ConsumerProgress { +var _ storage.ConsumerProgress = (*ConsumerProgress)(nil) + +func newConsumerProgress(db *badger.DB, consumer string) *ConsumerProgress { return &ConsumerProgress{ db: db, consumer: consumer, @@ -22,26 +67,15 @@ func NewConsumerProgress(db *badger.DB, consumer string) *ConsumerProgress { func (cp *ConsumerProgress) ProcessedIndex() (uint64, error) { var processed uint64 - err := cp.db.View(operation.RetrieveProcessedIndex(cp.consumer, &processed)) + err := 
operation.RetrieveProcessedIndex(cp.consumer, &processed)(operation.ToReader(cp.db)) if err != nil { return 0, fmt.Errorf("failed to retrieve processed index: %w", err) } return processed, nil } -// InitProcessedIndex insert the default processed index to the storage layer, can only be done once. -// initialize for the second time will return storage.ErrAlreadyExists -func (cp *ConsumerProgress) InitProcessedIndex(defaultIndex uint64) error { - err := operation.RetryOnConflict(cp.db.Update, operation.InsertProcessedIndex(cp.consumer, defaultIndex)) - if err != nil { - return fmt.Errorf("could not update processed index: %w", err) - } - - return nil -} - func (cp *ConsumerProgress) SetProcessedIndex(processed uint64) error { - err := operation.RetryOnConflict(cp.db.Update, operation.SetProcessedIndex(cp.consumer, processed)) + err := operation.WithBatchWriter(cp.db, operation.SetProcessedIndex(cp.consumer, processed)) if err != nil { return fmt.Errorf("could not update processed index: %w", err) } diff --git a/storage/badger/operation/chunk_locators.go b/storage/badger/operation/chunk_locators.go index ef7f11fec50..eb1d2d9eee4 100644 --- a/storage/badger/operation/chunk_locators.go +++ b/storage/badger/operation/chunk_locators.go @@ -1,16 +1,19 @@ package operation import ( - "github.com/dgraph-io/badger/v2" - "github.com/onflow/flow-go/model/chunks" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage" ) -func InsertChunkLocator(locator *chunks.Locator) func(*badger.Txn) error { - return insert(makePrefix(codeChunk, locator.ID()), locator) +func InsertChunkLocator(locator *chunks.Locator) func(storage.Writer) error { + return insertW(makePrefix(codeChunk, locator.ID()), locator) +} + +func RetrieveChunkLocator(locatorID flow.Identifier, locator *chunks.Locator) func(storage.Reader) error { + return retrieveR(makePrefix(codeChunk, locatorID), locator) } -func RetrieveChunkLocator(locatorID flow.Identifier, locator *chunks.Locator) 
func(*badger.Txn) error { - return retrieve(makePrefix(codeChunk, locatorID), locator) +func HasChunkLocator(locatorID flow.Identifier, exist *bool) func(storage.Reader) error { + return existsR(makePrefix(codeChunk, locatorID), exist) } diff --git a/storage/badger/operation/common.go b/storage/badger/operation/common.go index 09abd97a62f..95a607f0c25 100644 --- a/storage/badger/operation/common.go +++ b/storage/badger/operation/common.go @@ -315,6 +315,25 @@ func retrieveR(key []byte, entity interface{}) func(storage.Reader) error { // exists returns true if a key exists in the database. // No errors are expected during normal operation. +func existsR(key []byte, keyExists *bool) func(storage.Reader) error { + return func(r storage.Reader) error { + _, closer, err := r.Get(key) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + *keyExists = false + return nil + } + + // exception while checking for the key + return irrecoverable.NewExceptionf("could not load data: %w", err) + } + *keyExists = true + defer closer.Close() + return nil + } +} + +// Deprecated: use existsR instead. 
func exists(key []byte, keyExists *bool) func(*badger.Txn) error { return func(tx *badger.Txn) error { _, err := tx.Get(key) diff --git a/storage/badger/operation/jobs.go b/storage/badger/operation/jobs.go index 0f9eb3166ad..5788ea658c3 100644 --- a/storage/badger/operation/jobs.go +++ b/storage/badger/operation/jobs.go @@ -1,43 +1,38 @@ package operation import ( - "github.com/dgraph-io/badger/v2" - "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage" ) -func RetrieveJobLatestIndex(queue string, index *uint64) func(*badger.Txn) error { - return retrieve(makePrefix(codeJobQueuePointer, queue), index) +func RetrieveJobLatestIndex(queue string, index *uint64) func(storage.Reader) error { + return retrieveR(makePrefix(codeJobQueuePointer, queue), index) } -func InitJobLatestIndex(queue string, index uint64) func(*badger.Txn) error { - return 
insert(makePrefix(codeJobQueuePointer, queue), index) +func InitJobLatestIndex(queue string, index uint64) func(storage.Writer) error { + return insertW(makePrefix(codeJobQueuePointer, queue), index) } -func SetJobLatestIndex(queue string, index uint64) func(*badger.Txn) error { - return update(makePrefix(codeJobQueuePointer, queue), index) +func SetJobLatestIndex(queue string, index uint64) func(storage.Writer) error { + return insertW(makePrefix(codeJobQueuePointer, queue), index) } // RetrieveJobAtIndex returns the entity at the given index -func RetrieveJobAtIndex(queue string, index uint64, entity *flow.Identifier) func(*badger.Txn) error { - return retrieve(makePrefix(codeJobQueue, queue, index), entity) +func RetrieveJobAtIndex(queue string, index uint64, entity *flow.Identifier) func(storage.Reader) error { + return retrieveR(makePrefix(codeJobQueue, queue, index), entity) } // InsertJobAtIndex insert an entity ID at the given index -func InsertJobAtIndex(queue string, index uint64, entity flow.Identifier) func(*badger.Txn) error { - return insert(makePrefix(codeJobQueue, queue, index), entity) +func InsertJobAtIndex(queue string, index uint64, entity flow.Identifier) func(storage.Writer) error { + return insertW(makePrefix(codeJobQueue, queue, index), entity) } // RetrieveProcessedIndex returns the processed index for a job consumer -func RetrieveProcessedIndex(jobName string, processed *uint64) func(*badger.Txn) error { - return retrieve(makePrefix(codeJobConsumerProcessed, jobName), processed) -} - -func InsertProcessedIndex(jobName string, processed uint64) func(*badger.Txn) error { - return insert(makePrefix(codeJobConsumerProcessed, jobName), processed) +func RetrieveProcessedIndex(jobName string, processed *uint64) func(storage.Reader) error { + return retrieveR(makePrefix(codeJobConsumerProcessed, jobName), processed) } // SetProcessedIndex updates the processed index for a job consumer with given index -func SetProcessedIndex(jobName string, 
processed uint64) func(*badger.Txn) error { - return update(makePrefix(codeJobConsumerProcessed, jobName), processed) +func SetProcessedIndex(jobName string, processed uint64) func(storage.Writer) error { + return insertW(makePrefix(codeJobConsumerProcessed, jobName), processed) } diff --git a/storage/badger/operation/reader_batch_writer.go b/storage/badger/operation/reader_batch_writer.go index 3c8a26530fd..07436fdd7db 100644 --- a/storage/badger/operation/reader_batch_writer.go +++ b/storage/badger/operation/reader_batch_writer.go @@ -89,6 +89,23 @@ func WithReaderBatchWriter(db *badger.DB, fn func(storage.BadgerReaderBatchWrite return batch.Commit() } +func WithBatchWriter(db *badger.DB, fn func(storage.Writer) error) error { + batch := NewReaderBatchWriter(db) + + err := storage.OnlyBadgerWriter(fn)(batch) + if err != nil { + // fn might use lock to ensure concurrent safety while reading and writing data + // and the lock is usually released by a callback. + // in other words, fn might hold a lock to be released by a callback, + // we need to notify the callback for the locks to be released before + // returning the error. + batch.notifyCallbacks(err) + return err + } + + return batch.Commit() +} + func NewReaderBatchWriter(db *badger.DB) *ReaderBatchWriter { return &ReaderBatchWriter{ db: db, diff --git a/storage/consumer_progress.go b/storage/consumer_progress.go index bd99926ba32..af0374ec056 100644 --- a/storage/consumer_progress.go +++ b/storage/consumer_progress.go @@ -4,10 +4,18 @@ package storage type ConsumerProgress interface { // read the current processed index ProcessedIndex() (uint64, error) - // insert the default processed index to the storage layer, can only be done once. - // initialize for the second time will return storage.ErrAlreadyExists - InitProcessedIndex(defaultIndex uint64) error // update the processed index in the storage layer. - // it will fail if InitProcessedIndex was never called. 
+ // CAUTION: Not safe for concurrent use by multiple goroutines. + // Use PersistentStrictMonotonicCounter instead to enforce strictly increasing values with concurrent writers. SetProcessedIndex(processed uint64) error } + +// ConsumerProgressFactory creates a new ConsumerProgress for the given consumer. +// This intermediary factory type exists to separate the concurrency-sensitive initialization from the rest of ConsumerProgress. +// The factory is intended to be used once in a non-concurrent environment, then discarded. +// The resulting ConsumerProgress instance is safe for concurrent use by multiple goroutines. +type ConsumerProgressFactory interface { + // InitConsumer initializes the consumer progress for the given consumer. + // CAUTION: Must be called only once by one goroutine, per factory instance. + InitConsumer(defaultIndex uint64) (ConsumerProgress, error) +} diff --git a/storage/errors.go b/storage/errors.go index b150dffb283..fd481f74109 100644 --- a/storage/errors.go +++ b/storage/errors.go @@ -29,3 +29,9 @@ var ( // ErrNotBootstrapped is returned when the database has not been bootstrapped. ErrNotBootstrapped = errors.New("pebble database not bootstrapped") ) + +// IsNotFound returns true if the error is a not found error. +// useful so we won't make mistake like errors.Is(storage.ErrNotFound, err) +func IsNotFound(err error) bool { + return errors.Is(err, ErrNotFound) +} diff --git a/storage/mock/badger_reader_batch_writer.go b/storage/mock/badger_reader_batch_writer.go new file mode 100644 index 00000000000..08d3871feee --- /dev/null +++ b/storage/mock/badger_reader_batch_writer.go @@ -0,0 +1,94 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. 
+ +package mock + +import ( + badger "github.com/dgraph-io/badger/v2" + mock "github.com/stretchr/testify/mock" + + storage "github.com/onflow/flow-go/storage" +) + +// BadgerReaderBatchWriter is an autogenerated mock type for the BadgerReaderBatchWriter type +type BadgerReaderBatchWriter struct { + mock.Mock +} + +// AddCallback provides a mock function with given fields: _a0 +func (_m *BadgerReaderBatchWriter) AddCallback(_a0 func(error)) { + _m.Called(_a0) +} + +// BadgerWriteBatch provides a mock function with given fields: +func (_m *BadgerReaderBatchWriter) BadgerWriteBatch() *badger.WriteBatch { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for BadgerWriteBatch") + } + + var r0 *badger.WriteBatch + if rf, ok := ret.Get(0).(func() *badger.WriteBatch); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*badger.WriteBatch) + } + } + + return r0 +} + +// GlobalReader provides a mock function with given fields: +func (_m *BadgerReaderBatchWriter) GlobalReader() storage.Reader { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GlobalReader") + } + + var r0 storage.Reader + if rf, ok := ret.Get(0).(func() storage.Reader); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(storage.Reader) + } + } + + return r0 +} + +// Writer provides a mock function with given fields: +func (_m *BadgerReaderBatchWriter) Writer() storage.Writer { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Writer") + } + + var r0 storage.Writer + if rf, ok := ret.Get(0).(func() storage.Writer); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(storage.Writer) + } + } + + return r0 +} + +// NewBadgerReaderBatchWriter creates a new instance of BadgerReaderBatchWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewBadgerReaderBatchWriter(t interface { + mock.TestingT + Cleanup(func()) +}) *BadgerReaderBatchWriter { + mock := &BadgerReaderBatchWriter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage/mock/consumer_progress.go b/storage/mock/consumer_progress.go index 591fbd39af7..6a865c61ab2 100644 --- a/storage/mock/consumer_progress.go +++ b/storage/mock/consumer_progress.go @@ -9,24 +9,6 @@ type ConsumerProgress struct { mock.Mock } -// InitProcessedIndex provides a mock function with given fields: defaultIndex -func (_m *ConsumerProgress) InitProcessedIndex(defaultIndex uint64) error { - ret := _m.Called(defaultIndex) - - if len(ret) == 0 { - panic("no return value specified for InitProcessedIndex") - } - - var r0 error - if rf, ok := ret.Get(0).(func(uint64) error); ok { - r0 = rf(defaultIndex) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // ProcessedIndex provides a mock function with given fields: func (_m *ConsumerProgress) ProcessedIndex() (uint64, error) { ret := _m.Called() diff --git a/storage/mock/consumer_progress_factory.go b/storage/mock/consumer_progress_factory.go new file mode 100644 index 00000000000..74bbabd0059 --- /dev/null +++ b/storage/mock/consumer_progress_factory.go @@ -0,0 +1,57 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. 
+ +package mock + +import ( + storage "github.com/onflow/flow-go/storage" + mock "github.com/stretchr/testify/mock" +) + +// ConsumerProgressFactory is an autogenerated mock type for the ConsumerProgressFactory type +type ConsumerProgressFactory struct { + mock.Mock +} + +// InitConsumer provides a mock function with given fields: defaultIndex +func (_m *ConsumerProgressFactory) InitConsumer(defaultIndex uint64) (storage.ConsumerProgress, error) { + ret := _m.Called(defaultIndex) + + if len(ret) == 0 { + panic("no return value specified for InitConsumer") + } + + var r0 storage.ConsumerProgress + var r1 error + if rf, ok := ret.Get(0).(func(uint64) (storage.ConsumerProgress, error)); ok { + return rf(defaultIndex) + } + if rf, ok := ret.Get(0).(func(uint64) storage.ConsumerProgress); ok { + r0 = rf(defaultIndex) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(storage.ConsumerProgress) + } + } + + if rf, ok := ret.Get(1).(func(uint64) error); ok { + r1 = rf(defaultIndex) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewConsumerProgressFactory creates a new instance of ConsumerProgressFactory. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewConsumerProgressFactory(t interface { + mock.TestingT + Cleanup(func()) +}) *ConsumerProgressFactory { + mock := &ConsumerProgressFactory{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage/mock/reader.go b/storage/mock/reader.go new file mode 100644 index 00000000000..10ece27981f --- /dev/null +++ b/storage/mock/reader.go @@ -0,0 +1,67 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. 
+ +package mock + +import ( + io "io" + + mock "github.com/stretchr/testify/mock" +) + +// Reader is an autogenerated mock type for the Reader type +type Reader struct { + mock.Mock +} + +// Get provides a mock function with given fields: key +func (_m *Reader) Get(key []byte) ([]byte, io.Closer, error) { + ret := _m.Called(key) + + if len(ret) == 0 { + panic("no return value specified for Get") + } + + var r0 []byte + var r1 io.Closer + var r2 error + if rf, ok := ret.Get(0).(func([]byte) ([]byte, io.Closer, error)); ok { + return rf(key) + } + if rf, ok := ret.Get(0).(func([]byte) []byte); ok { + r0 = rf(key) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + if rf, ok := ret.Get(1).(func([]byte) io.Closer); ok { + r1 = rf(key) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(io.Closer) + } + } + + if rf, ok := ret.Get(2).(func([]byte) error); ok { + r2 = rf(key) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// NewReader creates a new instance of Reader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewReader(t interface { + mock.TestingT + Cleanup(func()) +}) *Reader { + mock := &Reader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage/mock/writer.go b/storage/mock/writer.go new file mode 100644 index 00000000000..2bc10ec5493 --- /dev/null +++ b/storage/mock/writer.go @@ -0,0 +1,60 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. 
+ +package mock + +import mock "github.com/stretchr/testify/mock" + +// Writer is an autogenerated mock type for the Writer type +type Writer struct { + mock.Mock +} + +// Delete provides a mock function with given fields: key +func (_m *Writer) Delete(key []byte) error { + ret := _m.Called(key) + + if len(ret) == 0 { + panic("no return value specified for Delete") + } + + var r0 error + if rf, ok := ret.Get(0).(func([]byte) error); ok { + r0 = rf(key) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Set provides a mock function with given fields: k, v +func (_m *Writer) Set(k []byte, v []byte) error { + ret := _m.Called(k, v) + + if len(ret) == 0 { + panic("no return value specified for Set") + } + + var r0 error + if rf, ok := ret.Get(0).(func([]byte, []byte) error); ok { + r0 = rf(k, v) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewWriter creates a new instance of Writer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewWriter(t interface { + mock.TestingT + Cleanup(func()) +}) *Writer { + mock := &Writer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage/pebble/consumer_progress.go b/storage/pebble/consumer_progress.go index 37448bb4b5f..ae4d0fd54ce 100644 --- a/storage/pebble/consumer_progress.go +++ b/storage/pebble/consumer_progress.go @@ -5,15 +5,60 @@ import ( "github.com/cockroachdb/pebble" + "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/pebble/operation" ) +// ConsumerProgressFactory is a factory to create consumer progress +// It ensures the consumer progress is initialized before being used +type ConsumerProgressFactory struct { + db *pebble.DB + consumer string +} + +var _ storage.ConsumerProgressFactory = (*ConsumerProgressFactory)(nil) + +func NewConsumerProgressFactory(db *pebble.DB, consumer string) *ConsumerProgressFactory { + return &ConsumerProgressFactory{ + db: db, + consumer: consumer, + } +} + +// InitConsumer inserts the default processed index to the storage layer to initialize the +// consumer if it has not been initialized before, and returns the initialized consumer progress. 
+// It should not be called concurrently +func (cpf *ConsumerProgressFactory) InitConsumer(defaultIndex uint64) (storage.ConsumerProgress, error) { + consumer := newConsumerProgress(cpf.db, cpf.consumer) + + _, err := consumer.ProcessedIndex() + if err == nil { + // the consumer progress factory has been initialized + return consumer, nil + } + + if !storage.IsNotFound(err) { + return nil, fmt.Errorf("could not check if consumer progress is initted for consumer %v: %w", + consumer, err) + } + + // never initialized, initialize now + err = operation.SetProcessedIndex(cpf.consumer, defaultIndex)(cpf.db) + if err != nil { + return nil, fmt.Errorf("could not init consumer progress for consumer %v: %w", cpf.consumer, err) + } + + return consumer, nil +} + type ConsumerProgress struct { db *pebble.DB consumer string // to distinguish the consume progress between different consumers } -func NewConsumerProgress(db *pebble.DB, consumer string) *ConsumerProgress { +var _ storage.ConsumerProgress = (*ConsumerProgress)(nil) + +func newConsumerProgress(db *pebble.DB, consumer string) *ConsumerProgress { return &ConsumerProgress{ db: db, consumer: consumer, @@ -29,17 +74,6 @@ func (cp *ConsumerProgress) ProcessedIndex() (uint64, error) { return processed, nil } -// InitProcessedIndex insert the default processed index to the storage layer, can only be done once. -// initialize for the second time will return storage.ErrAlreadyExists -func (cp *ConsumerProgress) InitProcessedIndex(defaultIndex uint64) error { - err := operation.InsertProcessedIndex(cp.consumer, defaultIndex)(cp.db) - if err != nil { - return fmt.Errorf("could not update processed index: %w", err) - } - - return nil -} - func (cp *ConsumerProgress) SetProcessedIndex(processed uint64) error { err := operation.SetProcessedIndex(cp.consumer, processed)(cp.db) if err != nil {