-
Notifications
You must be signed in to change notification settings - Fork 177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Access] Add registerDB pruning module #6397
base: master
Are you sure you want to change the base?
Changes from 45 commits
5584146
fd5fa65
8aab5d9
08274d1
61d3be2
8910083
b863b61
fc3c44c
64e9c9b
177651c
33fe4a1
2d847ba
1018ba2
0e8cc4c
afe9e56
fec4ba2
b9247b6
97a046d
0e7f9e5
8080a15
b57bc4e
3ca7883
1ff10bc
25e1356
66e810b
0aada71
f039d15
b6c8040
db82c62
dc984cf
84a8f9a
bf15f92
919f03b
9ed6ca8
2cb1264
f040c83
fc4d6cb
87fe8fe
d70106a
f276544
7ffc1a4
66418af
ebcc13f
10cb2f2
23f1e36
d251445
5833735
c238bac
e41451c
74c4f97
c4bf9f3
d086c1b
16e7887
0f3b5aa
4961567
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -73,7 +73,7 @@ import ( | |||||
"github.com/onflow/flow-go/module/execution" | ||||||
"github.com/onflow/flow-go/module/executiondatasync/execution_data" | ||||||
execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache" | ||||||
"github.com/onflow/flow-go/module/executiondatasync/pruner" | ||||||
edpruner "github.com/onflow/flow-go/module/executiondatasync/pruner" | ||||||
edstorage "github.com/onflow/flow-go/module/executiondatasync/storage" | ||||||
"github.com/onflow/flow-go/module/executiondatasync/tracker" | ||||||
finalizer "github.com/onflow/flow-go/module/finalizer/consensus" | ||||||
|
@@ -176,6 +176,9 @@ type AccessNodeConfig struct { | |||||
checkPayerBalanceMode string | ||||||
versionControlEnabled bool | ||||||
stopControlEnabled bool | ||||||
registerDBPruningEnabled bool | ||||||
registerDBPruneTickerInterval time.Duration | ||||||
registerDBPruneThrottleDelay time.Duration | ||||||
registerDBPruneThreshold uint64 | ||||||
} | ||||||
|
||||||
|
@@ -268,8 +271,8 @@ func DefaultAccessNodeConfig() *AccessNodeConfig { | |||||
executionDataIndexingEnabled: false, | ||||||
executionDataDBMode: execution_data.ExecutionDataDBModeBadger.String(), | ||||||
executionDataPrunerHeightRangeTarget: 0, | ||||||
executionDataPrunerThreshold: pruner.DefaultThreshold, | ||||||
executionDataPruningInterval: pruner.DefaultPruningInterval, | ||||||
executionDataPrunerThreshold: edpruner.DefaultThreshold, | ||||||
executionDataPruningInterval: edpruner.DefaultPruningInterval, | ||||||
registersDBPath: filepath.Join(homedir, ".flow", "execution_state"), | ||||||
checkpointFile: cmd.NotSet, | ||||||
scriptExecutorConfig: query.NewDefaultConfig(), | ||||||
|
@@ -281,7 +284,10 @@ func DefaultAccessNodeConfig() *AccessNodeConfig { | |||||
checkPayerBalanceMode: accessNode.Disabled.String(), | ||||||
versionControlEnabled: true, | ||||||
stopControlEnabled: false, | ||||||
registerDBPruneThreshold: pruner.DefaultThreshold, | ||||||
registerDBPruningEnabled: false, | ||||||
registerDBPruneTickerInterval: pstorage.DefaultPruneTickerInterval, | ||||||
registerDBPruneThrottleDelay: pstorage.DefaultPruneThrottleDelay, | ||||||
registerDBPruneThreshold: pstorage.DefaultPruneThreshold, | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -303,8 +309,9 @@ type FlowAccessNodeBuilder struct { | |||||
CollectionsToMarkExecuted *stdmap.Times | ||||||
BlocksToMarkExecuted *stdmap.Times | ||||||
TransactionMetrics *metrics.TransactionCollector | ||||||
TransactionValidationMetrics *metrics.TransactionValidationCollector | ||||||
RestMetrics *metrics.RestCollector | ||||||
RegisterDBPrunerMetrics *metrics.RegisterDBPrunerCollector | ||||||
TransactionValidationMetrics *metrics.TransactionValidationCollector | ||||||
AccessMetrics module.AccessMetrics | ||||||
PingMetrics module.PingMetrics | ||||||
Committee hotstuff.DynamicCommittee | ||||||
|
@@ -327,11 +334,13 @@ type FlowAccessNodeBuilder struct { | |||||
TxResultsIndex *index.TransactionResultsIndex | ||||||
IndexerDependencies *cmd.DependencyList | ||||||
collectionExecutedMetric module.CollectionExecutedMetric | ||||||
ExecutionDataPruner *pruner.Pruner | ||||||
ExecutionDataPruner *edpruner.Pruner | ||||||
ExecutionDatastoreManager edstorage.DatastoreManager | ||||||
ExecutionDataTracker tracker.Storage | ||||||
VersionControl *version.VersionControl | ||||||
StopControl *stop.StopControl | ||||||
RegisterDB *pebble.DB | ||||||
RegisterDBPrunerDependencies *cmd.DependencyList | ||||||
|
||||||
// The sync engine participants provider is the libp2p peer store for the access node | ||||||
// which is not available until after the network has started. | ||||||
|
@@ -550,6 +559,10 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess | |||||
requesterDependable := module.NewProxiedReadyDoneAware() | ||||||
builder.IndexerDependencies.Add(requesterDependable) | ||||||
|
||||||
// setup dependency chain to ensure register db pruner starts after the indexer | ||||||
indexerDependable := module.NewProxiedReadyDoneAware() | ||||||
builder.RegisterDBPrunerDependencies.Add(indexerDependable) | ||||||
|
||||||
executionDataPrunerEnabled := builder.executionDataPrunerHeightRangeTarget != 0 | ||||||
|
||||||
builder. | ||||||
|
@@ -779,16 +792,16 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess | |||||
} | ||||||
|
||||||
var err error | ||||||
builder.ExecutionDataPruner, err = pruner.NewPruner( | ||||||
builder.ExecutionDataPruner, err = edpruner.NewPruner( | ||||||
node.Logger, | ||||||
prunerMetrics, | ||||||
builder.ExecutionDataTracker, | ||||||
pruner.WithPruneCallback(func(ctx context.Context) error { | ||||||
edpruner.WithPruneCallback(func(ctx context.Context) error { | ||||||
return builder.ExecutionDatastoreManager.CollectGarbage(ctx) | ||||||
}), | ||||||
pruner.WithHeightRangeTarget(builder.executionDataPrunerHeightRangeTarget), | ||||||
pruner.WithThreshold(builder.executionDataPrunerThreshold), | ||||||
pruner.WithPruningInterval(builder.executionDataPruningInterval), | ||||||
edpruner.WithHeightRangeTarget(builder.executionDataPrunerHeightRangeTarget), | ||||||
edpruner.WithThreshold(builder.executionDataPrunerThreshold), | ||||||
edpruner.WithPruningInterval(builder.executionDataPruningInterval), | ||||||
) | ||||||
if err != nil { | ||||||
return nil, fmt.Errorf("failed to create execution data pruner: %w", err) | ||||||
|
@@ -853,16 +866,17 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess | |||||
// Note: using a DependableComponent here to ensure that the indexer does not block | ||||||
// other components from starting while bootstrapping the register db since it may | ||||||
// take hours to complete. | ||||||
|
||||||
pdb, err := pstorage.OpenRegisterPebbleDB(builder.registersDBPath) | ||||||
var err error | ||||||
builder.RegisterDB, err = pstorage.OpenRegisterPebbleDB(builder.registersDBPath) | ||||||
builder.Logger.Warn().Msg(fmt.Sprintf("!!!!!!!!!! builder.registersDBPath: %s", builder.registersDBPath)) | ||||||
if err != nil { | ||||||
return nil, fmt.Errorf("could not open registers db: %w", err) | ||||||
} | ||||||
builder.ShutdownFunc(func() error { | ||||||
return pdb.Close() | ||||||
return builder.RegisterDB.Close() | ||||||
}) | ||||||
|
||||||
bootstrapped, err := pstorage.IsBootstrapped(pdb) | ||||||
bootstrapped, err := pstorage.IsBootstrapped(builder.RegisterDB) | ||||||
if err != nil { | ||||||
return nil, fmt.Errorf("could not check if registers db is bootstrapped: %w", err) | ||||||
} | ||||||
|
@@ -894,7 +908,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess | |||||
} | ||||||
|
||||||
rootHash := ledger.RootHash(builder.RootSeal.FinalState) | ||||||
bootstrap, err := pstorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger) | ||||||
bootstrap, err := pstorage.NewRegisterBootstrap(builder.RegisterDB, checkpointFile, checkpointHeight, rootHash, builder.Logger) | ||||||
if err != nil { | ||||||
return nil, fmt.Errorf("could not create registers bootstrap: %w", err) | ||||||
} | ||||||
|
@@ -907,7 +921,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess | |||||
} | ||||||
} | ||||||
|
||||||
registers, err := pstorage.NewRegisters(pdb, builder.registerDBPruneThreshold) | ||||||
registers, err := pstorage.NewRegisters(builder.RegisterDB, builder.registerDBPruneThreshold) | ||||||
if err != nil { | ||||||
return nil, fmt.Errorf("could not create registers storage: %w", err) | ||||||
} | ||||||
|
@@ -1005,8 +1019,31 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess | |||||
builder.StopControl.RegisterHeightRecorder(builder.ExecutionIndexer) | ||||||
} | ||||||
|
||||||
// add indexer into ReadyDoneAware dependency passed to pruner. This allows the register db pruner | ||||||
// to wait for the indexer to be ready before starting. | ||||||
indexerDependable.Init(builder.ExecutionIndexer) | ||||||
|
||||||
return builder.ExecutionIndexer, nil | ||||||
}, builder.IndexerDependencies) | ||||||
}, builder.IndexerDependencies). | ||||||
DependableComponent("register db pruner", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { | ||||||
if !builder.registerDBPruningEnabled { | ||||||
return &module.NoopReadyDoneAware{}, nil | ||||||
} | ||||||
|
||||||
registerDBPruner, err := pstorage.NewRegisterPruner( | ||||||
node.Logger, | ||||||
builder.RegisterDB, | ||||||
pstorage.WithPrunerMetrics(builder.RegisterDBPrunerMetrics), | ||||||
//pstorage.WithPruneThreshold(builder.registerDBPruneThreshold), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
pstorage.WithPruneThrottleDelay(builder.registerDBPruneThrottleDelay), | ||||||
pstorage.WithPruneTickerInterval(builder.registerDBPruneTickerInterval), | ||||||
) | ||||||
if err != nil { | ||||||
return nil, fmt.Errorf("failed to create register db pruner: %w", err) | ||||||
} | ||||||
|
||||||
return registerDBPruner, nil | ||||||
}, builder.RegisterDBPrunerDependencies) | ||||||
} | ||||||
|
||||||
if builder.stateStreamConf.ListenAddr != "" { | ||||||
|
@@ -1133,10 +1170,11 @@ func FlowAccessNode(nodeBuilder *cmd.FlowNodeBuilder) *FlowAccessNodeBuilder { | |||||
dist := consensuspubsub.NewFollowerDistributor() | ||||||
dist.AddProposalViolationConsumer(notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger)) | ||||||
return &FlowAccessNodeBuilder{ | ||||||
AccessNodeConfig: DefaultAccessNodeConfig(), | ||||||
FlowNodeBuilder: nodeBuilder, | ||||||
FollowerDistributor: dist, | ||||||
IndexerDependencies: cmd.NewDependencyList(), | ||||||
AccessNodeConfig: DefaultAccessNodeConfig(), | ||||||
FlowNodeBuilder: nodeBuilder, | ||||||
FollowerDistributor: dist, | ||||||
IndexerDependencies: cmd.NewDependencyList(), | ||||||
RegisterDBPrunerDependencies: cmd.NewDependencyList(), | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -1313,6 +1351,8 @@ func (builder *FlowAccessNodeBuilder) extraFlags() { | |||||
"execution-data-db", | ||||||
defaultConfig.executionDataDBMode, | ||||||
"[experimental] the DB type for execution datastore. One of [badger, pebble]") | ||||||
|
||||||
// Execution data pruner | ||||||
flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget, | ||||||
"execution-data-height-range-target", | ||||||
defaultConfig.executionDataPrunerHeightRangeTarget, | ||||||
|
@@ -1326,6 +1366,20 @@ func (builder *FlowAccessNodeBuilder) extraFlags() { | |||||
defaultConfig.executionDataPruningInterval, | ||||||
"duration after which the pruner tries to prune execution data. The default value is 10 minutes") | ||||||
|
||||||
// RegisterDB pruning | ||||||
flags.BoolVar(&builder.registerDBPruningEnabled, | ||||||
"registerdb-pruning-enabled", | ||||||
defaultConfig.registerDBPruningEnabled, | ||||||
"whether to enable the pruning for register db") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
flags.DurationVar(&builder.registerDBPruneThrottleDelay, | ||||||
"registerdb-prune-throttle-delay", | ||||||
defaultConfig.registerDBPruneThrottleDelay, | ||||||
"delay for controlling a pause between batches of registers inspected and pruned") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
flags.DurationVar(&builder.registerDBPruneTickerInterval, | ||||||
"registerdb-prune-ticker-interval", | ||||||
defaultConfig.registerDBPruneTickerInterval, | ||||||
"duration after which the pruner tries to prune data. The default value is 10 minutes") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
// Execution State Streaming API | ||||||
flags.Uint32Var(&builder.stateStreamConf.ExecutionDataCacheSize, "execution-data-cache-size", defaultConfig.stateStreamConf.ExecutionDataCacheSize, "block execution data cache size") | ||||||
flags.Uint32Var(&builder.stateStreamConf.MaxGlobalStreams, "state-stream-global-max-streams", defaultConfig.stateStreamConf.MaxGlobalStreams, "global maximum number of concurrent streams") | ||||||
|
@@ -1703,12 +1757,17 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { | |||||
builder.RestMetrics = m | ||||||
return nil | ||||||
}). | ||||||
Module("register db metrics", func(node *cmd.NodeConfig) error { | ||||||
builder.RegisterDBPrunerMetrics = metrics.NewRegisterDBPrunerCollector() | ||||||
return nil | ||||||
}). | ||||||
Module("access metrics", func(node *cmd.NodeConfig) error { | ||||||
builder.AccessMetrics = metrics.NewAccessCollector( | ||||||
metrics.WithTransactionMetrics(builder.TransactionMetrics), | ||||||
metrics.WithTransactionValidationMetrics(builder.TransactionValidationMetrics), | ||||||
metrics.WithBackendScriptsMetrics(builder.TransactionMetrics), | ||||||
metrics.WithRestMetrics(builder.RestMetrics), | ||||||
metrics.WithRegisterDBPrunerMetrics(builder.RegisterDBPrunerMetrics), | ||||||
) | ||||||
return nil | ||||||
}). | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't forget to remove this