diff --git a/chain/forkchoice/attestation.go b/chain/forkchoice/attestation.go
index f438302..fe6cfb2 100644
--- a/chain/forkchoice/attestation.go
+++ b/chain/forkchoice/attestation.go
@@ -138,7 +138,9 @@ func (c *Store) processAttestationLocked(sa *types.SignedAttestation, isFromBloc
 		if !ok || existing == nil || existing.Message == nil || existing.Message.Slot < data.Slot {
 			c.latestNewAttestations[validatorID] = sa
 		}
-		c.storeGossipSignatureLocked(sa)
+		if c.isAggregator {
+			c.storeGossipSignatureLocked(sa)
+		}
 	}
 
 	metrics.AttestationsValid.WithLabelValues(sourceLabel).Inc()
diff --git a/chain/forkchoice/block.go b/chain/forkchoice/block.go
index 85b80c6..1f0c7c4 100644
--- a/chain/forkchoice/block.go
+++ b/chain/forkchoice/block.go
@@ -172,6 +172,7 @@ func (c *Store) ProcessBlock(envelope *types.SignedBlockWithAttestation) error {
 		c.latestFinalized = state.LatestFinalized
 		metrics.FinalizationsTotal.WithLabelValues("success").Inc()
 		metrics.LatestFinalizedSlot.Set(float64(state.LatestFinalized.Slot))
+		c.pruneOnFinalization()
 	}
 
 	// Step 2: Process body attestations as on-chain votes.
diff --git a/chain/forkchoice/prune.go b/chain/forkchoice/prune.go
new file mode 100644
index 0000000..0c28726
--- /dev/null
+++ b/chain/forkchoice/prune.go
@@ -0,0 +1,226 @@
+package forkchoice
+
+import (
+	"github.com/geanlabs/gean/types"
+)
+
+// Storage retention limits aligned with ethlambda (store.rs:83-92).
+const (
+	// blocksToKeep is ~1 day of block history at 4-second slots (86400/4).
+	blocksToKeep = 21_600
+
+	// statesToKeep is ~3.3 hours of state history at 4-second slots (12000/4).
+	statesToKeep = 3_000
+
+	// maxKnownAggregatedPayloads caps the known aggregated payloads map to
+	// prevent unbounded growth during stalled finalization. Matches
+	// ethlambda's AGGREGATED_PAYLOAD_CAP.
+	maxKnownAggregatedPayloads = 4096
+
+	// maxAggregatedPayloadKeys caps the aggregatedPayloads proof cache to
+	// prevent unbounded key growth.
+	maxAggregatedPayloadKeys = 8192
+
+	// periodicPruningInterval is the number of slots between periodic
+	// pruning passes. Acts as a safety net when finalization stalls.
+	// Matches zeam FORKCHOICE_PRUNING_INTERVAL_SLOTS (constants.zig:22).
+	periodicPruningInterval = 7200
+
+	// periodicPruningLagThreshold defines how far finalization must lag
+	// behind the current slot before periodic pruning kicks in. Prevents
+	// unnecessary pruning work when finalization is healthy.
+	// Matches zeam's guard: finalized.slot + 2*7200 < current_slot.
+	periodicPruningLagThreshold = 2 * periodicPruningInterval
+)
+
+// pruneOnFinalization removes data that can no longer influence fork choice
+// after finalization advances. Matches leanSpec prune_stale_attestation_data()
+// (store.py:228-268).
+//
+// Must be called with c.mu held.
+func (c *Store) pruneOnFinalization() {
+	finalizedSlot := c.latestFinalized.Slot
+
+	c.pruneStaleAttestationData(finalizedSlot)
+	c.pruneAggregatedPayloadsCache(finalizedSlot)
+	c.pruneStorage(finalizedSlot)
+}
+
+// pruneStaleAttestationData removes aggregated payload entries where the
+// attestation target slot is at or before the finalized slot. Matches
+// leanSpec store.py:245-268 which filters by target.slot > finalized_slot.
+func (c *Store) pruneStaleAttestationData(finalizedSlot uint64) {
+	for key, payload := range c.latestKnownAggregatedPayloads {
+		if payload.data != nil && payload.data.Target != nil && payload.data.Target.Slot <= finalizedSlot {
+			delete(c.latestKnownAggregatedPayloads, key)
+		}
+	}
+	for key, payload := range c.latestNewAggregatedPayloads {
+		if payload.data != nil && payload.data.Target != nil && payload.data.Target.Slot <= finalizedSlot {
+			delete(c.latestNewAggregatedPayloads, key)
+		}
+	}
+}
+
+// pruneAggregatedPayloadsCache removes signature cache entries for
+// attestation data at or before the finalized slot.
+func (c *Store) pruneAggregatedPayloadsCache(finalizedSlot uint64) {
+	for key, entries := range c.aggregatedPayloads {
+		if len(entries) > 0 && entries[0].slot <= finalizedSlot {
+			delete(c.aggregatedPayloads, key)
+		}
+	}
+	for key, stored := range c.gossipSignatures {
+		if stored.slot <= finalizedSlot {
+			delete(c.gossipSignatures, key)
+		}
+	}
+}
+
+// pruneStorage removes blocks and states that are below the finalized slot
+// and not on the canonical chain. Uses retention limits for blocks and states.
+//
+// Uses ForEachBlock to iterate without copying the full block map.
+func (c *Store) pruneStorage(finalizedSlot uint64) {
+	if finalizedSlot == 0 {
+		return
+	}
+
+	// Collect canonical chain roots by walking from head to finalized root.
+	canonical := make(map[[32]byte]struct{})
+	current := c.head
+	for {
+		canonical[current] = struct{}{}
+		block, ok := c.storage.GetBlock(current)
+		if !ok {
+			break
+		}
+		if block.Slot <= finalizedSlot {
+			break
+		}
+		current = block.ParentRoot
+	}
+	// Always keep finalized and justified roots.
+	canonical[c.latestFinalized.Root] = struct{}{}
+	canonical[c.latestJustified.Root] = struct{}{}
+
+	// State retention: prune canonical states older than this threshold.
+	// Matches ethlambda STATES_TO_KEEP (~3.3 hours).
+	var pruneStatesBelow uint64
+	if finalizedSlot > statesToKeep {
+		pruneStatesBelow = finalizedSlot - statesToKeep
+	}
+
+	// Single pass: collect roots to delete. We cannot delete during
+	// ForEachBlock iteration (bolt doesn't allow mutation during View tx).
+	var deleteRoots [][32]byte
+	var deleteStateOnlyRoots [][32]byte
+
+	c.storage.ForEachBlock(func(root [32]byte, block *types.Block) bool {
+		if block.Slot >= finalizedSlot {
+			return true // keep: at or above finalized
+		}
+
+		if _, ok := canonical[root]; !ok {
+			// Non-canonical block below finalized: delete everything.
+			deleteRoots = append(deleteRoots, root)
+			return true
+		}
+
+		// Canonical block below finalized: keep block, but prune old states.
+		if pruneStatesBelow > 0 && block.Slot < pruneStatesBelow {
+			if root != c.latestFinalized.Root && root != c.latestJustified.Root {
+				deleteStateOnlyRoots = append(deleteStateOnlyRoots, root)
+			}
+		}
+		return true
+	})
+
+	for _, root := range deleteRoots {
+		c.storage.DeleteBlock(root)
+		c.storage.DeleteSignedBlock(root)
+		c.storage.DeleteState(root)
+	}
+	for _, root := range deleteStateOnlyRoots {
+		c.storage.DeleteState(root)
+	}
+
+	if len(deleteRoots) > 0 || len(deleteStateOnlyRoots) > 0 {
+		log.Info("pruned storage on finalization",
+			"finalized_slot", finalizedSlot,
+			"blocks_deleted", len(deleteRoots),
+			"states_deleted", len(deleteRoots)+len(deleteStateOnlyRoots),
+		)
+	}
+}
+
+// enforcePayloadCap evicts the oldest entries from latestKnownAggregatedPayloads
+// when the map exceeds maxKnownAggregatedPayloads. This bounds memory even when
+// finalization stalls. Matches ethlambda's FIFO PayloadBuffer pattern.
+func (c *Store) enforcePayloadCap() {
+	for len(c.latestKnownAggregatedPayloads) > maxKnownAggregatedPayloads {
+		var oldestKey [32]byte
+		oldestSlot := uint64(^uint64(0))
+		found := false
+		for key, payload := range c.latestKnownAggregatedPayloads {
+			if payload.data != nil && payload.data.Target != nil && payload.data.Target.Slot < oldestSlot {
+				oldestSlot = payload.data.Target.Slot
+				oldestKey = key
+				found = true
+			}
+		}
+		if !found {
+			break
+		}
+		delete(c.latestKnownAggregatedPayloads, oldestKey)
+	}
+}
+
+// maybePeriodicPruneLocked runs a pruning pass every periodicPruningInterval
+// slots when finalization is lagging. This is a safety net that prevents
+// unbounded memory growth even if finalization stalls for an extended period.
+// Matches zeam's periodic pruning pattern (chain.zig:302-326).
+//
+// Must be called with c.mu held.
+func (c *Store) maybePeriodicPruneLocked() {
+	currentSlot := c.time / types.IntervalsPerSlot
+	if currentSlot == 0 || currentSlot%periodicPruningInterval != 0 {
+		return
+	}
+
+	finalizedSlot := c.latestFinalized.Slot
+	if finalizedSlot+periodicPruningLagThreshold >= currentSlot {
+		return // finalization is healthy, no need for periodic pruning
+	}
+
+	log.Warn("finalization lagging, running periodic pruning",
+		"current_slot", currentSlot,
+		"finalized_slot", finalizedSlot,
+		"lag", currentSlot-finalizedSlot,
+	)
+
+	// Delegate to the same pruning path used on finalization so the two
+	// triggers cannot drift apart if a new prune step is added later.
+	c.pruneOnFinalization()
+}
+
+// enforceAggregatedPayloadsCacheCap bounds the aggregatedPayloads proof cache
+// keys to prevent unbounded growth independent of finalization.
+func (c *Store) enforceAggregatedPayloadsCacheCap() {
+	for len(c.aggregatedPayloads) > maxAggregatedPayloadKeys {
+		var oldestKey signatureKey
+		oldestSlot := uint64(^uint64(0))
+		found := false
+		for key, entries := range c.aggregatedPayloads {
+			if len(entries) > 0 && entries[0].slot < oldestSlot {
+				oldestSlot = entries[0].slot
+				oldestKey = key
+				found = true
+			}
+		}
+		if !found {
+			break
+		}
+		delete(c.aggregatedPayloads, oldestKey)
+	}
+}
diff --git a/chain/forkchoice/store.go b/chain/forkchoice/store.go
index c5b4434..94a8843 100644
--- a/chain/forkchoice/store.go
+++ b/chain/forkchoice/store.go
@@ -282,11 +282,12 @@ func (c *Store) lookupBlockSummary(root [32]byte) (blockSummary, bool) {
 }
 
 func (c *Store) allKnownBlockSummaries() map[[32]byte]blockSummary {
-	blocks := c.storage.GetAllBlocks()
-	summaries := make(map[[32]byte]blockSummary, len(blocks)+len(c.checkpointRoots))
-	for root, block := range blocks {
+	summaries := make(map[[32]byte]blockSummary, len(c.checkpointRoots))
+	// Iterate storage without copying the full block map.
+	c.storage.ForEachBlock(func(root [32]byte, block *types.Block) bool {
 		summaries[root] = summarizeBlock(block)
-	}
+		return true
+	})
 	for root, summary := range c.checkpointRoots {
 		if _, ok := summaries[root]; !ok {
 			summaries[root] = summary
diff --git a/chain/forkchoice/time.go b/chain/forkchoice/time.go
index 7f64854..3b605a3 100644
--- a/chain/forkchoice/time.go
+++ b/chain/forkchoice/time.go
@@ -54,6 +54,11 @@ func (c *Store) tickIntervalLocked(hasProposal bool) {
 		if hasProposal {
 			c.acceptNewAttestationsLocked()
 		}
+		// Periodic pruning safety net: prune stale data when finalization
+		// is lagging, even if pruneOnFinalization hasn't been triggered.
+		// Runs every periodicPruningInterval slots. Matches zeam's
+		// FORKCHOICE_PRUNING_INTERVAL_SLOTS pattern (constants.zig:22).
+		c.maybePeriodicPruneLocked()
 	case 1:
 		// Validator voting interval — no action.
 	case 2:
@@ -90,6 +95,11 @@ func (c *Store) acceptNewAttestationsLocked() {
 		c.latestKnownAttestations[id] = sa
 	}
 	c.latestNewAttestations = make(map[uint64]*types.SignedAttestation)
+
+	// Enforce hard caps so payload caches stay bounded even when finalization stalls.
+	c.enforcePayloadCap()
+	c.enforceAggregatedPayloadsCacheCap()
+
 	metrics.LatestKnownAggregatedPayloads.Set(float64(len(c.latestKnownAggregatedPayloads)))
 	c.updateHeadLocked()
 }
diff --git a/node/node.go b/node/node.go
index 1710512..3140f39 100644
--- a/node/node.go
+++ b/node/node.go
@@ -44,6 +44,12 @@ type Node struct {
 	peerBackoff   map[peer.ID]*peerSyncState
 	peerBackoffMu sync.Mutex
 
+	// Per-peer concurrency tracking. Limits in-flight sync requests to
+	// maxConcurrentRequestsPerPeer (2) per peer, matching leanSpec
+	// MAX_CONCURRENT_REQUESTS.
+	peerInFlight   map[peer.ID]int
+	peerInFlightMu sync.Mutex
+
 	// Recovery cooldown prevents recoverMissingParentSync from flooding
 	// peers when multiple gossip blocks arrive with missing parents.
 	recoveryMu sync.Mutex
@@ -87,6 +93,10 @@ const (
 	// backoffMultiplier doubles the backoff on each consecutive failure.
backoffMultiplier = 2 + + // maxConcurrentRequestsPerPeer limits in-flight sync requests to a + // single peer. Matches leanSpec MAX_CONCURRENT_REQUESTS. + maxConcurrentRequestsPerPeer = 2 ) func (n *Node) Close() { diff --git a/node/sync.go b/node/sync.go index ea2dee1..aa29765 100644 --- a/node/sync.go +++ b/node/sync.go @@ -18,9 +18,9 @@ func isMissingParentStateErr(err error) bool { } // syncWithPeer exchanges status and fetches missing blocks from a single peer. -// It walks backwards from the peer's head collecting roots we need, then -// requests them in batches (up to maxBlocksPerRequest per RPC call) and -// processes them in forward order. +// It walks backwards from the peer's head, keeping fetched blocks in memory, +// then processes them in forward order. Each root is marked pending immediately +// to prevent duplicate fetches by concurrent sync paths. // // The backward walk is capped at maxBackfillDepth (512) to prevent resource // exhaustion from deep chains, matching leanSpec MAX_BACKFILL_DEPTH. @@ -28,6 +28,11 @@ func (n *Node) syncWithPeer(ctx context.Context, pid peer.ID) bool { if !n.canSyncWithPeer(pid) { return false } + if !n.acquirePeerSlot(pid) { + n.log.Debug("peer at max concurrent requests, skipping", "peer_id", pid.String()) + return false + } + defer n.releasePeerSlot(pid) status := n.FC.GetStatus() ourStatus := reqresp.Status{ @@ -61,9 +66,11 @@ func (n *Node) syncWithPeer(ctx context.Context, pid peer.ID) bool { return false } - // Phase 1: Walk backwards collecting roots we need to fetch. - // Stop when we find a root we have state for, or hit the depth limit. - var rootsToFetch [][32]byte + // Walk backwards from peer's head, fetching blocks and keeping them. + // Each root is marked pending immediately (before fetch) to prevent + // concurrent sync paths from requesting the same root. 
+ var pending []*types.SignedBlockWithAttestation + var pendingMarked [][32]byte nextRoot := peerStatus.Head.Root for i := 0; i < maxBackfillDepth; i++ { @@ -77,8 +84,11 @@ func (n *Node) syncWithPeer(ctx context.Context, pid peer.ID) bool { break } - // Request this single root to discover its parent for the walk. - // We need the block to learn its ParentRoot for the next step. + // Mark this root as pending BEFORE requesting it, matching + // leanSpec BackfillSync._pending pattern (backfill_sync.py:164). + n.markRootPending(nextRoot) + pendingMarked = append(pendingMarked, nextRoot) + blocks, err := reqresp.RequestBlocksByRoot(ctx, n.Host.P2P, pid, [][32]byte{nextRoot}) if err != nil || len(blocks) == 0 { n.log.Debug("blocks_by_root failed during sync walk", @@ -91,62 +101,34 @@ func (n *Node) syncWithPeer(ctx context.Context, pid peer.ID) bool { } sb := blocks[0] - rootsToFetch = append(rootsToFetch, nextRoot) + pending = append(pending, sb) nextRoot = sb.Message.Block.ParentRoot } - if len(rootsToFetch) == 0 { + // Always clear pending roots when done, even on failure. + defer n.clearPendingRoots(pendingMarked) + + if len(pending) == 0 { return false } - // Check if we reached a known ancestor. + // Check if we reached a known ancestor with state. if !n.FC.HasState(nextRoot) { n.log.Debug("sync walk did not reach known ancestor with state", "peer_id", pid.String(), "ancestor_root", logging.LongHash(nextRoot), - "collected", len(rootsToFetch), + "fetched", len(pending), "max_depth", maxBackfillDepth, ) return false } - // Mark all roots as pending to prevent duplicate fetches. - n.markRootsPending(rootsToFetch) - defer n.clearPendingRoots(rootsToFetch) - - // Phase 2: Fetch blocks in batches of maxBlocksPerRequest (10). - // Roots are in newest-first order; we reverse each batch for forward processing. 
- var allBlocks []*types.SignedBlockWithAttestation - - for batchStart := 0; batchStart < len(rootsToFetch); batchStart += maxBlocksPerRequest { - batchEnd := batchStart + maxBlocksPerRequest - if batchEnd > len(rootsToFetch) { - batchEnd = len(rootsToFetch) - } - batch := rootsToFetch[batchStart:batchEnd] - - blocks, err := reqresp.RequestBlocksByRoot(ctx, n.Host.P2P, pid, batch) - if err != nil { - n.log.Warn("batch blocks_by_root failed", - "peer_id", pid.String(), - "batch_size", len(batch), - "err", err, - ) - n.recordSyncFailure(pid) - break - } - allBlocks = append(allBlocks, blocks...) - } - - if len(allBlocks) == 0 { - return false - } - - // Phase 3: Process in forward order (oldest first). + // Process in forward order (oldest first). Blocks were already fetched + // during the walk — no re-fetch needed. synced := 0 - total := len(allBlocks) - for i := len(allBlocks) - 1; i >= 0; i-- { - sb := allBlocks[i] + total := len(pending) + for i := len(pending) - 1; i >= 0; i-- { + sb := pending[i] blockRoot, _ := sb.Message.Block.HashTreeRoot() if err := n.FC.ProcessBlock(sb); err != nil { n.log.Debug("sync block rejected", @@ -259,15 +241,13 @@ func (n *Node) isRootPending(root [32]byte) bool { return ok } -func (n *Node) markRootsPending(roots [][32]byte) { +func (n *Node) markRootPending(root [32]byte) { n.pendingRootsMu.Lock() defer n.pendingRootsMu.Unlock() if n.pendingRoots == nil { n.pendingRoots = make(map[[32]byte]struct{}) } - for _, root := range roots { - n.pendingRoots[root] = struct{}{} - } + n.pendingRoots[root] = struct{}{} } func (n *Node) clearPendingRoots(roots [][32]byte) { @@ -278,6 +258,36 @@ func (n *Node) clearPendingRoots(roots [][32]byte) { } } +// --- Per-peer concurrency limiting --- + +// acquirePeerSlot checks if the peer has capacity for another in-flight +// request. Returns true if a slot was acquired, false if the peer is at +// maxConcurrentRequestsPerPeer. Matches leanSpec MAX_CONCURRENT_REQUESTS. 
+func (n *Node) acquirePeerSlot(pid peer.ID) bool {
+	n.peerInFlightMu.Lock()
+	defer n.peerInFlightMu.Unlock()
+	if n.peerInFlight == nil {
+		n.peerInFlight = make(map[peer.ID]int)
+	}
+	if n.peerInFlight[pid] >= maxConcurrentRequestsPerPeer {
+		return false
+	}
+	n.peerInFlight[pid]++
+	return true
+}
+
+func (n *Node) releasePeerSlot(pid peer.ID) {
+	n.peerInFlightMu.Lock()
+	defer n.peerInFlightMu.Unlock()
+	if n.peerInFlight == nil {
+		return
+	}
+	n.peerInFlight[pid]--
+	if n.peerInFlight[pid] <= 0 {
+		delete(n.peerInFlight, pid)
+	}
+}
+
 // --- Per-peer exponential backoff ---
 
 // canSyncWithPeer checks if enough time has passed since the last failure
diff --git a/storage/bolt/bolt.go b/storage/bolt/bolt.go
index 2abaad0..7e5f4b8 100644
--- a/storage/bolt/bolt.go
+++ b/storage/bolt/bolt.go
@@ -62,6 +62,10 @@ func (s *Store) PutBlock(root [32]byte, block *types.Block) {
 	s.put(blocksBucket, root[:], block)
 }
 
+func (s *Store) DeleteBlock(root [32]byte) {
+	s.delete(blocksBucket, root[:])
+}
+
 func (s *Store) GetSignedBlock(root [32]byte) (*types.SignedBlockWithAttestation, bool) {
 	var sb types.SignedBlockWithAttestation
 	found := s.get(signedBlockBucket, root[:], &sb)
@@ -75,6 +79,10 @@ func (s *Store) PutSignedBlock(root [32]byte, sb *types.SignedBlockWithAttestati
 	s.put(signedBlockBucket, root[:], sb)
 }
 
+func (s *Store) DeleteSignedBlock(root [32]byte) {
+	s.delete(signedBlockBucket, root[:])
+}
+
 func (s *Store) GetState(root [32]byte) (*types.State, bool) {
 	var st types.State
 	found := s.get(statesBucket, root[:], &st)
@@ -88,6 +96,10 @@ func (s *Store) PutState(root [32]byte, state *types.State) {
 	s.put(statesBucket, root[:], state)
 }
 
+func (s *Store) DeleteState(root [32]byte) {
+	s.delete(statesBucket, root[:])
+}
+
 func (s *Store) GetAllBlocks() map[[32]byte]*types.Block {
 	result := make(map[[32]byte]*types.Block)
 	s.db.View(func(tx *bolt.Tx) error {
@@ -124,6 +136,27 @@ func (s *Store) GetAllStates() map[[32]byte]*types.State {
 	return result
 }
 
+func (s *Store) ForEachBlock(fn func(root [32]byte, block *types.Block) bool) {
+	s.db.View(func(tx *bolt.Tx) error {
+		b := tx.Bucket(blocksBucket)
+		if b == nil {
+			return nil // bucket not created yet: nothing to iterate
+		}
+		return b.ForEach(func(k, v []byte) error {
+			var blk types.Block
+			if err := blk.UnmarshalSSZ(v); err != nil {
+				return nil // skip corrupt entries
+			}
+			var key [32]byte
+			copy(key[:], k)
+			if !fn(key, &blk) {
+				return fmt.Errorf("stop") // break iteration (NOTE(review): needs "fmt" in bolt.go imports — this patch does not add it)
+			}
+			return nil
+		})
+	})
+}
+
 // --- SSZ helpers ---
 
 type sszMarshaler interface {
@@ -147,6 +180,15 @@ func (s *Store) put(bucket, key []byte, val sszMarshaler) {
 	}
 }
 
+func (s *Store) delete(bucket, key []byte) {
+	err := s.db.Update(func(tx *bolt.Tx) error {
+		return tx.Bucket(bucket).Delete(key)
+	})
+	if err != nil {
+		log.Printf("bolt: delete from %s: %v", bucket, err)
+	}
+}
+
 func (s *Store) get(bucket, key []byte, dst sszUnmarshaler) bool {
 	var found bool
 	s.db.View(func(tx *bolt.Tx) error {
diff --git a/storage/interface.go b/storage/interface.go
index fbf50a2..c563a5c 100644
--- a/storage/interface.go
+++ b/storage/interface.go
@@ -6,10 +6,16 @@ import "github.com/geanlabs/gean/types"
 
 type Store interface {
 	GetBlock(root [32]byte) (*types.Block, bool)
 	PutBlock(root [32]byte, block *types.Block)
+	DeleteBlock(root [32]byte)
 	GetSignedBlock(root [32]byte) (*types.SignedBlockWithAttestation, bool)
 	PutSignedBlock(root [32]byte, sb *types.SignedBlockWithAttestation)
+	DeleteSignedBlock(root [32]byte)
 	GetState(root [32]byte) (*types.State, bool)
 	PutState(root [32]byte, state *types.State)
+	DeleteState(root [32]byte)
 	GetAllBlocks() map[[32]byte]*types.Block
 	GetAllStates() map[[32]byte]*types.State
+	// ForEachBlock iterates over all blocks without copying the full map.
+	// Return false from fn to stop iteration early.
+ ForEachBlock(fn func(root [32]byte, block *types.Block) bool) } diff --git a/storage/memory/memory.go b/storage/memory/memory.go index d3d17a7..121762a 100644 --- a/storage/memory/memory.go +++ b/storage/memory/memory.go @@ -36,6 +36,12 @@ func (m *Store) PutBlock(root [32]byte, block *types.Block) { m.blocks[root] = block } +func (m *Store) DeleteBlock(root [32]byte) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.blocks, root) +} + func (m *Store) GetSignedBlock(root [32]byte) (*types.SignedBlockWithAttestation, bool) { m.mu.RLock() defer m.mu.RUnlock() @@ -49,6 +55,12 @@ func (m *Store) PutSignedBlock(root [32]byte, sb *types.SignedBlockWithAttestati m.signedBlocks[root] = sb } +func (m *Store) DeleteSignedBlock(root [32]byte) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.signedBlocks, root) +} + func (m *Store) GetState(root [32]byte) (*types.State, bool) { m.mu.RLock() defer m.mu.RUnlock() @@ -62,6 +74,12 @@ func (m *Store) PutState(root [32]byte, state *types.State) { m.states[root] = state } +func (m *Store) DeleteState(root [32]byte) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.states, root) +} + func (m *Store) GetAllBlocks() map[[32]byte]*types.Block { m.mu.RLock() defer m.mu.RUnlock() @@ -72,6 +90,16 @@ func (m *Store) GetAllBlocks() map[[32]byte]*types.Block { return cp } +func (m *Store) ForEachBlock(fn func(root [32]byte, block *types.Block) bool) { + m.mu.RLock() + defer m.mu.RUnlock() + for root, block := range m.blocks { + if !fn(root, block) { + return + } + } +} + func (m *Store) GetAllStates() map[[32]byte]*types.State { m.mu.RLock() defer m.mu.RUnlock()