Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
 into hackSync
  • Loading branch information
nisdas committed Mar 5, 2025
2 parents 2c6e028 + a7b016c commit 79657b1
Show file tree
Hide file tree
Showing 42 changed files with 785 additions and 419 deletions.
1 change: 1 addition & 0 deletions api/server/structs/endpoints_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ type ForkChoiceNodeExtraData struct {
Balance string `json:"balance"`
ExecutionOptimistic bool `json:"execution_optimistic"`
TimeStamp string `json:"timestamp"`
Target string `json:"target"`
}
7 changes: 7 additions & 0 deletions beacon-chain/blockchain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,10 @@ func WithSyncChecker(checker Checker) Option {
return nil
}
}

func WithSlasherEnabled(enabled bool) Option {
return func(s *Service) error {
s.slasherEnabled = enabled
return nil
}
}
3 changes: 1 addition & 2 deletions beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/slasher/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
Expand Down Expand Up @@ -126,7 +125,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
return err
}
// If slasher is configured, forward the attestations in the block via an event feed for processing.
if features.Get().EnableSlasher {
if s.slasherEnabled {
go s.sendBlockAttestationsToSlasher(blockCopy, preState)
}

Expand Down
1 change: 1 addition & 0 deletions beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Service struct {
blobNotifiers *blobNotifierMap
blockBeingSynced *currentlySyncingBlock
blobStorage *filesystem.BlobStorage
slasherEnabled bool
}

// config options for the service.
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/db/iface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ type NoHeadAccessDatabase interface {
SaveLightClientBootstrap(ctx context.Context, blockRoot []byte, bootstrap interfaces.LightClientBootstrap) error

CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint primitives.Slot) error
DeleteHistoricalDataBeforeSlot(ctx context.Context, slot primitives.Slot) error
DeleteHistoricalDataBeforeSlot(ctx context.Context, slot primitives.Slot, batchSize int) (int, error)
}

// HeadAccessDatabase defines a struct with access to reading chain head data.
Expand Down
122 changes: 84 additions & 38 deletions beacon-chain/db/kv/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,77 +261,82 @@ func (s *Store) DeleteBlock(ctx context.Context, root [32]byte) error {
// - blockRootValidatorHashesBucket
// - blockSlotIndicesBucket
// - stateSlotIndicesBucket
func (s *Store) DeleteHistoricalDataBeforeSlot(ctx context.Context, cutoffSlot primitives.Slot) error {
func (s *Store) DeleteHistoricalDataBeforeSlot(ctx context.Context, cutoffSlot primitives.Slot, batchSize int) (int, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteHistoricalDataBeforeSlot")
defer span.End()

// Collect slot/root pairs to perform deletions in a separate read only transaction.
var (
roots [][]byte
slts []primitives.Slot
)
err := s.db.View(func(tx *bolt.Tx) error {
var err error
roots, slts, err = blockRootsBySlotRange(ctx, tx.Bucket(blockSlotIndicesBucket), primitives.Slot(0), cutoffSlot, nil, nil, nil)
if err != nil {
return errors.Wrap(err, "could not retrieve block roots")
}
return nil
})
slotRoots, err := s.slotRootsInRange(ctx, primitives.Slot(0), cutoffSlot, batchSize)
if err != nil {
return errors.Wrap(err, "could not retrieve block roots and slots")
return 0, err
}

// Return early if there's nothing to delete.
if len(slotRoots) == 0 {
return 0, nil
}

// Perform all deletions in a single transaction for atomicity
return s.db.Update(func(tx *bolt.Tx) error {
for _, root := range roots {
var numSlotsDeleted int
err = s.db.Update(func(tx *bolt.Tx) error {
for _, sr := range slotRoots {
// Return if context is cancelled or deadline is exceeded.
if ctx.Err() != nil {
//nolint:nilerr
return nil
}

// Delete block
if err = s.deleteBlock(tx, root); err != nil {
if err = s.deleteBlock(tx, sr.root[:]); err != nil {
return err
}

// Delete finalized block roots index
if err = tx.Bucket(finalizedBlockRootsIndexBucket).Delete(root); err != nil {
if err = tx.Bucket(finalizedBlockRootsIndexBucket).Delete(sr.root[:]); err != nil {
return errors.Wrap(err, "could not delete finalized block root index")
}

// Delete state
if err = tx.Bucket(stateBucket).Delete(root); err != nil {
if err = tx.Bucket(stateBucket).Delete(sr.root[:]); err != nil {
return errors.Wrap(err, "could not delete state")
}

// Delete state summary
if err = tx.Bucket(stateSummaryBucket).Delete(root); err != nil {
if err = tx.Bucket(stateSummaryBucket).Delete(sr.root[:]); err != nil {
return errors.Wrap(err, "could not delete state summary")
}

// Delete validator entries
if err = s.deleteValidatorHashes(tx, root); err != nil {
if err = s.deleteValidatorHashes(tx, sr.root[:]); err != nil {
return errors.Wrap(err, "could not delete validators")
}

numSlotsDeleted++
}

for _, slot := range slts {
for _, sr := range slotRoots {
// Delete slot indices
if err = tx.Bucket(blockSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(slot)); err != nil {
if err = tx.Bucket(blockSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(sr.slot)); err != nil {
return errors.Wrap(err, "could not delete block slot index")
}
if err = tx.Bucket(stateSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(slot)); err != nil {
if err = tx.Bucket(stateSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(sr.slot)); err != nil {
return errors.Wrap(err, "could not delete state slot index")
}
}

// Delete all caches after we have deleted everything from buckets.
// This is done after the buckets are deleted to avoid any issues in case of transaction rollback.
for _, root := range roots {
for _, sr := range slotRoots {
// Delete block from cache
s.blockCache.Del(string(root))
s.blockCache.Del(string(sr.root[:]))
// Delete state summary from cache
s.stateSummaryCache.delete([32]byte(root))
s.stateSummaryCache.delete(sr.root)
}

return nil
})

return numSlotsDeleted, err
}

// SaveBlock to the db.
Expand All @@ -352,7 +357,7 @@ func (s *Store) SaveBlock(ctx context.Context, signed interfaces.ReadOnlySignedB
// if a `saveBlindedBeaconBlocks` key exists in the database. Otherwise, we check if the last
// blocked stored to check if it is blinded, and then write that `saveBlindedBeaconBlocks` key
// to the DB for future checks.
func (s *Store) shouldSaveBlinded(ctx context.Context) (bool, error) {
func (s *Store) shouldSaveBlinded() (bool, error) {
var saveBlinded bool
if err := s.db.View(func(tx *bolt.Tx) error {
metadataBkt := tx.Bucket(chainMetadataBucket)
Expand Down Expand Up @@ -414,7 +419,7 @@ func prepareBlockBatch(blks []blocks.ROBlock, shouldBlind bool) ([]blockBatchEnt
}

func (s *Store) SaveROBlocks(ctx context.Context, blks []blocks.ROBlock, cache bool) error {
shouldBlind, err := s.shouldSaveBlinded(ctx)
shouldBlind, err := s.shouldSaveBlinded()
if err != nil {
return err
}
Expand Down Expand Up @@ -685,6 +690,49 @@ func (s *Store) SaveRegistrationsByValidatorIDs(ctx context.Context, ids []primi
})
}

type slotRoot struct {
slot primitives.Slot
root [32]byte
}

// slotRootsInRange returns slot and block root pairs of length min(batchSize, end-slot)
func (s *Store) slotRootsInRange(ctx context.Context, start, end primitives.Slot, batchSize int) ([]slotRoot, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.slotRootsInRange")
defer span.End()
if end < start {
return nil, errInvalidSlotRange
}

var pairs []slotRoot
key := bytesutil.SlotToBytesBigEndian(end)
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blockSlotIndicesBucket)
c := bkt.Cursor()
for k, v := c.Seek(key); k != nil; k, v = c.Prev() {
slot := bytesutil.BytesToSlotBigEndian(k)
if slot > end {
continue // Seek will seek to the next key *after* the given one if not present
}
if slot < start {
return nil
}
roots, err := splitRoots(v)
if err != nil {
return errors.Wrapf(err, "corrupt value %v in block slot index for slot=%d", v, slot)
}
for _, r := range roots {
pairs = append(pairs, slotRoot{slot: slot, root: r})
}
if len(pairs) >= batchSize {
return nil // allows code to easily cap the number of items pruned
}
}
return nil
})

return pairs, err
}

// blockRootsByFilter retrieves the block roots given the filter criteria.
func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter) ([][]byte, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.blockRootsByFilter")
Expand All @@ -705,7 +753,7 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter

// We retrieve block roots that match a filter criteria of slot ranges, if specified.
filtersMap := f.Filters()
rootsBySlotRange, _, err := blockRootsBySlotRange(
rootsBySlotRange, err := blockRootsBySlotRange(
ctx,
tx.Bucket(blockSlotIndicesBucket),
filtersMap[filters.StartSlot],
Expand Down Expand Up @@ -750,13 +798,13 @@ func blockRootsBySlotRange(
ctx context.Context,
bkt *bolt.Bucket,
startSlotEncoded, endSlotEncoded, startEpochEncoded, endEpochEncoded, slotStepEncoded interface{},
) ([][]byte, []primitives.Slot, error) {
) ([][]byte, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.blockRootsBySlotRange")
defer span.End()

// Return nothing when all slot parameters are missing
if startSlotEncoded == nil && endSlotEncoded == nil && startEpochEncoded == nil && endEpochEncoded == nil {
return [][]byte{}, nil, nil
return [][]byte{}, nil
}

var startSlot, endSlot primitives.Slot
Expand All @@ -777,11 +825,11 @@ func blockRootsBySlotRange(
if startEpochOk && endEpochOk {
startSlot, err = slots.EpochStart(startEpoch)
if err != nil {
return nil, nil, err
return nil, err
}
endSlot, err = slots.EpochStart(endEpoch)
if err != nil {
return nil, nil, err
return nil, err
}
endSlot = endSlot + params.BeaconConfig().SlotsPerEpoch - 1
}
Expand All @@ -792,11 +840,10 @@ func blockRootsBySlotRange(
return key != nil && bytes.Compare(key, max) <= 0
}
if endSlot < startSlot {
return nil, nil, errInvalidSlotRange
return nil, errInvalidSlotRange
}
rootsRange := endSlot.SubSlot(startSlot).Div(step)
roots := make([][]byte, 0, rootsRange)
var slts []primitives.Slot
c := bkt.Cursor()
for k, v := c.Seek(min); conditional(k, max); k, v = c.Next() {
slot := bytesutil.BytesToSlotBigEndian(k)
Expand All @@ -811,9 +858,8 @@ func blockRootsBySlotRange(
splitRoots = append(splitRoots, v[i:i+32])
}
roots = append(roots, splitRoots...)
slts = append(slts, slot)
}
return roots, slts, nil
return roots, nil
}

// blockRootsBySlot retrieves the block roots by slot
Expand Down
Loading

0 comments on commit 79657b1

Please sign in to comment.