Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion chain/forkchoice/attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ func (c *Store) processAttestationLocked(sa *types.SignedAttestation, isFromBloc
if !ok || existing == nil || existing.Message == nil || existing.Message.Slot < data.Slot {
c.latestNewAttestations[validatorID] = sa
}
c.storeGossipSignatureLocked(sa)
if c.isAggregator {
c.storeGossipSignatureLocked(sa)
}
}

metrics.AttestationsValid.WithLabelValues(sourceLabel).Inc()
Expand Down
1 change: 1 addition & 0 deletions chain/forkchoice/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (c *Store) ProcessBlock(envelope *types.SignedBlockWithAttestation) error {
c.latestFinalized = state.LatestFinalized
metrics.FinalizationsTotal.WithLabelValues("success").Inc()
metrics.LatestFinalizedSlot.Set(float64(state.LatestFinalized.Slot))
c.pruneOnFinalization()
}

// Step 2: Process body attestations as on-chain votes.
Expand Down
226 changes: 226 additions & 0 deletions chain/forkchoice/prune.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package forkchoice

import (
	"sort"

	"github.com/geanlabs/gean/types"
)

// Storage retention limits aligned with ethlambda (store.rs:83-92).
const (
	// blocksToKeep is ~1 day of block history at 4-second slots (86400/4).
	blocksToKeep = 21_600

	// statesToKeep is ~3.3 hours of state history at 4-second slots (12000/4).
	statesToKeep = 3_000

	// maxKnownAggregatedPayloads caps the known aggregated payloads map to
	// prevent unbounded growth during stalled finalization. Matches
	// ethlambda's AGGREGATED_PAYLOAD_CAP.
	maxKnownAggregatedPayloads = 4096

	// maxAggregatedPayloadKeys caps the aggregatedPayloads proof cache to
	// prevent unbounded key growth.
	maxAggregatedPayloadKeys = 8192

	// periodicPruningInterval is the number of slots between periodic
	// pruning passes. Acts as a safety net when finalization stalls.
	// Matches zeam FORKCHOICE_PRUNING_INTERVAL_SLOTS (constants.zig:22).
	periodicPruningInterval = 7200

	// periodicPruningLagThreshold defines how far finalization must lag
	// behind the current slot before periodic pruning kicks in. Prevents
	// unnecessary pruning work when finalization is healthy.
	// Matches zeam's guard: finalized.slot + 2*7200 < current_slot.
	periodicPruningLagThreshold = 2 * periodicPruningInterval
)

// pruneOnFinalization removes data that can no longer influence fork choice
// once finalization advances. Matches leanSpec prune_stale_attestation_data()
// (store.py:228-268).
//
// Must be called with c.mu held.
func (c *Store) pruneOnFinalization() {
	slot := c.latestFinalized.Slot
	c.pruneStaleAttestationData(slot)
	c.pruneAggregatedPayloadsCache(slot)
	c.pruneStorage(slot)
}

// pruneStaleAttestationData drops aggregated payload entries whose
// attestation target slot is at or before the finalized slot; such votes
// can no longer influence fork choice. Matches leanSpec store.py:245-268,
// which keeps only entries with target.slot > finalized_slot.
// Entries with a nil data or nil target are left untouched.
func (c *Store) pruneStaleAttestationData(finalizedSlot uint64) {
	for key, payload := range c.latestKnownAggregatedPayloads {
		data := payload.data
		if data == nil || data.Target == nil || data.Target.Slot > finalizedSlot {
			continue // missing target info or still relevant: keep
		}
		delete(c.latestKnownAggregatedPayloads, key)
	}
	for key, payload := range c.latestNewAggregatedPayloads {
		data := payload.data
		if data == nil || data.Target == nil || data.Target.Slot > finalizedSlot {
			continue
		}
		delete(c.latestNewAggregatedPayloads, key)
	}
}

// pruneAggregatedPayloadsCache drops signature-cache entries whose
// attestation data is at or before the finalized slot.
//
// NOTE(review): for aggregatedPayloads the keep/drop decision is based on
// entries[0] only — presumably all entries under one key share a slot (or
// are appended in slot order); confirm against the code that fills this map.
func (c *Store) pruneAggregatedPayloadsCache(finalizedSlot uint64) {
	for key, entries := range c.aggregatedPayloads {
		if len(entries) == 0 || entries[0].slot > finalizedSlot {
			continue // empty or still relevant: keep
		}
		delete(c.aggregatedPayloads, key)
	}
	for key, stored := range c.gossipSignatures {
		if stored.slot > finalizedSlot {
			continue
		}
		delete(c.gossipSignatures, key)
	}
}

// pruneStorage removes blocks and states that can no longer affect fork
// choice once finalization has advanced past them:
//
//   - non-canonical blocks below the finalized slot lose block, signed
//     block, and state;
//   - canonical blocks older than blocksToKeep (~1 day) are dropped
//     entirely, applying the block-retention limit from ethlambda
//     (store.rs:83-92);
//   - canonical blocks older than statesToKeep (~3.3 hours) keep the
//     block but drop the state.
//
// The finalized and justified roots are always retained in full.
//
// Uses ForEachBlock to iterate without copying the full block map.
func (c *Store) pruneStorage(finalizedSlot uint64) {
	if finalizedSlot == 0 {
		return
	}

	// Collect canonical chain roots by walking from head to finalized root.
	// The walk stops at the first missing block or at/below finalizedSlot.
	canonical := make(map[[32]byte]struct{})
	current := c.head
	for {
		canonical[current] = struct{}{}
		block, ok := c.storage.GetBlock(current)
		if !ok {
			break
		}
		if block.Slot <= finalizedSlot {
			break
		}
		current = block.ParentRoot
	}
	// Always keep finalized and justified roots.
	canonical[c.latestFinalized.Root] = struct{}{}
	canonical[c.latestJustified.Root] = struct{}{}

	// Retention thresholds below the finalized slot. Zero means the chain
	// is younger than the retention window and nothing is old enough yet.
	var pruneBlocksBelow uint64
	if finalizedSlot > blocksToKeep {
		pruneBlocksBelow = finalizedSlot - blocksToKeep
	}
	var pruneStatesBelow uint64
	if finalizedSlot > statesToKeep {
		pruneStatesBelow = finalizedSlot - statesToKeep
	}

	// Single pass: collect roots to delete. We cannot delete during
	// ForEachBlock iteration (bolt doesn't allow mutation during View tx).
	var deleteRoots [][32]byte
	var deleteStateOnlyRoots [][32]byte

	c.storage.ForEachBlock(func(root [32]byte, block *types.Block) bool {
		if block.Slot >= finalizedSlot {
			return true // keep: at or above finalized
		}

		if _, ok := canonical[root]; !ok {
			// Non-canonical block below finalized: delete everything.
			deleteRoots = append(deleteRoots, root)
			return true
		}

		// Canonical from here on. Checkpoint roots are kept in full.
		if root == c.latestFinalized.Root || root == c.latestJustified.Root {
			return true
		}

		// Canonical block beyond the block-retention window: delete
		// everything. This is where blocksToKeep takes effect.
		if pruneBlocksBelow > 0 && block.Slot < pruneBlocksBelow {
			deleteRoots = append(deleteRoots, root)
			return true
		}

		// Canonical block below finalized: keep block, but prune old states.
		if pruneStatesBelow > 0 && block.Slot < pruneStatesBelow {
			deleteStateOnlyRoots = append(deleteStateOnlyRoots, root)
		}
		return true
	})

	for _, root := range deleteRoots {
		c.storage.DeleteBlock(root)
		c.storage.DeleteSignedBlock(root)
		c.storage.DeleteState(root)
	}
	for _, root := range deleteStateOnlyRoots {
		c.storage.DeleteState(root)
	}

	if len(deleteRoots) > 0 || len(deleteStateOnlyRoots) > 0 {
		log.Info("pruned storage on finalization",
			"finalized_slot", finalizedSlot,
			"blocks_deleted", len(deleteRoots),
			"states_deleted", len(deleteRoots)+len(deleteStateOnlyRoots),
		)
	}
}

// enforcePayloadCap evicts the oldest entries from latestKnownAggregatedPayloads
// when the map exceeds maxKnownAggregatedPayloads. This bounds memory even when
// finalization stalls. Matches ethlambda's FIFO PayloadBuffer pattern.
//
// "Oldest" means smallest attestation target slot. Entries with a nil data
// or nil target cannot be aged and are never evicted here; if the overflow
// consists entirely of such entries the map stays above the cap.
func (c *Store) enforcePayloadCap() {
	excess := len(c.latestKnownAggregatedPayloads) - maxKnownAggregatedPayloads
	if excess <= 0 {
		return
	}

	// Collect evictable entries once and sort by slot, instead of
	// re-scanning the whole map per eviction (O(n log n) vs O(excess*n)).
	type aged struct {
		key  [32]byte
		slot uint64
	}
	candidates := make([]aged, 0, len(c.latestKnownAggregatedPayloads))
	for key, payload := range c.latestKnownAggregatedPayloads {
		if payload.data != nil && payload.data.Target != nil {
			candidates = append(candidates, aged{key: key, slot: payload.data.Target.Slot})
		}
	}
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].slot < candidates[j].slot
	})

	if excess > len(candidates) {
		excess = len(candidates)
	}
	for _, victim := range candidates[:excess] {
		delete(c.latestKnownAggregatedPayloads, victim.key)
	}
}

// maybePeriodicPruneLocked runs a pruning pass every periodicPruningInterval
// slots when finalization is lagging. This is a safety net that prevents
// unbounded memory growth even if finalization stalls for an extended period.
// Matches zeam's periodic pruning pattern (chain.zig:302-326).
//
// Must be called with c.mu held.
func (c *Store) maybePeriodicPruneLocked() {
	currentSlot := c.time / types.IntervalsPerSlot
	if currentSlot == 0 || currentSlot%periodicPruningInterval != 0 {
		return // only act on interval boundaries
	}

	finalizedSlot := c.latestFinalized.Slot
	if finalizedSlot+periodicPruningLagThreshold >= currentSlot {
		return // finalization is healthy, no need for periodic pruning
	}

	log.Warn("finalization lagging, running periodic pruning",
		"current_slot", currentSlot,
		"finalized_slot", finalizedSlot,
		"lag", currentSlot-finalizedSlot,
	)

	// Delegate to the finalization pruning path so the two paths cannot
	// drift apart. It reads c.latestFinalized.Slot, which is exactly the
	// finalizedSlot checked above.
	c.pruneOnFinalization()
}

// enforceAggregatedPayloadsCacheCap bounds the aggregatedPayloads proof cache
// to maxAggregatedPayloadKeys keys, independent of finalization progress.
//
// "Oldest" means smallest slot of the key's first entry — presumably all
// entries under one key share a slot or are appended in slot order (confirm
// with the code that fills this map). Keys with no entries cannot be aged
// and are never evicted here.
func (c *Store) enforceAggregatedPayloadsCacheCap() {
	excess := len(c.aggregatedPayloads) - maxAggregatedPayloadKeys
	if excess <= 0 {
		return
	}

	// Collect evictable keys once and sort by slot, instead of re-scanning
	// the whole map per eviction (O(n log n) vs O(excess*n)).
	type aged struct {
		key  signatureKey
		slot uint64
	}
	candidates := make([]aged, 0, len(c.aggregatedPayloads))
	for key, entries := range c.aggregatedPayloads {
		if len(entries) > 0 {
			candidates = append(candidates, aged{key: key, slot: entries[0].slot})
		}
	}
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].slot < candidates[j].slot
	})

	if excess > len(candidates) {
		excess = len(candidates)
	}
	for _, victim := range candidates[:excess] {
		delete(c.aggregatedPayloads, victim.key)
	}
}
9 changes: 5 additions & 4 deletions chain/forkchoice/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,12 @@ func (c *Store) lookupBlockSummary(root [32]byte) (blockSummary, bool) {
}

func (c *Store) allKnownBlockSummaries() map[[32]byte]blockSummary {
blocks := c.storage.GetAllBlocks()
summaries := make(map[[32]byte]blockSummary, len(blocks)+len(c.checkpointRoots))
for root, block := range blocks {
summaries := make(map[[32]byte]blockSummary, len(c.checkpointRoots))
// Iterate storage without copying the full block map.
c.storage.ForEachBlock(func(root [32]byte, block *types.Block) bool {
summaries[root] = summarizeBlock(block)
}
return true
})
for root, summary := range c.checkpointRoots {
if _, ok := summaries[root]; !ok {
summaries[root] = summary
Expand Down
10 changes: 10 additions & 0 deletions chain/forkchoice/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (c *Store) tickIntervalLocked(hasProposal bool) {
if hasProposal {
c.acceptNewAttestationsLocked()
}
// Periodic pruning safety net: prune stale data when finalization
// is lagging, even if pruneOnFinalization hasn't been triggered.
// Runs every periodicPruningInterval slots. Matches zeam's
// FORKCHOICE_PRUNING_INTERVAL_SLOTS pattern (constants.zig:22).
c.maybePeriodicPruneLocked()
case 1:
// Validator voting interval — no action.
case 2:
Expand Down Expand Up @@ -90,6 +95,11 @@ func (c *Store) acceptNewAttestationsLocked() {
c.latestKnownAttestations[id] = sa
}
c.latestNewAttestations = make(map[uint64]*types.SignedAttestation)

// Enforce caps to bound memory even when finalization stalls.
c.enforcePayloadCap()
c.enforceAggregatedPayloadsCacheCap()

metrics.LatestKnownAggregatedPayloads.Set(float64(len(c.latestKnownAggregatedPayloads)))
c.updateHeadLocked()
}
Expand Down
10 changes: 10 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
peerBackoff map[peer.ID]*peerSyncState
peerBackoffMu sync.Mutex

// Per-peer concurrency tracking. Limits in-flight sync requests to
// maxConcurrentRequestsPerPeer (2) per peer, matching leanSpec
// MAX_CONCURRENT_REQUESTS.
peerInFlight map[peer.ID]int
peerInFlightMu sync.Mutex

// Recovery cooldown prevents recoverMissingParentSync from flooding
// peers when multiple gossip blocks arrive with missing parents.
recoveryMu sync.Mutex
Expand All @@ -53,7 +59,7 @@
dbCloser io.Closer
log *slog.Logger

ctx context.Context

Check failure on line 62 in node/node.go

View workflow job for this annotation

GitHub Actions / Go Lint & Format

field ctx is unused (U1000)
cancel context.CancelFunc
}

Expand All @@ -68,7 +74,7 @@
const (
// maxBlocksPerRequest is the maximum number of block roots to request
// in a single BlocksByRoot RPC call. Matches leanSpec MAX_BLOCKS_PER_REQUEST.
maxBlocksPerRequest = 10

Check failure on line 77 in node/node.go

View workflow job for this annotation

GitHub Actions / Go Lint & Format

const maxBlocksPerRequest is unused (U1000)

// maxBackfillDepth is the maximum depth for backward chain walks.
// Matches leanSpec MAX_BACKFILL_DEPTH and zeam MAX_BLOCK_FETCH_DEPTH.
Expand All @@ -87,6 +93,10 @@

// backoffMultiplier doubles the backoff on each consecutive failure.
backoffMultiplier = 2

// maxConcurrentRequestsPerPeer limits in-flight sync requests to a
// single peer. Matches leanSpec MAX_CONCURRENT_REQUESTS.
maxConcurrentRequestsPerPeer = 2
)

func (n *Node) Close() {
Expand Down
Loading
Loading