Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch data columns from multiple peers instead of just supernodes #14977

Open
wants to merge 27 commits into
base: peerDAS
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5d563b3
Extract the block fetcher's peer selection logic for data columns so …
niran Feb 20, 2025
578995f
Refactor data column sidecar request to send requests to multiple pee…
niran Feb 21, 2025
6ef2afd
Remove comment
niran Feb 21, 2025
f3dc3e2
Remove unused method
niran Feb 24, 2025
b9c7cee
Add tests for AdmissiblePeersForDataColumns
niran Feb 26, 2025
f0cbf13
Extract data column fetching into standalone functions
niran Feb 27, 2025
4ec748e
Remove AdmissibleCustodyGroupsPeers and replace the final call with r…
niran Feb 28, 2025
e7eb089
Apply suggestions from code review
niran Feb 28, 2025
03e4651
Wrap errors
niran Feb 28, 2025
b40fc01
Use cached peerdas.Info and properly convert custody groups to custody…
niran Feb 28, 2025
c36b3e6
Rename filterPeersForRangeReq
niran Feb 28, 2025
883cdc1
Preserve debugging descriptions when filtering out peers
niran Feb 28, 2025
ea7226a
Remove unused functions.
nalepae Mar 3, 2025
de9a18e
Initialize nested maps
niran Mar 3, 2025
9940a49
Fix comment
niran Mar 3, 2025
875603a
First pass at retry logic for data column requests
niran Mar 4, 2025
e119b13
Select fresh peers for each retry
niran Mar 4, 2025
a204514
Return an error if there are requested columns remaining
niran Mar 4, 2025
750d852
Adjust errors
niran Mar 4, 2025
a0ca743
Improve slightly the godoc.
nalepae Mar 3, 2025
b8f2cb2
Improve wrapped error messages.
nalepae Mar 3, 2025
ea253d0
`AdmissiblePeersForDataColumns`: Use value or `range`.
nalepae Mar 3, 2025
cd1908b
Remove `convertCustodyGroupsToDataColumnsByPeer` since used only once.
nalepae Mar 3, 2025
45d4774
Minor fixes.
nalepae Mar 4, 2025
8a1e4d2
Retry until we run out of peers
niran Mar 4, 2025
94720ea
Delete from the map of peers instead of filtering
niran Mar 5, 2025
5606637
Remove unneeded break
niran Mar 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 157 additions & 77 deletions beacon-chain/p2p/custody.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package p2p

import (
"fmt"
"slices"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand All @@ -9,83 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/config/params"
)

// AdmissibleCustodyGroupsPeers returns the subset of `peers` whose custody groups
// form a superset of the local node's custody groups.
func (s *Service) AdmissibleCustodyGroupsPeers(peers []peer.ID) ([]peer.ID, error) {
	return s.custodyGroupsAdmissiblePeers(peers, peerdas.CustodyGroupCount())
}

// AdmissibleCustodySamplingPeers returns the subset of `peers` whose custody groups
// form a superset of the local node's sampling columns.
func (s *Service) AdmissibleCustodySamplingPeers(peers []peer.ID) ([]peer.ID, error) {
	return s.custodyGroupsAdmissiblePeers(peers, peerdas.CustodyGroupSamplingSize())
}

// custodyGroupsAdmissiblePeers filters out `peers` that do not custody a super set of our own custody groups.
// `custodyGroupCount` is the number of custody groups the local node needs covered.
// It returns the admissible peers, preserving the order of the input slice, or an
// error if peer DAS info cannot be computed for the local node or a remote peer.
func (s *Service) custodyGroupsAdmissiblePeers(peers []peer.ID, custodyGroupCount uint64) ([]peer.ID, error) {
// Get the total number of custody groups.
numberOfCustodyGroups := params.BeaconConfig().NumberOfCustodyGroups

// Retrieve the local node ID.
localNodeId := s.NodeID()

// Retrieve the local node info.
localNodeInfo, _, err := peerdas.Info(localNodeId, custodyGroupCount)
if err != nil {
return nil, errors.Wrap(err, "peer info")
}

// Retrieve the needed custody groups.
neededCustodyGroups := localNodeInfo.CustodyGroups

// Find the valid peers.
validPeers := make([]peer.ID, 0, len(peers))

loop:
for _, pid := range peers {
// Get the custody group count of the remote peer.
remoteCustodyGroupCount := s.CustodyGroupCountFromPeer(pid)

// If the remote peer custodies fewer groups than we need, it cannot be a superset: skip it.
if remoteCustodyGroupCount < custodyGroupCount {
continue
}

// Get the remote node ID from the peer ID.
remoteNodeID, err := ConvertPeerIDToNodeID(pid)
if err != nil {
return nil, errors.Wrap(err, "convert peer ID to node ID")
}

// Retrieve the remote peer info.
remotePeerInfo, _, err := peerdas.Info(remoteNodeID, remoteCustodyGroupCount)
if err != nil {
return nil, errors.Wrap(err, "peer info")
}

// Retrieve the custody groups of the remote peer.
remoteCustodyGroups := remotePeerInfo.CustodyGroups
remoteCustodyGroupsCount := uint64(len(remoteCustodyGroups))

// If the remote peer custodies all the possible groups, it is trivially a superset: add it to the list.
if remoteCustodyGroupsCount == numberOfCustodyGroups {
validPeers = append(validPeers, pid)
continue
}

// Skip the peer (via the labeled continue) as soon as one needed group is missing.
for custodyGroup := range neededCustodyGroups {
if !remoteCustodyGroups[custodyGroup] {
continue loop
}
}

// Add valid peer to list.
validPeers = append(validPeers, pid)
}

return validPeers, nil
}
var _ DataColumnsHandler = (*Service)(nil)

// custodyGroupCountFromPeerENR retrieves the custody count from the peer ENR.
// If the ENR is not available, it defaults to the minimum number of custody groups
Expand Down Expand Up @@ -150,3 +77,156 @@ func (s *Service) CustodyGroupCountFromPeer(pid peer.ID) uint64 {

return custodyCount
}

// AdmissiblePeersForDataColumns returns, among `peers`, those that custody at least
// one data column listed in `neededDataColumns`.
//
// It returns:
// - A map, where the key of the map is the peer, the value is the custody data columns of the peer.
// - A map, where the key of the map is the data column, the value is a list of peers that custody the column.
// - A slice of descriptions for non admissible peers.
// - An error if any.
//
// NOTE: distributeSamplesToPeer from the DataColumnSampler implements similar logic,
// but with only one column queried in each request.
func (s *Service) AdmissiblePeersForDataColumns(
	peers []peer.ID,
	neededDataColumns map[uint64]bool,
) (map[peer.ID]map[uint64]bool, map[uint64][]peer.ID, []string, error) {
	peerCount := len(peers)
	neededDataColumnsCount := uint64(len(neededDataColumns))

	// Create description slice for non admissible peers.
	descriptions := make([]string, 0, peerCount)

	// Compute custody columns for each peer.
	dataColumnsByPeer, err := s.custodyColumnsFromPeers(peers)
	if err != nil {
		return nil, nil, nil, errors.Wrap(err, "couldn't get custody columns from peers")
	}

	// Filter peers which custody at least one needed data column.
	dataColumnsByAdmissiblePeer, localDescriptions := filterPeerWhichCustodyAtLeastOneDataColumn(neededDataColumns, dataColumnsByPeer)
	descriptions = append(descriptions, localDescriptions...)

	// Compute a map from needed data columns to their peers.
	// Range over the map value directly: avoids both a redundant lookup per column
	// and shadowing the imported `peer` package with a loop variable.
	admissiblePeersByDataColumn := make(map[uint64][]peer.ID, neededDataColumnsCount)
	for pid, custodyColumns := range dataColumnsByAdmissiblePeer {
		for dataColumn := range neededDataColumns {
			if custodyColumns[dataColumn] {
				admissiblePeersByDataColumn[dataColumn] = append(admissiblePeersByDataColumn[dataColumn], pid)
			}
		}
	}

	return dataColumnsByAdmissiblePeer, admissiblePeersByDataColumn, descriptions, nil
}

// custodyGroupsFromPeers computes all the custody groups indexed by peer.
// It returns an error if a peer ID cannot be converted to a node ID or if the
// peer DAS info cannot be computed.
func (s *Service) custodyGroupsFromPeers(peers []peer.ID) (map[peer.ID]map[uint64]bool, error) {
	peerCount := len(peers)

	custodyGroupsByPeer := make(map[peer.ID]map[uint64]bool, peerCount)
	// Loop variable is named `pid` (not `peer`) to avoid shadowing the imported `peer` package.
	for _, pid := range peers {
		// Get the node ID from the peer ID.
		nodeID, err := ConvertPeerIDToNodeID(pid)
		if err != nil {
			return nil, errors.Wrap(err, "convert peer ID to node ID")
		}

		// Get the custody group count of the peer.
		custodyGroupCount := s.CustodyGroupCountFromPeer(pid)

		// Get the custody groups of the peer.
		dasInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
		if err != nil {
			return nil, errors.Wrap(err, "custody groups")
		}

		custodyGroupsByPeer[pid] = dasInfo.CustodyGroups
	}

	return custodyGroupsByPeer, nil
}

// custodyColumnsFromPeers computes, for each peer, the set of data columns it
// custodies, derived from its custody groups.
func (s *Service) custodyColumnsFromPeers(peers []peer.ID) (map[peer.ID]map[uint64]bool, error) {
	groupsByPeer, err := s.custodyGroupsFromPeers(peers)
	if err != nil {
		return nil, errors.Wrap(err, "custody groups from peer")
	}

	return convertCustodyGroupsToDataColumnsByPeer(groupsByPeer)
}

// convertCustodyGroupsToDataColumnsByPeer translates each peer's custody groups
// into the corresponding set of custody data columns.
func convertCustodyGroupsToDataColumnsByPeer(custodyGroupsByPeer map[peer.ID]map[uint64]bool) (map[peer.ID]map[uint64]bool, error) {
	result := make(map[peer.ID]map[uint64]bool, len(custodyGroupsByPeer))

	for pid, groups := range custodyGroupsByPeer {
		columns, err := peerdas.CustodyColumns(groups)
		if err != nil {
			return nil, errors.Wrap(err, "couldn't get custody columns from groups")
		}

		result[pid] = columns
	}

	return result, nil
}

// filterPeerWhichCustodyAtLeastOneDataColumn keeps only the peers that custody at
// least one data column specified in `neededDataColumns`. It also returns a list
// of human-readable descriptions explaining why the other peers were rejected.
func filterPeerWhichCustodyAtLeastOneDataColumn(
	neededDataColumns map[uint64]bool,
	inputDataColumnsByPeer map[peer.ID]map[uint64]bool,
) (map[peer.ID]map[uint64]bool, []string) {
	numberOfColumns := params.BeaconConfig().NumberOfColumns

	// Pretty-printer for logs: a full set of columns is rendered as "all",
	// a partial set as a sorted slice of indices.
	describeColumns := func(columns map[uint64]bool) interface{} {
		if uint64(len(columns)) < numberOfColumns {
			return uint64MapToSortedSlice(columns)
		}
		return "all"
	}

	neededDataColumnsLog := describeColumns(neededDataColumns)

	admissible := make(map[peer.ID]map[uint64]bool, len(inputDataColumnsByPeer))
	descriptions := make([]string, 0)

	for pid, custodyColumns := range inputDataColumnsByPeer {
		// Does this peer custody at least one of the needed columns?
		custodiesNeeded := false
		for column := range neededDataColumns {
			if custodyColumns[column] {
				custodiesNeeded = true
				break
			}
		}

		if custodiesNeeded {
			admissible[pid] = custodyColumns
			continue
		}

		descriptions = append(descriptions, fmt.Sprintf(
			"peer %s: does not custody any needed column, custody columns: %v, needed columns: %v",
			pid, describeColumns(custodyColumns), neededDataColumnsLog,
		))
	}

	return admissible, descriptions
}

// uint64MapToSortedSlice produces a sorted uint64 slice from the keys of the input map.
// A nil or empty map yields an empty (non-nil) slice.
func uint64MapToSortedSlice(input map[uint64]bool) []uint64 {
	output := make([]uint64, 0, len(input))
	for idx := range input {
		output = append(output, idx)
	}

	// Type inference resolves the type parameter; the explicit `[[]uint64]`
	// instantiation was redundant.
	slices.Sort(output)
	return output
}
Loading