From a8f326aa6023afa9eb968959c8007e5caf10406e Mon Sep 17 00:00:00 2001 From: Scorbajio Date: Wed, 17 Jan 2024 05:22:37 -0700 Subject: [PATCH] Readable stream analysis refactor (#3231) * Experiment with importing webstreams * Suppress implicit-import eslint error * Replace readable-stream with web stream api * Remove unnecessary null check * Update package-lock and dependencies * Fix test * Make nonbreaking * Use async stream in statemanager * Update package-lock file and dependencies * Include comment to detail deviation from import policy * Revert "Update package-lock file and dependencies" This reverts commit baf67f4ada44f7bfd23d1896d7cbb88d033ad657. * Revert "Update package-lock and dependencies" This reverts commit 02b8845a0e35b8a97efa7b69a6fc26f03e210c38. * Update package-lock --- packages/statemanager/src/stateManager.ts | 82 +++++++++-------------- packages/trie/src/trie.ts | 20 +++++- packages/trie/src/util/readStream.ts | 69 ++++++++++++------- packages/trie/test/stream.spec.ts | 25 +++++-- 4 files changed, 117 insertions(+), 79 deletions(-) diff --git a/packages/statemanager/src/stateManager.ts b/packages/statemanager/src/stateManager.ts index 88a46a7215..4bfc645c8a 100644 --- a/packages/statemanager/src/stateManager.ts +++ b/packages/statemanager/src/stateManager.ts @@ -962,21 +962,14 @@ export class DefaultStateManager implements EVMStateManagerInterface { throw new Error(`dumpStorage f() can only be called for an existing account`) } const trie = this._getStorageTrie(address, account) + const storage: StorageDump = {} + const stream = trie.createAsyncReadStream() - return new Promise((resolve, reject) => { - const storage: StorageDump = {} - const stream = trie.createReadStream() + for await (const chunk of stream) { + storage[bytesToHex(chunk.key)] = bytesToHex(chunk.value) + } - stream.on('data', (val: any) => { - storage[bytesToHex(val.key)] = bytesToHex(val.value) - }) - stream.on('end', () => { - resolve(storage) - }) - stream.on('error', (e) => { - reject(e) - }) - }) + return storage } /** @@ -999,44 +992,35 @@ export class DefaultStateManager implements EVMStateManagerInterface { throw new Error(`Account does not exist.`) } const trie = this._getStorageTrie(address, account) - - return new Promise((resolve, reject) => { - let inRange = false - let i = 0 - - /** Object conforming to {@link StorageRange.storage}. */ - const storageMap: StorageRange['storage'] = {} - const stream = trie.createReadStream() - - stream.on('data', (val: any) => { - if (!inRange) { - // Check if the key is already in the correct range. - if (bytesToBigInt(val.key) >= startKey) { - inRange = true - } else { - return - } - } - - if (i < limit) { - storageMap[bytesToHex(val.key)] = { key: null, value: bytesToHex(val.value) } - i++ - } else if (i === limit) { - resolve({ - storage: storageMap, - nextKey: bytesToHex(val.key), - }) + let inRange = false + let i = 0 + + /** Object conforming to {@link StorageRange.storage}. */ + const storageMap: StorageRange['storage'] = {} + const stream = trie.createAsyncReadStream() + for await (const chunk of stream) { + if (!inRange) { + // Check if the key is already in the correct range. + if (bytesToBigInt(chunk.key) >= startKey) { + inRange = true + } else { + continue } - }) - - stream.on('end', () => { - resolve({ + } + if (i < limit) { + storageMap[bytesToHex(chunk.key)] = { key: null, value: bytesToHex(chunk.value) } + i++ + } else if (i === limit) { + return { storage: storageMap, - nextKey: null, - }) - }) - stream.on('error', (e) => reject(e)) - }) + nextKey: bytesToHex(chunk.key), + } + } + } + return { + storage: storageMap, + nextKey: null, + } } /** diff --git a/packages/trie/src/trie.ts b/packages/trie/src/trie.ts index 335d794db1..084db74195 100644 --- a/packages/trie/src/trie.ts +++ b/packages/trie/src/trie.ts @@ -27,7 +27,10 @@ import { verifyRangeProof } from './proof/range.js' import { ROOT_DB_KEY } from './types.js' import { _walkTrie } from './util/asyncWalk.js' import { bytesToNibbles, matchingNibbleLength } from './util/nibbles.js' -import { TrieReadStream as ReadStream } from './util/readStream.js' +import { + TrieReadStream as ReadStream, + asyncTrieReadStream as asyncReadStream, +} from './util/readStream.js' import { WalkController } from './util/walkController.js' import type { @@ -43,6 +46,12 @@ import type { import type { OnFound } from './util/asyncWalk.js' import type { BatchDBOp, DB, PutBatch } from '@ethereumjs/util' import type { Debugger } from 'debug' +// Since ReadableStream is from a Web API, the following type import +// is not needed in and should be ignored by the browser, so an exeption +// is made here to deviate from our policy to not add Node.js specific +// package imports. -- 16/01/24 +// eslint-disable-next-line implicit-dependencies/no-implicit +import type { ReadableStream } from 'node:stream/web' interface Path { node: TrieNode | null @@ -1078,12 +1087,21 @@ export class Trie { /** * The `data` event is given an `Object` that has two properties; the `key` and the `value`. Both should be Uint8Arrays. + * @deprecated Use `createAsyncReadStream` * @return Returns a [stream](https://nodejs.org/dist/latest-v12.x/docs/api/stream.html#stream_class_stream_readable) of the contents of the `trie` */ createReadStream(): ReadStream { return new ReadStream(this) } + /** + * Use asynchronous iteration over the chunks in a web stream using the for await...of syntax. + * @return Returns a [web stream](https://nodejs.org/api/webstreams.html#example-readablestream) of the contents of the `trie` + */ + createAsyncReadStream(): ReadableStream { + return asyncReadStream(this) + } + /** * Returns a copy of the underlying trie. * diff --git a/packages/trie/src/util/readStream.ts b/packages/trie/src/util/readStream.ts index 5ad66e21a6..402d4d74b2 100644 --- a/packages/trie/src/util/readStream.ts +++ b/packages/trie/src/util/readStream.ts @@ -1,3 +1,5 @@ +// eslint-disable-next-line implicit-dependencies/no-implicit +import { ReadableStream } from 'node:stream/web' import { Readable } from 'readable-stream' import { BranchNode, LeafNode } from '../node/index.js' @@ -7,6 +9,26 @@ import { nibblestoBytes } from './nibbles.js' import type { Trie } from '../trie.js' import type { FoundNodeFunction } from '../types.js' +const _findValueNodes = async (trie: Trie, onFound: FoundNodeFunction): Promise => { + const outerOnFound: FoundNodeFunction = async (nodeRef, node, key, walkController) => { + let fullKey = key + if (node instanceof LeafNode) { + fullKey = key.concat(node.key()) + // found leaf node! + onFound(nodeRef, node, fullKey, walkController) + } else if (node instanceof BranchNode && node.value()) { + // found branch with value + onFound(nodeRef, node, fullKey, walkController) + } else { + // keep looking for value nodes + if (node !== null) { + walkController.allChildren(node, key) + } + } + } + await trie.walkTrie(trie.root(), outerOnFound) +} + export class TrieReadStream extends Readable { private trie: Trie private _started: boolean @@ -24,7 +46,7 @@ export class TrieReadStream extends Readable { } this._started = true try { - await this._findValueNodes(async (_, node, key, walkController) => { + await _findValueNodes(this.trie, async (_, node, key, walkController) => { if (node !== null) { this.push({ key: nibblestoBytes(key), @@ -42,30 +64,29 @@ export class TrieReadStream extends Readable { } this.push(null) } +} - /** - * Finds all nodes that store k,v values - * called by {@link TrieReadStream} - * @private - */ - async _findValueNodes(onFound: FoundNodeFunction): Promise { - const outerOnFound: FoundNodeFunction = async (nodeRef, node, key, walkController) => { - let fullKey = key - - if (node instanceof LeafNode) { - fullKey = key.concat(node.key()) - // found leaf node! - onFound(nodeRef, node, fullKey, walkController) - } else if (node instanceof BranchNode && node.value()) { - // found branch with value - onFound(nodeRef, node, fullKey, walkController) - } else { - // keep looking for value nodes - if (node !== null) { - walkController.allChildren(node, key) +export function asyncTrieReadStream(trie: Trie) { + return new ReadableStream({ + async start(controller) { + try { + await _findValueNodes(trie, async (_, node, key, walkController) => { + if (node !== null) { + controller.enqueue({ + key: nibblestoBytes(key), + value: node.value(), + }) + walkController.allChildren(node, key) + } + }) + } catch (error: any) { + if (error.message === 'Missing node in DB') { + // pass + } else { + throw error } } - } - await this.trie.walkTrie(this.trie.root(), outerOnFound) - } + controller.close() + }, + }) } diff --git a/packages/trie/test/stream.spec.ts b/packages/trie/test/stream.spec.ts index 99f402d70a..06f81b56f0 100644 --- a/packages/trie/test/stream.spec.ts +++ b/packages/trie/test/stream.spec.ts @@ -94,10 +94,12 @@ describe('kv stream test', () => { }, ] as BatchDBOp[] - const valObj = {} as any + const valObj1 = {} as any + const valObj2 = {} as any for (const op of ops) { if (op.type === 'put') { - valObj[op.key.toString()] = op.value.toString() + valObj1[op.key.toString()] = op.value.toString() + valObj2[op.key.toString()] = op.value.toString() } } @@ -110,14 +112,27 @@ describe('kv stream test', () => { stream.on('data', (d: any) => { const key = d.key.toString() const value = d.value.toString() - assert.equal(valObj[key], value) - delete valObj[key] + assert.equal(valObj1[key], value) + delete valObj1[key] }) stream.on('end', () => { - const keys = Object.keys(valObj) + const keys = Object.keys(valObj1) assert.equal(keys.length, 0) }) }) + + it('should fetch all of the nodes from async stream', async () => { + const stream = trie.createAsyncReadStream() + for await (const chunk of stream) { + const key = chunk.key.toString() + const value = chunk.value.toString() + assert.equal(valObj2[key], value) + delete valObj2[key] + } + + const keys = Object.keys(valObj2) + assert.equal(keys.length, 0) + }) }) describe('db stream test', () => {