diff --git a/nimbus_verified_proxy/engine/blocks.nim b/nimbus_verified_proxy/engine/blocks.nim index acba403755..7ea483f9b5 100644 --- a/nimbus_verified_proxy/engine/blocks.nim +++ b/nimbus_verified_proxy/engine/blocks.nim @@ -8,7 +8,7 @@ {.push raises: [], gcsafe.} import - std/strutils, + std/[strutils, sequtils], results, chronicles, web3/[eth_api_types, eth_api], @@ -94,8 +94,6 @@ proc walkBlocks( sourceHash: Hash32, targetHash: Hash32, ): Future[EngineResult[void]] {.async: (raises: [CancelledError]).} = - var nextHash = sourceHash - info "Starting block walk to verify requested block", blockHash = targetHash let numBlocks = sourceNum - targetNum @@ -108,34 +106,59 @@ proc walkBlocks( ) ) - for i in 0 ..< numBlocks: - let nextHeader = - if engine.headerStore.contains(nextHash): - engine.headerStore.get(nextHash).get() - else: - let blk = ?(await engine.backend.eth_getBlockByHash(nextHash, false)) + var + nextHash = sourceHash # sourceHash is already the parent hash + nextNum = sourceNum - 1 + downloadedHeaders: Table[Hash32, Header] + futs: seq[Future[EngineResult[BlockObject]]] + + while nextNum > targetNum: + futs = @[] + downloadedHeaders.clear() + + while nextNum > targetNum and uint64(futs.len) < engine.parallelBlockDownloads: + if not engine.headerStore.contains(nextNum): + let tag = BlockTag(kind: bidNumber, number: Quantity(nextNum)) + futs.add(engine.backend.eth_getBlockByNumber(tag, false)) - trace "getting next block", - hash = nextHash, - number = blk.number, - remaining = distinctBase(blk.number) - targetNum + nextNum -= 1 - let header = convHeader(blk) + await allFutures(futs) - if header.computeBlockHash != nextHash: - return err( - ( - VerificationError, - "Encountered an invalid block header while walking the chain", + for futBlk in futs: + if futBlk.cancelled() or futBlk.failed(): + return + err((UnavailableDataError, "Error downloading a block during the block walk")) + else: + let + blk = ?futBlk.value() + h = convHeader(blk) + downloadedHeaders[blk.hash] = h + + for j in 0 ..< futs.len: + let unverifiedHeader = + if engine.headerStore.contains(nextHash): + engine.headerStore.get(nextHash).get() + else: + try: + downloadedHeaders[nextHash] + except KeyError: + return err( + (UnavailableDataError, "Cannot find downloaded block of the block walk") ) - ) - header + if unverifiedHeader.computeBlockHash != nextHash: + return err( + ( + VerificationError, + "Encountered an invalid block header while walking the chain", + ) + ) - if nextHeader.parentHash == targetHash: - return ok() + if unverifiedHeader.parentHash == targetHash: + return ok() - nextHash = nextHeader.parentHash + nextHash = unverifiedHeader.parentHash err((VerificationError, "the requested block is not part of the canonical chain")) @@ -148,18 +171,46 @@ proc verifyHeader( (VerificationError, "hashed block header doesn't match with blk.hash(downloaded)") ) - if not engine.headerStore.contains(hash): - let latestHeader = engine.headerStore.latest.valueOr: - return err( - (UnavailableDataError, "Couldn't get the latest header, syncing in progress") - ) + # if the header is available in the store just use that (already verified) + if engine.headerStore.contains(hash): + return ok() + # walk blocks backwards(time) from source to target + else: + let + earliest = engine.headerStore.earliest.valueOr: + return err( + (UnavailableDataError, "earliest block is not available yet. Still syncing?") + ) + finalized = engine.headerStore.finalized.valueOr: + return err( + (UnavailableDataError, "finalized block is not available yet. Still syncing?") + ) + latest = engine.headerStore.latest.valueOr: + return err( + (UnavailableDataError, "latest block is not available yet. Still syncing?") + ) - # walk blocks backwards(time) from source to target - ?( - await engine.walkBlocks( - latestHeader.number, header.number, latestHeader.parentHash, hash - ) - ) + # header is older than earliest + if header.number < earliest.number: + # earliest is finalized + if earliest.number < finalized.number: + ?await engine.walkBlocks( + earliest.number, header.number, earliest.parentHash, hash + ) + # earliest is not finalized (headerstore is smaller than 2 epochs or chain hasn't finalized for long) + else: + ?await engine.walkBlocks( + finalized.number, header.number, finalized.parentHash, hash + ) + # is within the boundaries of header store but not found in cache + else: + if header.number < finalized.number: + ?await engine.walkBlocks( + finalized.number, header.number, finalized.parentHash, hash + ) + else: + # optimistic walk + ?await engine.walkBlocks(latest.number, header.number, latest.parentHash, hash) ok() diff --git a/nimbus_verified_proxy/engine/engine.nim b/nimbus_verified_proxy/engine/engine.nim index 981fc53be8..ccb707a8f1 100644 --- a/nimbus_verified_proxy/engine/engine.nim +++ b/nimbus_verified_proxy/engine/engine.nim @@ -19,6 +19,7 @@ proc init*( accountsCache: AccountsCache.init(config.accountCacheLen), codeCache: CodeCache.init(config.codeCacheLen), storageCache: StorageCache.init(config.storageCacheLen), + parallelBlockDownloads: config.parallelBlockDownloads, ) engine.registerDefaultFrontend() diff --git a/nimbus_verified_proxy/engine/header_store.nim b/nimbus_verified_proxy/engine/header_store.nim index 059aa09829..37e719ea8c 100644 --- a/nimbus_verified_proxy/engine/header_store.nim +++ b/nimbus_verified_proxy/engine/header_store.nim @@ -134,8 +134,7 @@ func contains*(self: HeaderStore, number: base.BlockNumber): bool = self.hashes.contains(number) proc addHeader(self: HeaderStore, header: Header, hHash: Hash32) = - # Only add if it didn't exist before - the implementation of `latest` relies - # on this.. + # Only add if it didn't exist before if hHash notin self.headers: self.hashes.put(header.number, hHash) var flagEvicted = false @@ -154,6 +153,9 @@ proc addHeader(self: HeaderStore, header: Header, hHash: Hash32) = func updateFinalized*( self: HeaderStore, header: Header, hHash: Hash32 ): Result[void, string] = + # add header to the chain - if it already exists it won't be added + self.addHeader(header, hHash) + if self.finalized.isSome(): if self.finalized.get().number < header.number: self.finalized = Opt.some(header) diff --git a/nimbus_verified_proxy/engine/types.nim b/nimbus_verified_proxy/engine/types.nim index 0be08094b5..8c3ff2b924 100644 --- a/nimbus_verified_proxy/engine/types.nim +++ b/nimbus_verified_proxy/engine/types.nim @@ -225,6 +225,7 @@ type # config items chainId*: UInt256 maxBlockWalk*: uint64 + parallelBlockDownloads*: uint64 RpcVerificationEngineConf* = ref object chainId*: UInt256 @@ -233,3 +234,4 @@ type accountCacheLen*: int codeCacheLen*: int storageCacheLen*: int + parallelBlockDownloads*: uint64 diff --git a/nimbus_verified_proxy/libverifproxy/setup.nim b/nimbus_verified_proxy/libverifproxy/setup.nim index 2efc31e932..c0945655b2 100644 --- a/nimbus_verified_proxy/libverifproxy/setup.nim +++ b/nimbus_verified_proxy/libverifproxy/setup.nim @@ -59,7 +59,8 @@ proc load(T: type VerifiedProxyConf, configJson: string): T {.raises: [ProxyErro of "Json": StdoutLogKind.Json of "Auto": StdoutLogKind.Auto else: StdoutLogKind.None - maxBlockWalk = jsonNode.getOrDefault("maxBlockWalk").getInt(1000) + maxBlockWalk = jsonNode.getOrDefault("maxBlockWalk").getBiggestInt(1000) + prllBlkDwnlds = jsonNode.getOrDefault("parallelBlockDownloads").getBiggestInt(10) headerStoreLen = jsonNode.getOrDefault("headerStoreLen").getInt(256) storageCacheLen = jsonNode.getOrDefault("storageCacheLen").getInt(256) codeCacheLen = jsonNode.getOrDefault("codeCacheLen").getInt(64) @@ -73,11 +74,20 @@ proc load(T: type VerifiedProxyConf, configJson: string): T {.raises: [ProxyErro logLevel: logLevel, logFormat: logFormat, dataDirFlag: none(OutDir), - maxBlockWalk: uint64(maxBlockWalk), + maxBlockWalk: + if maxBlockWalk < 0: + uint64(0) + else: + uint64(maxBlockWalk), headerStoreLen: headerStoreLen, storageCacheLen: storageCacheLen, codeCacheLen: codeCacheLen, accountCacheLen: accountCacheLen, + parallelBlockDownloads: + if prllBlkDwnlds < 0: + uint64(0) + else: + uint64(prllBlkDwnlds), ) proc run*( @@ -95,6 +105,7 @@ proc run*( accountCacheLen: config.accountCacheLen, codeCacheLen: config.codeCacheLen, storageCacheLen: config.storageCacheLen, + parallelBlockDownloads: config.parallelBlockDownloads, ) engine = RpcVerificationEngine.init(engineConf).valueOr: raise newException(ProxyError, error.errMsg) diff --git a/nimbus_verified_proxy/nimbus_verified_proxy.nim b/nimbus_verified_proxy/nimbus_verified_proxy.nim index 32fb49bf22..37d3599311 100644 --- a/nimbus_verified_proxy/nimbus_verified_proxy.nim +++ b/nimbus_verified_proxy/nimbus_verified_proxy.nim @@ -95,6 +95,7 @@ proc run( accountCacheLen: config.accountCacheLen, codeCacheLen: config.codeCacheLen, storageCacheLen: config.storageCacheLen, + parallelBlockDownloads: config.parallelBlockDownloads, ) engine = RpcVerificationEngine.init(engineConf).valueOr: raise newException(ProxyError, "Couldn't initialize verification engine") diff --git a/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim b/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim index 95d70c4fcd..73b73da1df 100644 --- a/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim +++ b/nimbus_verified_proxy/nimbus_verified_proxy_conf.nim @@ -98,6 +98,13 @@ type VerifiedProxyConf* = object name: "debug-max-walk" .}: uint64 + parallelBlockDownloads* {. + hidden, + desc: "Number of blocks downloaded parallely. Affects memory usage", + defaultValue: 10, + name: "debug-parallel-downloads" + .}: uint64 + # Consensus light sync # No default - Needs to be provided by the user trustedBlockRoot* {. diff --git a/nimbus_verified_proxy/tests/test_blocks.nim b/nimbus_verified_proxy/tests/test_blocks.nim index 2389f03662..5bf71135b8 100644 --- a/nimbus_verified_proxy/tests/test_blocks.nim +++ b/nimbus_verified_proxy/tests/test_blocks.nim @@ -100,6 +100,7 @@ suite "test verified blocks": ts.loadBlock(blk) if i == sourceBlockNum: check engine.headerStore.add(convHeader(blk), blk.hash).isOk() + check engine.headerStore.updateFinalized(convHeader(blk), blk.hash).isOk() let unreachableTargetTag = diff --git a/nimbus_verified_proxy/tests/test_utils.nim b/nimbus_verified_proxy/tests/test_utils.nim index 16ac3f0d1e..2463613222 100644 --- a/nimbus_verified_proxy/tests/test_utils.nim +++ b/nimbus_verified_proxy/tests/test_utils.nim @@ -82,6 +82,7 @@ proc initTestEngine*( accountCacheLen: 1, codeCacheLen: 1, storageCacheLen: 1, + parallelBlockDownloads: 2, # >1 required for block walk tests ) engine = ?RpcVerificationEngine.init(engineConf)