Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Badger Batch] Chunk Data Packs and ConsumeProgress #6391

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,8 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu
func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder {
var ds datastore.Batching
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgress
var processedNotifications storage.ConsumerProgress
var processedBlockHeight storage.ConsumerProgressFactory
var processedNotifications storage.ConsumerProgressFactory
var bsDependable *module.ProxiedReadyDoneAware
var execDataDistributor *edrequester.ExecutionDataDistributor
var execDataCacheBackend *herocache.BlockExecutionData
Expand Down Expand Up @@ -586,19 +586,19 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
processedBlockHeight = bstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
} else {
processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
processedBlockHeight = pstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
}
return nil
}).
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
processedNotifications = bstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
} else {
processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
processedNotifications = pstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
}
return nil
}).
Expand Down Expand Up @@ -826,15 +826,15 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}

if builder.executionDataIndexingEnabled {
var indexedBlockHeight storage.ConsumerProgress
var indexedBlockHeight storage.ConsumerProgressFactory

builder.
AdminCommand("execute-script", func(config *cmd.NodeConfig) commands.AdminCommand {
return stateSyncCommands.NewExecuteScriptCommand(builder.ScriptExecutor)
}).
Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
indexedBlockHeight = bstorage.NewConsumerProgressFactory(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).
Module("transaction results storage", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -1579,7 +1579,7 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
var processedBlockHeight storage.ConsumerProgress
var processedBlockHeight storage.ConsumerProgressFactory

if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
Expand Down Expand Up @@ -1769,15 +1769,15 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil
}).
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
processedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
processedBlockHeight = bstorage.NewConsumerProgressFactory(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
return nil
}).
Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error {
rootBlockHeight := node.State.Params().FinalizedRoot().Height

var err error
lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressLastFullBlockHeight),
bstorage.NewConsumerProgressFactory(builder.DB, module.ConsumeProgressLastFullBlockHeight),
rootBlockHeight,
)
if err != nil {
Expand Down
16 changes: 8 additions & 8 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,8 +1107,8 @@ func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) {
func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverServiceBuilder {
var ds datastore.Batching
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgress
var processedNotifications storage.ConsumerProgress
var processedBlockHeight storage.ConsumerProgressFactory
var processedNotifications storage.ConsumerProgressFactory
var publicBsDependable *module.ProxiedReadyDoneAware
var execDataDistributor *edrequester.ExecutionDataDistributor
var execDataCacheBackend *herocache.BlockExecutionData
Expand Down Expand Up @@ -1163,19 +1163,19 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
processedBlockHeight = bstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
} else {
processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
processedBlockHeight = pstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
}
return nil
}).
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
processedNotifications = bstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
} else {
processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
processedNotifications = pstorage.NewConsumerProgressFactory(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
}
return nil
}).
Expand Down Expand Up @@ -1361,11 +1361,11 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return builder.ExecutionDataPruner, nil
})
if builder.executionDataIndexingEnabled {
var indexedBlockHeight storage.ConsumerProgress
var indexedBlockHeight storage.ConsumerProgressFactory

builder.Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
indexedBlockHeight = bstorage.NewConsumerProgressFactory(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
Expand Down
14 changes: 7 additions & 7 deletions cmd/verification_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
var (
followerState protocol.FollowerState

chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine
chunkRequests *stdmap.ChunkRequests // used in requester engine
processedChunkIndex *badger.ConsumerProgress // used in chunk consumer
processedBlockHeight *badger.ConsumerProgress // used in block consumer
chunkQueue *badger.ChunksQueue // used in chunk consumer
chunkStatuses *stdmap.ChunkStatuses // used in fetcher engine
chunkRequests *stdmap.ChunkRequests // used in requester engine
processedChunkIndex *badger.ConsumerProgressFactory // used in chunk consumer
processedBlockHeight *badger.ConsumerProgressFactory // used in block consumer
chunkQueue *badger.ChunksQueue // used in chunk consumer

syncCore *chainsync.Core // used in follower engine
assignerEngine *assigner.Engine // the assigner engine
Expand Down Expand Up @@ -155,11 +155,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
return nil
}).
Module("processed chunk index consumer progress", func(node *NodeConfig) error {
processedChunkIndex = badger.NewConsumerProgress(node.DB, module.ConsumeProgressVerificationChunkIndex)
processedChunkIndex = badger.NewConsumerProgressFactory(node.DB, module.ConsumeProgressVerificationChunkIndex)
return nil
}).
Module("processed block height consumer progress", func(node *NodeConfig) error {
processedBlockHeight = badger.NewConsumerProgress(node.DB, module.ConsumeProgressVerificationBlockHeight)
processedBlockHeight = badger.NewConsumerProgressFactory(node.DB, module.ConsumeProgressVerificationBlockHeight)
return nil
}).
Module("chunks queue", func(node *NodeConfig) error {
Expand Down
12 changes: 6 additions & 6 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,13 +678,13 @@ func (suite *Suite) TestGetSealedTransaction() {
require.NoError(suite.T(), err)

lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressLastFullBlockHeight),
suite.rootBlock.Height,
)
require.NoError(suite.T(), err)

// create the ingest engine
processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeight := bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressIngestionEngineBlockHeight)

ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, all.Blocks, all.Headers, collections,
transactions, results, receipts, collectionExecutedMetric, processedHeight, lastFullBlockHeight)
Expand Down Expand Up @@ -840,9 +840,9 @@ func (suite *Suite) TestGetTransactionResult() {
)
require.NoError(suite.T(), err)

processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeight := bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressIngestionEngineBlockHeight)
lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressLastFullBlockHeight),
suite.rootBlock.Height,
)
require.NoError(suite.T(), err)
Expand Down Expand Up @@ -1070,9 +1070,9 @@ func (suite *Suite) TestExecuteScript() {
suite.net.On("Register", channels.ReceiveReceipts, mock.Anything).Return(conduit, nil).
Once()

processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeight := bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressIngestionEngineBlockHeight)
lastFullBlockHeight, err := counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(db, module.ConsumeProgressLastFullBlockHeight),
bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressLastFullBlockHeight),
suite.rootBlock.Height,
)
require.NoError(suite.T(), err)
Expand Down
2 changes: 1 addition & 1 deletion engine/access/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func New(
executionResults storage.ExecutionResults,
executionReceipts storage.ExecutionReceipts,
collectionExecutedMetric module.CollectionExecutedMetric,
processedHeight storage.ConsumerProgress,
processedHeight storage.ConsumerProgressFactory,
lastFullBlockHeight *counters.PersistentStrictMonotonicCounter,
) (*Engine, error) {
executionReceiptsRawQueue, err := fifoqueue.NewFifoQueue(defaultQueueCapacity)
Expand Down
4 changes: 2 additions & 2 deletions engine/access/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,11 @@ func (s *Suite) SetupTest() {

// initIngestionEngine create new instance of ingestion engine and waits when it starts
func (s *Suite) initIngestionEngine(ctx irrecoverable.SignalerContext) *Engine {
processedHeight := bstorage.NewConsumerProgress(s.db, module.ConsumeProgressIngestionEngineBlockHeight)
processedHeight := bstorage.NewConsumerProgressFactory(s.db, module.ConsumeProgressIngestionEngineBlockHeight)

var err error
s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight),
bstorage.NewConsumerProgressFactory(s.db, module.ConsumeProgressLastFullBlockHeight),
s.finalizedBlock.Height,
)
require.NoError(s.T(), err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (s *TransactionStatusSuite) SetupTest() {

var err error
s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight),
bstorage.NewConsumerProgressFactory(s.db, module.ConsumeProgressLastFullBlockHeight),
s.rootBlock.Header.Height,
)
require.NoError(s.T(), err)
Expand Down
2 changes: 1 addition & 1 deletion engine/access/rpc/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (suite *Suite) SetupTest() {

suite.db, suite.dbDir = unittest.TempBadgerDB(suite.T())
suite.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(suite.db, module.ConsumeProgressLastFullBlockHeight),
bstorage.NewConsumerProgressFactory(suite.db, module.ConsumeProgressLastFullBlockHeight),
0,
)
suite.Require().NoError(err)
Expand Down
4 changes: 2 additions & 2 deletions engine/testutil/mock/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,12 @@ type VerificationNode struct {
Receipts storage.ExecutionReceipts

// chunk consumer and processor for fetcher engine
ProcessedChunkIndex storage.ConsumerProgress
ProcessedChunkIndex storage.ConsumerProgressFactory
ChunksQueue *bstorage.ChunksQueue
ChunkConsumer *chunkconsumer.ChunkConsumer

// block consumer for chunk consumer
ProcessedBlockHeight storage.ConsumerProgress
ProcessedBlockHeight storage.ConsumerProgressFactory
BlockConsumer *blockconsumer.BlockConsumer

VerifierEngine *verifier.Engine
Expand Down
4 changes: 2 additions & 2 deletions engine/testutil/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,7 @@ func VerificationNode(t testing.TB,
}

if node.ProcessedChunkIndex == nil {
node.ProcessedChunkIndex = storage.NewConsumerProgress(node.PublicDB, module.ConsumeProgressVerificationChunkIndex)
node.ProcessedChunkIndex = storage.NewConsumerProgressFactory(node.PublicDB, module.ConsumeProgressVerificationChunkIndex)
}

if node.ChunksQueue == nil {
Expand All @@ -1005,7 +1005,7 @@ func VerificationNode(t testing.TB,
}

if node.ProcessedBlockHeight == nil {
node.ProcessedBlockHeight = storage.NewConsumerProgress(node.PublicDB, module.ConsumeProgressVerificationBlockHeight)
node.ProcessedBlockHeight = storage.NewConsumerProgressFactory(node.PublicDB, module.ConsumeProgressVerificationBlockHeight)
}

if node.VerifierEngine == nil {
Expand Down
9 changes: 7 additions & 2 deletions engine/verification/assigner/blockconsumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func defaultProcessedIndex(state protocol.State) (uint64, error) {
// index for initializing the processed index in storage.
func NewBlockConsumer(log zerolog.Logger,
metrics module.VerificationMetrics,
processedHeight storage.ConsumerProgress,
processedHeightFactory storage.ConsumerProgressFactory,
blocks storage.Blocks,
state protocol.State,
blockProcessor assigner.FinalizedBlockProcessor,
Expand All @@ -63,7 +63,12 @@ func NewBlockConsumer(log zerolog.Logger,
return nil, 0, fmt.Errorf("could not read default processed index: %w", err)
}

consumer, err := jobqueue.NewConsumer(lg, jobs, processedHeight, worker, maxProcessing, 0, defaultIndex)
processedHeight, err := processedHeightFactory.InitConsumer(defaultIndex)
if err != nil {
return nil, 0, fmt.Errorf("could not initialize processed height: %w", err)
}

consumer, err := jobqueue.NewConsumer(lg, jobs, processedHeight, worker, maxProcessing, 0)
if err != nil {
return nil, 0, fmt.Errorf("could not create block consumer: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func withConsumer(
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
maxProcessing := uint64(workerCount)

processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressVerificationBlockHeight)
processedHeight := bstorage.NewConsumerProgressFactory(db, module.ConsumeProgressVerificationBlockHeight)
collector := &metrics.NoopCollector{}
tracer := trace.NewNoopTracer()
log := unittest.Logger()
Expand Down
9 changes: 7 additions & 2 deletions engine/verification/fetcher/chunkconsumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type ChunkConsumer struct {
func NewChunkConsumer(
log zerolog.Logger,
metrics module.VerificationMetrics,
processedIndex storage.ConsumerProgress, // to persist the processed index
processedIndexFactory storage.ConsumerProgressFactory, // to persist the processed index
chunksQueue storage.ChunksQueue, // to read jobs (chunks) from
chunkProcessor fetcher.AssignedChunkProcessor, // to process jobs (chunks)
maxProcessing uint64, // max number of jobs to be processed in parallel
Expand All @@ -39,8 +39,13 @@ func NewChunkConsumer(

jobs := &ChunkJobs{locators: chunksQueue}

processedIndex, err := processedIndexFactory.InitConsumer(DefaultJobIndex)
if err != nil {
return nil, fmt.Errorf("could not initialize processed index: %w", err)
}

lg := log.With().Str("module", "chunk_consumer").Logger()
consumer, err := jobqueue.NewConsumer(lg, jobs, processedIndex, worker, maxProcessing, 0, DefaultJobIndex)
consumer, err := jobqueue.NewConsumer(lg, jobs, processedIndex, worker, maxProcessing, 0)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions engine/verification/fetcher/chunkconsumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func WithConsumer(
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
maxProcessing := uint64(3)

processedIndex := storage.NewConsumerProgress(db, module.ConsumeProgressVerificationChunkIndex)
processedIndexFactory := storage.NewConsumerProgressFactory(db, module.ConsumeProgressVerificationChunkIndex)
chunksQueue := storage.NewChunkQueue(db)
ok, err := chunksQueue.Init(chunkconsumer.DefaultJobIndex)
require.NoError(t, err)
Expand All @@ -158,7 +158,7 @@ func WithConsumer(
consumer, err := chunkconsumer.NewChunkConsumer(
unittest.Logger(),
collector,
processedIndex,
processedIndexFactory,
chunksQueue,
engine,
maxProcessing,
Expand Down
Loading