Skip to content

Commit

Permalink
trie: remove inconsistent trie nodes during sync in path mode (ethere…
Browse files Browse the repository at this point in the history
…um#28595)

This fixes a database corruption issue that could occur during state healing.
When sync is aborted while certain modifications were already committed, and a
reorg occurs, the database would contain incorrect trie nodes stored by path.
These nodes need to detected/deleted in order to obtain a complete and fully correct state
after state healing.

---------

Co-authored-by: Felix Lange <[email protected]>
  • Loading branch information
rjl493456442 and fjl committed Dec 8, 2023
1 parent d98d70f commit e206d3f
Show file tree
Hide file tree
Showing 3 changed files with 270 additions and 84 deletions.
6 changes: 5 additions & 1 deletion ethdb/dbtest/testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,13 @@ func TestDatabaseSuite(t *testing.T, New func() ethdb.KeyValueStore) {
b.Put([]byte("5"), nil)
b.Delete([]byte("1"))
b.Put([]byte("6"), nil)
b.Delete([]byte("3"))

b.Delete([]byte("3")) // delete then put
b.Put([]byte("3"), nil)

b.Put([]byte("7"), nil) // put then delete
b.Delete([]byte("7"))

if err := b.Write(); err != nil {
t.Fatal(err)
}
Expand Down
223 changes: 143 additions & 80 deletions trie/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,9 @@ type LeafCallback func(keys [][]byte, path []byte, leaf []byte, parent common.Ha

// nodeRequest represents a scheduled or already in-flight trie node retrieval request.
type nodeRequest struct {
hash common.Hash // Hash of the trie node to retrieve
path []byte // Merkle path leading to this node for prioritization
data []byte // Data content of the node, cached until all subtrees complete
deletes [][]byte // List of internal path segments for trie nodes to delete
hash common.Hash // Hash of the trie node to retrieve
path []byte // Merkle path leading to this node for prioritization
data []byte // Data content of the node, cached until all subtrees complete

parent *nodeRequest // Parent state node referencing this entry
deps int // Number of dependencies before allowed to commit this node
Expand All @@ -146,38 +145,85 @@ type CodeSyncResult struct {
Data []byte // Data content of the retrieved bytecode
}

// nodeOp represents an operation upon the trie node. It can either represent a
// deletion to the specific node or a node write for persisting retrieved node.
type nodeOp struct {
owner common.Hash // identifier of the trie (empty for account trie)
path []byte // path from the root to the specified node.
blob []byte // the content of the node (nil for deletion)
hash common.Hash // hash of the node content (empty for node deletion)
}

// isDelete indicates if the operation is a database deletion.
func (op *nodeOp) isDelete() bool {
return len(op.blob) == 0
}

// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
// persisted data items.
type syncMemBatch struct {
nodes map[string][]byte // In-memory membatch of recently completed nodes
hashes map[string]common.Hash // Hashes of recently completed nodes
deletes map[string]struct{} // List of paths for trie node to delete
codes map[common.Hash][]byte // In-memory membatch of recently completed codes
size uint64 // Estimated batch-size of in-memory data.
scheme string // State scheme identifier
codes map[common.Hash][]byte // In-memory batch of recently completed codes
nodes []nodeOp // In-memory batch of recently completed/deleted nodes
size uint64 // Estimated batch-size of in-memory data.
}

// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
func newSyncMemBatch() *syncMemBatch {
func newSyncMemBatch(scheme string) *syncMemBatch {
return &syncMemBatch{
nodes: make(map[string][]byte),
hashes: make(map[string]common.Hash),
deletes: make(map[string]struct{}),
codes: make(map[common.Hash][]byte),
scheme: scheme,
codes: make(map[common.Hash][]byte),
}
}

// hasNode reports the trie node with specific path is already cached.
func (batch *syncMemBatch) hasNode(path []byte) bool {
_, ok := batch.nodes[string(path)]
return ok
}

// hasCode reports the contract code with specific hash is already cached.
func (batch *syncMemBatch) hasCode(hash common.Hash) bool {
_, ok := batch.codes[hash]
return ok
}

// addCode caches a contract code database write operation.
func (batch *syncMemBatch) addCode(hash common.Hash, code []byte) {
batch.codes[hash] = code
batch.size += common.HashLength + uint64(len(code))
}

// addNode caches a node database write operation.
func (batch *syncMemBatch) addNode(owner common.Hash, path []byte, blob []byte, hash common.Hash) {
if batch.scheme == rawdb.PathScheme {
if owner == (common.Hash{}) {
batch.size += uint64(len(path) + len(blob))
} else {
batch.size += common.HashLength + uint64(len(path)+len(blob))
}
} else {
batch.size += common.HashLength + uint64(len(blob))
}
batch.nodes = append(batch.nodes, nodeOp{
owner: owner,
path: path,
blob: blob,
hash: hash,
})
}

// delNode caches a node database delete operation.
func (batch *syncMemBatch) delNode(owner common.Hash, path []byte) {
if batch.scheme != rawdb.PathScheme {
log.Error("Unexpected node deletion", "owner", owner, "path", path, "scheme", batch.scheme)
return // deletion is not supported in hash mode.
}
if owner == (common.Hash{}) {
batch.size += uint64(len(path))
} else {
batch.size += common.HashLength + uint64(len(path))
}
batch.nodes = append(batch.nodes, nodeOp{
owner: owner,
path: path,
})
}

// Sync is the main state trie synchronisation scheduler, which provides yet
// unknown trie hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the trie step by step until all is done.
Expand All @@ -196,7 +242,7 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
ts := &Sync{
scheme: scheme,
database: database,
membatch: newSyncMemBatch(),
membatch: newSyncMemBatch(scheme),
nodeReqs: make(map[string]*nodeRequest),
codeReqs: make(map[common.Hash]*codeRequest),
queue: prque.New[int64, any](nil), // Ugh, can contain both string and hash, whyyy
Expand All @@ -210,16 +256,17 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
// parent for completion tracking. The given path is a unique node path in
// hex format and contain all the parent path if it's layered trie node.
func (s *Sync) AddSubTrie(root common.Hash, path []byte, parent common.Hash, parentPath []byte, callback LeafCallback) {
// Short circuit if the trie is empty or already known
if root == types.EmptyRootHash {
return
}
if s.membatch.hasNode(path) {
return
}
owner, inner := ResolvePath(path)
if rawdb.HasTrieNode(s.database, owner, inner, root, s.scheme) {
exist, inconsistent := s.hasNode(owner, inner, root)
if exist {
// The entire subtrie is already present in the database.
return
} else if inconsistent {
// There is a pre-existing node with the wrong hash in DB, remove it.
s.membatch.delNode(owner, inner)
}
// Assemble the new sub-trie sync request
req := &nodeRequest{
Expand Down Expand Up @@ -371,39 +418,42 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error {
}

// Commit flushes the data stored in the internal membatch out to persistent
// storage, returning any occurred error.
// storage, returning any occurred error. The whole data set will be flushed
// in an atomic database batch.
func (s *Sync) Commit(dbw ethdb.Batch) error {
// Flush the pending node writes into database batch.
var (
account int
storage int
)
for path, value := range s.membatch.nodes {
owner, inner := ResolvePath([]byte(path))
if owner == (common.Hash{}) {
account += 1
for _, op := range s.membatch.nodes {
if op.isDelete() {
// node deletion is only supported in path mode.
if op.owner == (common.Hash{}) {
rawdb.DeleteAccountTrieNode(dbw, op.path)
} else {
rawdb.DeleteStorageTrieNode(dbw, op.owner, op.path)
}
deletionGauge.Inc(1)
} else {
storage += 1
if op.owner == (common.Hash{}) {
account += 1
} else {
storage += 1
}
rawdb.WriteTrieNode(dbw, op.owner, op.path, op.hash, op.blob, s.scheme)
}
rawdb.WriteTrieNode(dbw, owner, inner, s.membatch.hashes[path], value, s.scheme)
}
accountNodeSyncedGauge.Inc(int64(account))
storageNodeSyncedGauge.Inc(int64(storage))

// Flush the pending node deletes into the database batch.
// Please note that each written and deleted node has a
// unique path, ensuring no duplication occurs.
for path := range s.membatch.deletes {
owner, inner := ResolvePath([]byte(path))
rawdb.DeleteTrieNode(dbw, owner, inner, common.Hash{} /* unused */, s.scheme)
}
// Flush the pending code writes into database batch.
for hash, value := range s.membatch.codes {
rawdb.WriteCode(dbw, hash, value)
}
codeSyncedGauge.Inc(int64(len(s.membatch.codes)))

s.membatch = newSyncMemBatch() // reset the batch
s.membatch = newSyncMemBatch(s.scheme) // reset the batch
return nil
}

Expand Down Expand Up @@ -476,12 +526,15 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
// child as invalid. This is essential in the case of path mode
// scheme; otherwise, state healing might overwrite existing child
// nodes silently while leaving a dangling parent node within the
// range of this internal path on disk. This would break the
// guarantee for state healing.
// range of this internal path on disk and the persistent state
// ends up with a very weird situation that nodes on the same path
// are not inconsistent while they all present in disk. This property
// would break the guarantee for state healing.
//
// While it's possible for this shortNode to overwrite a previously
// existing full node, the other branches of the fullNode can be
// retained as they remain untouched and complete.
// retained as they are not accessible with the new shortNode, and
// also the whole sub-trie is still untouched and complete.
//
// This step is only necessary for path mode, as there is no deletion
// in hash mode at all.
Expand All @@ -498,8 +551,7 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
exists = rawdb.ExistsStorageTrieNode(s.database, owner, append(inner, key[:i]...))
}
if exists {
req.deletes = append(req.deletes, key[:i])
deletionGauge.Inc(1)
s.membatch.delNode(owner, append(inner, key[:i]...))
log.Debug("Detected dangling node", "owner", owner, "path", append(inner, key[:i]...))
}
}
Expand All @@ -521,6 +573,7 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
var (
missing = make(chan *nodeRequest, len(children))
pending sync.WaitGroup
batchMu sync.Mutex
)
for _, child := range children {
// Notify any external watcher of a new key/value node
Expand All @@ -538,34 +591,32 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
}
}
}
// If the child references another node, resolve or schedule
// If the child references another node, resolve or schedule.
// We check all children concurrently.
if node, ok := (child.node).(hashNode); ok {
// Try to resolve the node from the local database
if s.membatch.hasNode(child.path) {
continue
}
// Check the presence of children concurrently
path := child.path
hash := common.BytesToHash(node)
pending.Add(1)
go func(child childNode) {
go func() {
defer pending.Done()

// If database says duplicate, then at least the trie node is present
// and we hold the assumption that it's NOT legacy contract code.
var (
chash = common.BytesToHash(node)
owner, inner = ResolvePath(child.path)
)
if rawdb.HasTrieNode(s.database, owner, inner, chash, s.scheme) {
owner, inner := ResolvePath(path)
exist, inconsistent := s.hasNode(owner, inner, hash)
if exist {
return
} else if inconsistent {
// There is a pre-existing node with the wrong hash in DB, remove it.
batchMu.Lock()
s.membatch.delNode(owner, inner)
batchMu.Unlock()
}
// Locally unknown node, schedule for retrieval
missing <- &nodeRequest{
path: child.path,
hash: chash,
path: path,
hash: hash,
parent: req,
callback: req.callback,
}
}(child)
}()
}
}
pending.Wait()
Expand All @@ -587,21 +638,10 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
// committed themselves.
func (s *Sync) commitNodeRequest(req *nodeRequest) error {
// Write the node content to the membatch
s.membatch.nodes[string(req.path)] = req.data
s.membatch.hashes[string(req.path)] = req.hash
owner, path := ResolvePath(req.path)
s.membatch.addNode(owner, path, req.data, req.hash)

// The size tracking refers to the db-batch, not the in-memory data.
if s.scheme == rawdb.PathScheme {
s.membatch.size += uint64(len(req.path) + len(req.data))
} else {
s.membatch.size += common.HashLength + uint64(len(req.data))
}
// Delete the internal nodes which are marked as invalid
for _, segment := range req.deletes {
path := append(req.path, segment...)
s.membatch.deletes[string(path)] = struct{}{}
s.membatch.size += uint64(len(path))
}
// Removed the completed node request
delete(s.nodeReqs, string(req.path))
s.fetches[len(req.path)]--

Expand All @@ -622,8 +662,9 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error {
// committed themselves.
func (s *Sync) commitCodeRequest(req *codeRequest) error {
// Write the node content to the membatch
s.membatch.codes[req.hash] = req.data
s.membatch.size += common.HashLength + uint64(len(req.data))
s.membatch.addCode(req.hash, req.data)

// Removed the completed code request
delete(s.codeReqs, req.hash)
s.fetches[len(req.path)]--

Expand All @@ -639,6 +680,28 @@ func (s *Sync) commitCodeRequest(req *codeRequest) error {
return nil
}

// hasNode reports whether the specified trie node is present in the database.
// 'exists' is true when the node exists in the database and matches the given root
// hash. The 'inconsistent' return value is true when the node exists but does not
// match the expected hash.
func (s *Sync) hasNode(owner common.Hash, path []byte, hash common.Hash) (exists bool, inconsistent bool) {
// If node is running with hash scheme, check the presence with node hash.
if s.scheme == rawdb.HashScheme {
return rawdb.HasLegacyTrieNode(s.database, hash), false
}
// If node is running with path scheme, check the presence with node path.
var blob []byte
var dbHash common.Hash
if owner == (common.Hash{}) {
blob, dbHash = rawdb.ReadAccountTrieNode(s.database, path)
} else {
blob, dbHash = rawdb.ReadStorageTrieNode(s.database, owner, path)
}
exists = hash == dbHash
inconsistent = !exists && len(blob) != 0
return exists, inconsistent
}

// ResolvePath resolves the provided composite node path by separating the
// path in account trie if it's existent.
func ResolvePath(path []byte) (common.Hash, []byte) {
Expand Down
Loading

0 comments on commit e206d3f

Please sign in to comment.