Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/move sync pivot to IBlockTree. #8203

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Blockchain.Visitors;
using Nethermind.Consensus.Processing;
using Nethermind.Core;
Expand Down Expand Up @@ -71,7 +70,7 @@ public async Task Deletes_everything_after_the_missing_level()
.WithDatabaseFrom(builder)
.TestObject;

StartupBlockTreeFixer fixer = new(new SyncConfig(), tree, Substitute.For<IStateReader>(), LimboNoErrorLogger.Instance);
StartupBlockTreeFixer fixer = new(tree, Substitute.For<IStateReader>(), LimboNoErrorLogger.Instance);
await tree.Accept(fixer, CancellationToken.None);

Assert.That(blockInfosDb.Get(3), Is.Null, "level 3");
Expand Down Expand Up @@ -109,7 +108,7 @@ public async Task Suggesting_blocks_works_correctly_after_processor_restart(int
testRpc.BlockchainProcessor = newBlockchainProcessor;

// fixing after restart
StartupBlockTreeFixer fixer = new(new SyncConfig(), tree, testRpc.StateReader, LimboNoErrorLogger.Instance, 5);
StartupBlockTreeFixer fixer = new(tree, testRpc.StateReader, LimboNoErrorLogger.Instance, 5);
await tree.Accept(fixer, CancellationToken.None);

// waiting for N new heads
Expand Down Expand Up @@ -143,7 +142,7 @@ public async Task Fixer_should_not_suggest_block_without_state(int suggestedBloc
testRpc.BlockchainProcessor = newBlockchainProcessor;

// we create a new empty db for stateDb so we shouldn't suggest new blocks
IBlockTreeVisitor fixer = new StartupBlockTreeFixer(new SyncConfig(), tree, Substitute.For<IStateReader>(), LimboNoErrorLogger.Instance, 5);
IBlockTreeVisitor fixer = new StartupBlockTreeFixer(tree, Substitute.For<IStateReader>(), LimboNoErrorLogger.Instance, 5);
BlockVisitOutcome result = await fixer.VisitBlock(tree.Head!, CancellationToken.None);

Assert.That(result, Is.EqualTo(BlockVisitOutcome.None));
Expand All @@ -164,7 +163,7 @@ public async Task Fixer_should_not_suggest_block_with_null_block()
newBlockchainProcessor.Start();
testRpc.BlockchainProcessor = newBlockchainProcessor;

IBlockTreeVisitor fixer = new StartupBlockTreeFixer(new SyncConfig(), tree, testRpc.StateReader, LimboNoErrorLogger.Instance, 5);
IBlockTreeVisitor fixer = new StartupBlockTreeFixer(tree, testRpc.StateReader, LimboNoErrorLogger.Instance, 5);
BlockVisitOutcome result = await fixer.VisitBlock(null!, CancellationToken.None);

Assert.That(result, Is.EqualTo(BlockVisitOutcome.None));
Expand Down Expand Up @@ -211,7 +210,7 @@ public async Task When_head_block_is_followed_by_a_block_bodies_gap_it_should_de

tree.UpdateMainChain(block2);

StartupBlockTreeFixer fixer = new(new SyncConfig(), tree, Substitute.For<IStateReader>(), LimboNoErrorLogger.Instance);
StartupBlockTreeFixer fixer = new(tree, Substitute.For<IStateReader>(), LimboNoErrorLogger.Instance);
await tree.Accept(fixer, CancellationToken.None);

Assert.That(blockInfosDb.Get(3), Is.Null, "level 3");
Expand Down
37 changes: 35 additions & 2 deletions src/Nethermind/Nethermind.Blockchain/BlockTree.Initializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
using System.IO;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Db;
using Nethermind.Serialization.Json;
using Nethermind.Serialization.Rlp;

namespace Nethermind.Blockchain;
Expand All @@ -14,6 +16,37 @@ public partial class BlockTree
{
private bool _tryToRecoverFromHeaderBelowBodyCorruption = false;

private void LoadSyncPivot()
{
try
{
if (_metadataDb.KeyExists(MetadataDbKeys.UpdatedPivotData))
{
byte[]? pivotFromDb = _metadataDb.Get(MetadataDbKeys.UpdatedPivotData);
RlpStream pivotStream = new(pivotFromDb!);
long updatedPivotBlockNumber = pivotStream.DecodeLong();
Hash256 updatedPivotBlockHash = pivotStream.DecodeKeccak()!;

if (updatedPivotBlockHash is not null && !updatedPivotBlockHash.IsZero)
{
SyncPivot = (updatedPivotBlockNumber, updatedPivotBlockHash);
WasInitialSyncPivotSet = true;
if (_logger.IsInfo) _logger.Info($"Pivot block has been set based on data from db. Pivot block number: {updatedPivotBlockNumber}, hash: {updatedPivotBlockHash}");
return;
}
}
}
catch (RlpException)
{
if (_logger.IsWarn) _logger.Warn($"Cannot decode pivot block number or hash");
}

SyncPivot = (
LongConverter.FromString(_syncConfig.PivotNumber),
_syncConfig.PivotHash is null ? null : new Hash256(Bytes.FromHexString(_syncConfig.PivotHash)));
}


public void RecalculateTreeLevels()
{
LoadLowestInsertedHeader();
Expand Down Expand Up @@ -136,7 +169,7 @@ private void LoadLowestInsertedHeader()
{
// Old style binary search.
long left = 1L;
long right = _syncConfig.PivotNumberParsed;
long right = SyncPivot.BlockNumber;

LowestInsertedHeader = BinarySearchBlockHeader(left, right, LevelExists, BinarySearchDirection.Down);
}
Expand All @@ -147,7 +180,7 @@ private void LoadLowestInsertedHeader()
private void LoadBestKnown()
{
long left = (Head?.Number ?? 0) == 0
? Math.Max(_syncConfig.PivotNumberParsed, LowestInsertedHeader?.Number ?? 0) - 1
? Math.Max(SyncPivot.BlockNumber, LowestInsertedHeader?.Number ?? 0) - 1
: Head.Number;

long right = Math.Max(0, left) + BestKnownSearchLimit;
Expand Down
32 changes: 31 additions & 1 deletion src/Nethermind/Nethermind.Blockchain/BlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
Expand All @@ -20,7 +21,6 @@
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Core.Specs;
using Nethermind.Core.Threading;
using Nethermind.Crypto;
using Nethermind.Db;
using Nethermind.Int256;
Expand Down Expand Up @@ -96,6 +96,10 @@ public BlockHeader? LowestInsertedBeaconHeader

public long BestKnownBeaconNumber { get; private set; }

public (long BlockNumber, Hash256 BlockHash) SyncPivot { get; private set; } = (0, null);

public bool WasInitialSyncPivotSet { get; private set; }

public ulong NetworkId => _specProvider.NetworkId;

public ulong ChainId => _specProvider.ChainId;
Expand Down Expand Up @@ -129,6 +133,8 @@ public BlockTree(
_chainLevelInfoRepository = chainLevelInfoRepository ??
throw new ArgumentNullException(nameof(chainLevelInfoRepository));

LoadSyncPivot();

byte[]? deletePointer = _blockInfoDb.Get(DeletePointerAddressInDb);
if (deletePointer is not null)
{
Expand Down Expand Up @@ -1558,5 +1564,29 @@ public void ForkChoiceUpdated(Hash256? finalizedBlockHash, Hash256? safeBlockHas
_metadataDb.Set(MetadataDbKeys.SafeBlockHash, Rlp.Encode(SafeHash!).Bytes);
}
}

public void UpdateSyncPivot((long blockNumber, Hash256 blockHash) syncPivot, IBlockTree.SyncPivotUpdateReason reason)
{
if (reason == IBlockTree.SyncPivotUpdateReason.PivotUpdator)
{
if (WasInitialSyncPivotSet) throw new InvalidOperationException("Attempted to update sync pivot from pivot updater when sync pivot was already set.");
WasInitialSyncPivotSet = true;
DoUpdateSyncPivot(syncPivot.blockNumber, syncPivot.blockHash);
}
else
{
throw new UnreachableException();
}
}

private void DoUpdateSyncPivot(long blockNumber, Hash256 blockHash)
{
RlpStream pivotData = new(38); //1 byte (prefix) + 4 bytes (long) + 1 byte (prefix) + 32 bytes (Keccak)
pivotData.Encode(blockNumber);
pivotData.Encode(blockHash);
_metadataDb.Set(MetadataDbKeys.UpdatedPivotData, pivotData.Data.ToArray()!);

SyncPivot = (blockNumber, blockHash);
}
}
}
8 changes: 8 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/BlockTreeOverlay.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ public BlockHeader? LowestInsertedBeaconHeader

public long BestKnownNumber => Math.Max(_overlayTree.BestKnownNumber, _baseTree.BestKnownNumber);
public long BestKnownBeaconNumber => Math.Max(_overlayTree.BestKnownBeaconNumber, _baseTree.BestKnownBeaconNumber);
public (long BlockNumber, Hash256 BlockHash) SyncPivot => _baseTree.SyncPivot;
public bool WasInitialSyncPivotSet => _baseTree.WasInitialSyncPivotSet;
public void UpdateSyncPivot((long blockNumber, Hash256 blockHash) syncPivot,
IBlockTree.SyncPivotUpdateReason reason)
{
// Don't update it here.
}

public Hash256 HeadHash => _overlayTree.HeadHash ?? _baseTree.HeadHash;
public Hash256 GenesisHash => _baseTree.GenesisHash;
public Hash256? PendingHash => _overlayTree.PendingHash ?? _baseTree.PendingHash;
Expand Down
24 changes: 24 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/IBlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,5 +178,29 @@ AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOption
void UpdateBeaconMainChain(BlockInfo[]? blockInfos, long clearBeaconMainChainStartPoint);

void RecalculateTreeLevels();

/// <summary>
/// Sync pivot is the point at which forward sync start. That was the original start point.
/// After 4444 its more like a delayed-finalized-block, or a forward-genesis-block
/// This is important as the blocktree initialization assume that all block between the sync pivot and
/// `long.MaxValue` exist. Additionally, it makes for a useful trigger for aux cleanup as when
/// it move (not yet) you can arbitrarily remove (or add) any blocks before it.
/// </summary>
(long BlockNumber, Hash256 BlockHash) SyncPivot { get; }
bool WasInitialSyncPivotSet { get; }

/// For sync pivot to be moved, there are two case:
/// 1. Initial setting of SyncPivot. There is one restriction for this case:
/// 1. the node is not synced yet.
/// 2. Moving of SyncPivot when a finialized block was reached. This has some restriction:
/// 1. All block between current sync pivot and new sync pivot must be processed.
/// 2. Last persisted state (which is also the starting HEAD) must be after the current sync pivot
/// and the new sync pivot.
void UpdateSyncPivot((long blockNumber, Hash256 blockHash) syncPivot, SyncPivotUpdateReason reason);

enum SyncPivotUpdateReason
{
PivotUpdator
}
}
}
8 changes: 8 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/ReadOnlyBlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Int256;

namespace Nethermind.Blockchain
{
Expand Down Expand Up @@ -52,6 +53,13 @@ public BlockHeader? LowestInsertedBeaconHeader
public Block BestSuggestedBody => _wrapped.BestSuggestedBody;
public long BestKnownNumber => _wrapped.BestKnownNumber;
public long BestKnownBeaconNumber => _wrapped.BestKnownBeaconNumber;
public (long BlockNumber, Hash256 BlockHash) SyncPivot => _wrapped.SyncPivot;
public bool WasInitialSyncPivotSet => _wrapped.WasInitialSyncPivotSet;
public void UpdateSyncPivot((long blockNumber, Hash256 blockHash) syncPivot,
IBlockTree.SyncPivotUpdateReason reason)
{
}

public Block Head => _wrapped.Head;
public void MarkChainAsProcessed(IReadOnlyList<Block> blocks) => throw new InvalidOperationException($"{nameof(ReadOnlyBlockTree)} does not expect {nameof(MarkChainAsProcessed)} calls");
public (BlockInfo Info, ChainLevelInfo Level) GetInfo(long number, Hash256 blockHash) => _wrapped.GetInfo(number, blockHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@

using System;
using Nethermind.Config;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Db;
using Nethermind.Int256;
using Nethermind.Serialization.Json;

namespace Nethermind.Blockchain.Synchronization;

Expand Down Expand Up @@ -53,15 +50,9 @@ public interface ISyncConfig : IConfig
[ConfigItem(Description = "The hash of the pivot block for the Fast sync mode.", DefaultValue = "null")]
string? PivotHash { get; set; }

[ConfigItem(DisabledForCli = true, HiddenFromDocs = true, DefaultValue = "0")]
long PivotNumberParsed => LongConverter.FromString(PivotNumber);

[ConfigItem(DisabledForCli = true, HiddenFromDocs = true, DefaultValue = "0")]
UInt256 PivotTotalDifficultyParsed => UInt256.Parse(PivotTotalDifficulty ?? "0");

[ConfigItem(DisabledForCli = true, HiddenFromDocs = true)]
Hash256? PivotHashParsed => PivotHash is null ? null : new Hash256(Bytes.FromHexString(PivotHash));

[ConfigItem(Description = "The max number of attempts to update the pivot block based on the FCU message from the consensus client.", DefaultValue = "2147483647")]
int MaxAttemptsToUpdatePivot { get; set; }

Expand All @@ -76,9 +67,6 @@ public interface ISyncConfig : IConfig
DefaultValue = "0")]
public long AncientBodiesBarrier { get; set; }

[ConfigItem(DisabledForCli = true, HiddenFromDocs = true, DefaultValue = "1")]
public long AncientBodiesBarrierCalc => Math.Max(1, Math.Min(PivotNumberParsed, AncientBodiesBarrier));

[ConfigItem(Description = $$"""
The earliest receipt downloaded with fast sync when `{{nameof(DownloadReceiptsInFastSync)}}` is set to `true`. The actual value is determined as follows:

Expand All @@ -90,9 +78,6 @@ public interface ISyncConfig : IConfig
DefaultValue = "0")]
public long AncientReceiptsBarrier { get; set; }

[ConfigItem(DisabledForCli = true, HiddenFromDocs = true, DefaultValue = "1")]
public long AncientReceiptsBarrierCalc => Math.Max(1, Math.Min(PivotNumberParsed, Math.Max(AncientBodiesBarrier, AncientReceiptsBarrier)));

[ConfigItem(Description = "Whether to use the Snap sync mode.", DefaultValue = "false")]
public bool SnapSync { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ public bool SynchronizationEnabled
private string _pivotNumber = "0";
public string PivotNumber
{
get => FastSync || SnapSync ? _pivotNumber : "0";
get => FastSync ? _pivotNumber : "0";
set => _pivotNumber = value;
}

private string? _pivotHash;
public string? PivotHash
{
get => FastSync || SnapSync ? _pivotHash : null;
get => FastSync ? _pivotHash : null;
set => _pivotHash = value;
}
public int MaxAttemptsToUpdatePivot { get; set; } = int.MaxValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class StartupBlockTreeFixer : IBlockTreeVisitor, IDisposable
private readonly BlockTreeSuggestPacer _blockTreeSuggestPacer;

public StartupBlockTreeFixer(
ISyncConfig syncConfig,
IBlockTree blockTree,
IStateReader stateReader,
ILogger logger,
Expand All @@ -50,7 +49,7 @@ public StartupBlockTreeFixer(
_logger = logger;

long assumedHead = _blockTree.Head?.Number ?? 0;
_startNumber = Math.Max(syncConfig.PivotNumberParsed, assumedHead + 1);
_startNumber = Math.Max(_blockTree.SyncPivot.BlockNumber, assumedHead + 1);
_blocksToLoad = (assumedHead + 1) >= _startNumber ? (_blockTree.BestKnownNumber - _startNumber + 1) : 0;

_currentLevelNumber = _startNumber - 1; // because we always increment on entering
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void Beyond_capacity_lru_check()
public void Beyond_capacity_lru_parallel()
{
LruKeyCache<AddressAsKey> cache = new(Capacity, Capacity / 2, "test");
Parallel.For(0, Environment.ProcessorCount * 8, (iter) =>
Parallel.For(0, Math.Min(Environment.ProcessorCount * 8, 64), (iter) =>
{
for (int ii = 0; ii < Capacity; ii++)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class BlockProcessingModule : Module
protected override void Load(ContainerBuilder builder)
{
builder
.AddSingleton<IPoSSwitcher>(NoPoS.Instance)
.AddSingleton<IBlockValidator, BlockValidator>()
.AddSingleton<ITxValidator, ISpecProvider>((spec) => new TxValidator(spec.ChainId))
.AddSingleton<IHeaderValidator, HeaderValidator>()
Expand Down
13 changes: 6 additions & 7 deletions src/Nethermind/Nethermind.Facade.Test/Eth/EthSyncingInfoTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
using System.Threading;
using FluentAssertions;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Blocks;
using Nethermind.Blockchain.Receipts;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Core.Crypto;
using Nethermind.Core.Test.Builders;
using Nethermind.Facade.Eth;
using Nethermind.Logging;
Expand Down Expand Up @@ -90,16 +89,16 @@ public void IsSyncing_AncientBarriers(bool resolverDownloadingBodies,
ISyncConfig syncConfig = new SyncConfig
{
FastSync = true,
AncientBodiesBarrier = 800,
// AncientBodiesBarrierCalc = Max(1, Min(Pivot, BodiesBarrier)) = BodiesBarrier = 800
AncientReceiptsBarrier = 900,
// AncientReceiptsBarrierCalc = Max(1, Min(Pivot, Max(BodiesBarrier, ReceiptsBarrier))) = ReceiptsBarrier = 900
DownloadBodiesInFastSync = true,
DownloadReceiptsInFastSync = true,
PivotNumber = "1000"
};
IBlockTree blockTree = Substitute.For<IBlockTree>();
blockTree.SyncPivot.Returns((1000, Hash256.Zero));

ISyncPointers syncPointers = Substitute.For<ISyncPointers>();
syncPointers.AncientBodiesBarrier.Returns(800);
syncPointers.AncientReceiptsBarrier.Returns(900);

ISyncProgressResolver syncProgressResolver = Substitute.For<ISyncProgressResolver>();
syncProgressResolver.IsFastBlocksBodiesFinished().Returns(resolverDownloadingBodies);
syncProgressResolver.IsFastBlocksReceiptsFinished().Returns(resolverDownloadingreceipts);
Expand Down
Loading