refactor to use consumer factory
zhangchiqing committed Aug 23, 2024
1 parent 166df6e commit 9d87c56
Showing 21 changed files with 95 additions and 61 deletions.
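The pattern repeated across these files: call sites that previously constructed a storage.ConsumerProgress up front now hold a storage.ConsumerProgressFactory, and each consumer initializes its own progress record from its default index via InitConsumer. Correspondingly, jobqueue.NewConsumer drops its trailing defaultIndex argument, since the progress it receives is already initialized. A rough sketch of the shape this implies — inferred from the call sites in this commit rather than copied from the flow-go sources, with the ConsumerProgress methods assumed from the pre-existing interface:

// Sketch only; placement and the method names beyond InitConsumer are assumptions.
package storage

// ConsumerProgress tracks the highest job index a consumer has processed.
type ConsumerProgress interface {
	ProcessedIndex() (uint64, error)
	SetProcessedIndex(processed uint64) error
}

// ConsumerProgressFactory defers creating the progress record until the consumer
// knows its starting index: InitConsumer is expected to create the record at
// defaultIndex if it does not already exist and return a tracker bound to it.
type ConsumerProgressFactory interface {
	InitConsumer(defaultIndex uint64) (ConsumerProgress, error)
}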
20 changes: 10 additions & 10 deletions cmd/access/node_builder/access_node_builder.go
@@ -530,8 +530,8 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu
func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder {
var ds datastore.Batching
var bs network.BlobService
- var processedBlockHeight storage.ConsumerProgress
- var processedNotifications storage.ConsumerProgress
+ var processedBlockHeight storage.ConsumerProgressFactory
+ var processedNotifications storage.ConsumerProgressFactory
var bsDependable *module.ProxiedReadyDoneAware
var execDataDistributor *edrequester.ExecutionDataDistributor
var execDataCacheBackend *herocache.BlockExecutionData
@@ -586,19 +586,19 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
- processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
+ processedBlockHeight = bstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
} else {
- processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
+ processedBlockHeight = pstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
}
return nil
}).
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
- processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
+ processedNotifications = bstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
} else {
- processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
+ processedNotifications = pstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
}
return nil
}).
@@ -826,15 +826,15 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}

if builder.executionDataIndexingEnabled {
- var indexedBlockHeight storage.ConsumerProgress
+ var indexedBlockHeight storage.ConsumerProgressFactory

builder.
AdminCommand("execute-script", func(config *cmd.NodeConfig) commands.AdminCommand {
return stateSyncCommands.NewExecuteScriptCommand(builder.ScriptExecutor)
}).
Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
- indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
+ indexedBlockHeight = bstorage.NewConsumerProgressFactory(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).
Module("transaction results storage", func(node *cmd.NodeConfig) error {
@@ -1579,7 +1579,7 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
- var processedBlockHeight storage.ConsumerProgress
+ var processedBlockHeight storage.ConsumerProgressFactory

if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
@@ -1769,7 +1769,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil
}).
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
- processedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
+ processedBlockHeight = bstorage.NewConsumerProgressFactory(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
return nil
}).
Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error {
16 changes: 8 additions & 8 deletions cmd/observer/node_builder/observer_builder.go
@@ -1107,8 +1107,8 @@ func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) {
func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverServiceBuilder {
var ds datastore.Batching
var bs network.BlobService
- var processedBlockHeight storage.ConsumerProgress
- var processedNotifications storage.ConsumerProgress
+ var processedBlockHeight storage.ConsumerProgressFactory
+ var processedNotifications storage.ConsumerProgressFactory
var publicBsDependable *module.ProxiedReadyDoneAware
var execDataDistributor *edrequester.ExecutionDataDistributor
var execDataCacheBackend *herocache.BlockExecutionData
@@ -1163,19 +1163,19 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
- processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
+ processedBlockHeight = bstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
} else {
- processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
+ processedBlockHeight = pstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
}
return nil
}).
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
- processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
+ processedNotifications = bstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
} else {
- processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
+ processedNotifications = pstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
}
return nil
}).
@@ -1361,11 +1361,11 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return builder.ExecutionDataPruner, nil
})
if builder.executionDataIndexingEnabled {
- var indexedBlockHeight storage.ConsumerProgress
+ var indexedBlockHeight storage.ConsumerProgressFactory

builder.Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
- indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
+ indexedBlockHeight = bstorage.NewConsumerProgressFactory(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
14 changes: 7 additions & 7 deletions cmd/verification_builder.go
@@ -88,11 +88,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
var (
followerState protocol.FollowerState

- chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine
- chunkRequests *stdmap.ChunkRequests // used in requester engine
- processedChunkIndex *badger.ConsumerProgress // used in chunk consumer
- processedBlockHeight *badger.ConsumerProgress // used in block consumer
- chunkQueue *badger.ChunksQueue // used in chunk consumer
+ chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine
+ chunkRequests *stdmap.ChunkRequests // used in requester engine
+ processedChunkIndex *badger.ConsumerProgressFactory // used in chunk consumer
+ processedBlockHeight *badger.ConsumerProgressFactory // used in block consumer
+ chunkQueue *badger.ChunksQueue // used in chunk consumer

syncCore *chainsync.Core // used in follower engine
assignerEngine *assigner.Engine // the assigner engine
@@ -155,11 +155,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
return nil
}).
Module("processed chunk index consumer progress", func(node *NodeConfig) error {
- processedChunkIndex = badger.NewConsumerProgress(node.DB, module.ConsumeProgressVerificationChunkIndex)
+ processedChunkIndex = badger.NewConsumerProgressFactory(node.DB, module.ConsumeProgressVerificationChunkIndex)
return nil
}).
Module("processed block height consumer progress", func(node *NodeConfig) error {
- processedBlockHeight = badger.NewConsumerProgress(node.DB, module.ConsumeProgressVerificationBlockHeight)
+ processedBlockHeight = badger.NewConsumerProgressFactory(node.DB, module.ConsumeProgressVerificationBlockHeight)
return nil
}).
Module("chunks queue", func(node *NodeConfig) error {
10 changes: 5 additions & 5 deletions engine/access/access_test.go
@@ -684,7 +684,7 @@ func (suite *Suite) TestGetSealedTransaction() {
require.NoError(suite.T(), err)

// create the ingest engine
- processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
+ processedHeight := bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressIngestionEngineBlockHeight)

ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, all.Blocks, all.Headers, collections,
transactions, results, receipts, collectionExecutedMetric, processedHeight, lastFullBlockHeight)
@@ -840,9 +840,9 @@ func (suite *Suite) TestGetTransactionResult() {
)
require.NoError(suite.T(), err)

- processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
+ processedHeight := bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressIngestionEngineBlockHeight)
lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(
- bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
+ bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressLastFullBlockHeight),
suite.rootBlock.Height,
)
require.NoError(suite.T(), err)
@@ -1070,9 +1070,9 @@ func (suite *Suite) TestExecuteScript() {
suite.net.On("Register", channels.ReceiveReceipts, mock.Anything).Return(conduit, nil).
Once()

- processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
+ processedHeight := bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressIngestionEngineBlockHeight)
lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(
- bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
+ bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressLastFullBlockHeight),
suite.rootBlock.Height,
)
require.NoError(suite.T(), err)
2 changes: 1 addition & 1 deletion engine/access/ingestion/engine.go
@@ -119,7 +119,7 @@ func New(
executionResults storage.ExecutionResults,
executionReceipts storage.ExecutionReceipts,
collectionExecutedMetric module.CollectionExecutedMetric,
- processedHeight storage.ConsumerProgress,
+ processedHeight storage.ConsumerProgressFactory,
lastFullBlockHeight *counters.PersistentStrictMonotonicCounter,
) (*Engine, error) {
executionReceiptsRawQueue, err := fifoqueue.NewFifoQueue(defaultQueueCapacity)
4 changes: 2 additions & 2 deletions engine/access/ingestion/engine_test.go
@@ -180,11 +180,11 @@ func (s *Suite) SetupTest() {

// initIngestionEngine create new instance of ingestion engine and waits when it starts
func (s *Suite) initIngestionEngine(ctx irrecoverable.SignalerContext) *Engine {
- processedHeight := bstorage.NewConsumerProgress(s.db, module.ConsumeProgressIngestionEngineBlockHeight)
+ processedHeight := bstorage.NewConsumerProgressFactory(s.db, module.ConsumeProgressIngestionEngineBlockHeight)

var err error
s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
- bstorage.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight),
+ bstorage.NewConsumerProgressFactory(s.db, module.ConsumeProgressLastFullBlockHeight),
s.finalizedBlock.Height,
)
require.NoError(s.T(), err)
@@ -134,7 +134,7 @@ func (s *TransactionStatusSuite) SetupTest() {

var err error
s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
- bstorage.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight),
+ bstorage.NewConsumerProgressFactory(s.db, module.ConsumeProgressLastFullBlockHeight),
s.rootBlock.Header.Height,
)
require.NoError(s.T(), err)
2 changes: 1 addition & 1 deletion engine/access/rpc/backend/backend_test.go
@@ -124,7 +124,7 @@ func (suite *Suite) SetupTest() {

suite.db, suite.dbDir = unittest.TempBadgerDB(suite.T())
suite.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
- bstorage.NewConsumerProgress(suite.db, module.ConsumeProgressLastFullBlockHeight),
+ bstorage.NewConsumerProgressFactory(suite.db, module.ConsumeProgressLastFullBlockHeight),
0,
)
suite.Require().NoError(err)
4 changes: 2 additions & 2 deletions engine/testutil/mock/nodes.go
@@ -281,12 +281,12 @@ type VerificationNode struct {
Receipts storage.ExecutionReceipts

// chunk consumer and processor for fetcher engine
- ProcessedChunkIndex storage.ConsumerProgress
+ ProcessedChunkIndex storage.ConsumerProgressFactory
ChunksQueue *bstorage.ChunksQueue
ChunkConsumer *chunkconsumer.ChunkConsumer

// block consumer for chunk consumer
- ProcessedBlockHeight storage.ConsumerProgress
+ ProcessedBlockHeight storage.ConsumerProgressFactory
BlockConsumer *blockconsumer.BlockConsumer

VerifierEngine *verifier.Engine
4 changes: 2 additions & 2 deletions engine/testutil/nodes.go
@@ -994,7 +994,7 @@ func VerificationNode(t testing.TB,
}

if node.ProcessedChunkIndex == nil {
- node.ProcessedChunkIndex = storage.NewConsumerProgress(node.PublicDB, module.ConsumeProgressVerificationChunkIndex)
+ node.ProcessedChunkIndex = storage.NewConsumerProgressFactory(node.PublicDB, module.ConsumeProgressVerificationChunkIndex)
}

if node.ChunksQueue == nil {
@@ -1005,7 +1005,7 @@
}

if node.ProcessedBlockHeight == nil {
- node.ProcessedBlockHeight = storage.NewConsumerProgress(node.PublicDB, module.ConsumeProgressVerificationBlockHeight)
+ node.ProcessedBlockHeight = storage.NewConsumerProgressFactory(node.PublicDB, module.ConsumeProgressVerificationBlockHeight)
}

if node.VerifierEngine == nil {
9 changes: 7 additions & 2 deletions engine/verification/assigner/blockconsumer/consumer.go
@@ -42,7 +42,7 @@ func defaultProcessedIndex(state protocol.State) (uint64, error) {
// index for initializing the processed index in storage.
func NewBlockConsumer(log zerolog.Logger,
metrics module.VerificationMetrics,
- processedHeight storage.ConsumerProgress,
+ processedHeightFactory storage.ConsumerProgressFactory,
blocks storage.Blocks,
state protocol.State,
blockProcessor assigner.FinalizedBlockProcessor,
@@ -63,7 +63,12 @@ func NewBlockConsumer(log zerolog.Logger,
return nil, 0, fmt.Errorf("could not read default processed index: %w", err)
}

- consumer, err := jobqueue.NewConsumer(lg, jobs, processedHeight, worker, maxProcessing, 0, defaultIndex)
+ processedHeight, err := processedHeightFactory.InitConsumer(defaultIndex)
+ if err != nil {
+ return nil, 0, fmt.Errorf("could not initialize processed height: %w", err)
+ }
+
+ consumer, err := jobqueue.NewConsumer(lg, jobs, processedHeight, worker, maxProcessing, 0)
if err != nil {
return nil, 0, fmt.Errorf("could not create block consumer: %w", err)
}
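For callers, only the type of the progress argument changes. The wiring below is a sketch of how a verification node's block consumer would now be constructed, mirroring the builder and test changes elsewhere in this commit; the variable names and the trailing maxProcessing parameter are taken from the hunks visible here rather than the full signature, so treat them as illustrative:

// Sketch: constructing the block consumer with the new factory-based API.
// db is an opened *badger.DB; blocks, state, blockProcessor, metrics, log, and
// maxProcessing are assumed to be set up already.
processedBlockHeightFactory := bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressVerificationBlockHeight)

blockConsumer, initialHeight, err := blockconsumer.NewBlockConsumer(
	log,
	metrics,
	processedBlockHeightFactory, // a factory, not a ConsumerProgress; InitConsumer runs inside
	blocks,
	state,
	blockProcessor,
	maxProcessing,
)
if err != nil {
	// handle error
}
_ = blockConsumer
_ = initialHeight // default processed index derived from protocol state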
@@ -120,7 +120,7 @@ func withConsumer(
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
maxProcessing := uint64(workerCount)

- processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressVerificationBlockHeight)
+ processedHeight := bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressVerificationBlockHeight)
collector := &metrics.NoopCollector{}
tracer := trace.NewNoopTracer()
log := unittest.Logger()
9 changes: 7 additions & 2 deletions engine/verification/fetcher/chunkconsumer/consumer.go
@@ -29,7 +29,7 @@ type ChunkConsumer struct {
func NewChunkConsumer(
log zerolog.Logger,
metrics module.VerificationMetrics,
- processedIndex storage.ConsumerProgress, // to persist the processed index
+ processedIndexFactory storage.ConsumerProgressFactory, // to persist the processed index
chunksQueue storage.ChunksQueue, // to read jobs (chunks) from
chunkProcessor fetcher.AssignedChunkProcessor, // to process jobs (chunks)
maxProcessing uint64, // max number of jobs to be processed in parallel
@@ -39,8 +39,13 @@

jobs := &ChunkJobs{locators: chunksQueue}

+ processedIndex, err := processedIndexFactory.InitConsumer(DefaultJobIndex)
+ if err != nil {
+ return nil, fmt.Errorf("could not initialize processed index: %w", err)
+ }
+
lg := log.With().Str("module", "chunk_consumer").Logger()
- consumer, err := jobqueue.NewConsumer(lg, jobs, processedIndex, worker, maxProcessing, 0, DefaultJobIndex)
+ consumer, err := jobqueue.NewConsumer(lg, jobs, processedIndex, worker, maxProcessing, 0)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions engine/verification/fetcher/chunkconsumer/consumer_test.go
@@ -144,7 +144,7 @@ func WithConsumer(
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
maxProcessing := uint64(3)

- processedIndex := storage.NewConsumerProgress(db, module.ConsumeProgressVerificationChunkIndex)
+ processedIndexFactory := storage.NewConsumerProgressFactory(db, module.ConsumeProgressVerificationChunkIndex)
chunksQueue := storage.NewChunkQueue(db)
ok, err := chunksQueue.Init(chunkconsumer.DefaultJobIndex)
require.NoError(t, err)
@@ -158,7 +158,7 @@
consumer, err := chunkconsumer.NewChunkConsumer(
unittest.Logger(),
collector,
- processedIndex,
+ processedIndexFactory,
chunksQueue,
engine,
maxProcessing,
4 changes: 2 additions & 2 deletions module/counters/persistent_strict_monotonic_counter_test.go
@@ -16,7 +16,7 @@ func TestMonotonicConsumer(t *testing.T) {
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
var height1 = uint64(1234)
persistentStrictMonotonicCounter, err := counters.NewPersistentStrictMonotonicCounter(
- bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
+ bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressLastFullBlockHeight),
height1,
)
require.NoError(t, err)
@@ -42,7 +42,7 @@

// check that new persistent strict monotonic counter has the same value
persistentStrictMonotonicCounter2, err := counters.NewPersistentStrictMonotonicCounter(
- bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
+ bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressLastFullBlockHeight),
height1,
)
require.NoError(t, err)
9 changes: 7 additions & 2 deletions module/jobqueue/component_consumer.go
@@ -27,7 +27,7 @@ type ComponentConsumer struct {
func NewComponentConsumer(
log zerolog.Logger,
workSignal <-chan struct{},
- progress storage.ConsumerProgress,
+ progressFactory storage.ConsumerProgressFactory,
jobs module.Jobs,
defaultIndex uint64,
processor JobProcessor, // method used to process jobs
@@ -48,7 +48,12 @@
maxProcessing,
)

- consumer, err := NewConsumer(log, jobs, progress, worker, maxProcessing, maxSearchAhead, defaultIndex)
+ progress, err := progressFactory.InitConsumer(defaultIndex)
+ if err != nil {
+ return nil, err
+ }
+
+ consumer, err := NewConsumer(log, jobs, progress, worker, maxProcessing, maxSearchAhead)
if err != nil {
return nil, err
}
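Callers of NewComponentConsumer keep passing defaultIndex; only the type of the progress argument changes, since the factory is consumed inside the constructor. A sketch of a caller after this change — the trailing maxProcessing and maxSearchAhead parameters are assumed from the NewConsumer call above rather than shown in this hunk, and the variable names are illustrative:

// Sketch: wiring a ComponentConsumer against a Pebble-backed progress factory,
// mirroring the observer/access builders earlier in this commit.
progressFactory := pstorage.NewConsumerProgressFactory(pebbleDB, module.ConsumeProgressExecutionDataRequesterNotification)

consumer, err := jobqueue.NewComponentConsumer(
	log,
	workSignal,      // <-chan struct{} signalling that new jobs are available
	progressFactory, // was a storage.ConsumerProgress before this commit
	jobs,
	defaultIndex,    // still supplied here; forwarded to progressFactory.InitConsumer
	processor,
	maxProcessing,   // assumed trailing parameters
	maxSearchAhead,
)
if err != nil {
	// handle error
}
_ = consumer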