Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch data columns from multiple peers instead of just supernodes #14977

Open
wants to merge 27 commits into
base: peerDAS
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5d563b3
Extract the block fetcher's peer selection logic for data columns so …
niran Feb 20, 2025
578995f
Refactor data column sidecar request to send requests to multiple pee…
niran Feb 21, 2025
6ef2afd
Remove comment
niran Feb 21, 2025
f3dc3e2
Remove unused method
niran Feb 24, 2025
b9c7cee
Add tests for AdmissiblePeersForDataColumns
niran Feb 26, 2025
f0cbf13
Extract data column fetching into standalone functions
niran Feb 27, 2025
4ec748e
Remove AdmissibleCustodyGroupsPeers and replace the final call with r…
niran Feb 28, 2025
e7eb089
Apply suggestions from code review
niran Feb 28, 2025
03e4651
Wrap errors
niran Feb 28, 2025
b40fc01
Use cached peerdas.Info and properly convert custody groups to custody…
niran Feb 28, 2025
c36b3e6
Rename filterPeersForRangeReq
niran Feb 28, 2025
883cdc1
Preserve debugging descriptions when filtering out peers
niran Feb 28, 2025
ea7226a
Remove unused functions.
nalepae Mar 3, 2025
de9a18e
Initialize nested maps
niran Mar 3, 2025
9940a49
Fix comment
niran Mar 3, 2025
875603a
First pass at retry logic for data column requests
niran Mar 4, 2025
e119b13
Select fresh peers for each retry
niran Mar 4, 2025
a204514
Return an error if there are requested columns remaining
niran Mar 4, 2025
750d852
Adjust errors
niran Mar 4, 2025
a0ca743
Improve slightly the godoc.
nalepae Mar 3, 2025
b8f2cb2
Improve wrapped error messages.
nalepae Mar 3, 2025
ea253d0
`AdmissiblePeersForDataColumns`: Use value or `range`.
nalepae Mar 3, 2025
cd1908b
Remove `convertCustodyGroupsToDataColumnsByPeer` since used only once.
nalepae Mar 3, 2025
45d4774
Minor fixes.
nalepae Mar 4, 2025
8a1e4d2
Retry until we run out of peers
niran Mar 4, 2025
94720ea
Delete from the map of peers instead of filtering
niran Mar 5, 2025
5606637
Remove unneeded break
niran Mar 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 205 additions & 6 deletions beacon-chain/p2p/custody.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package p2p

import (
"fmt"
"slices"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand All @@ -9,18 +12,14 @@ import (
"github.com/prysmaticlabs/prysm/v5/config/params"
)

var _ DataColumnsHandler = (*Service)(nil)

// AdmissibleCustodyGroupsPeers returns a list of peers that custody a super set of the local node's custody groups.
func (s *Service) AdmissibleCustodyGroupsPeers(peers []peer.ID) ([]peer.ID, error) {
localCustodyGroupCount := peerdas.CustodyGroupCount()
return s.custodyGroupsAdmissiblePeers(peers, localCustodyGroupCount)
}

// AdmissibleCustodySamplingPeers returns a list of peers that custody a super set of the local node's sampling columns.
func (s *Service) AdmissibleCustodySamplingPeers(peers []peer.ID) ([]peer.ID, error) {
localSubnetSamplingSize := peerdas.CustodyGroupSamplingSize()
return s.custodyGroupsAdmissiblePeers(peers, localSubnetSamplingSize)
}

// custodyGroupsAdmissiblePeers filters out `peers` that do not custody a super set of our own custody groups.
func (s *Service) custodyGroupsAdmissiblePeers(peers []peer.ID, custodyGroupCount uint64) ([]peer.ID, error) {
// Get the total number of custody groups.
Expand Down Expand Up @@ -150,3 +149,203 @@ func (s *Service) CustodyGroupCountFromPeer(pid peer.ID) uint64 {

return custodyCount
}

// AdmissiblePeersForDataColumns returns the subset of `peers` that custody at
// least one data column listed in `neededDataColumns`.
//
// It returns:
// - A map, where the key of the map is the peer, the value is the custody columns of the peer.
// - A map, where the key of the map is the data column, the value is the peers that custody the column.
// - A slice of descriptions for non admissible peers.
// - An error if any.
//
// NOTE(review): peers' custody *groups* are used directly as data column
// identifiers here — confirm that a group <-> column conversion is not needed.
//
// NOTE: distributeSamplesToPeer from the DataColumnSampler implements similar logic,
// but with only one column queried in each request.
func (s *Service) AdmissiblePeersForDataColumns(
	peers []peer.ID,
	neededDataColumns map[uint64]bool,
) (map[peer.ID]map[uint64]bool, map[uint64][]peer.ID, []string, error) {
	peerCount := len(peers)
	neededDataColumnsCount := uint64(len(neededDataColumns))

	// Create description slice for non admissible peers.
	descriptions := make([]string, 0, peerCount)

	// Compute custody groups for each peer.
	custodyGroupsByPeer, err := s.custodyGroupsFromPeer(peers)
	if err != nil {
		return nil, nil, nil, errors.Wrap(err, "custody groups from peer")
	}

	// Keep only the peers which custody at least one needed data column.
	dataColumnsByAdmissiblePeer, localDescriptions := filterPeerWhichCustodyAtLeastOneDataColumn(neededDataColumns, custodyGroupsByPeer)
	descriptions = append(descriptions, localDescriptions...)

	// Invert the relation: map every needed data column to the admissible peers custodying it.
	admissiblePeersByDataColumn := make(map[uint64][]peer.ID, neededDataColumnsCount)
	for pid, dataColumns := range dataColumnsByAdmissiblePeer {
		for dataColumn := range neededDataColumns {
			if dataColumns[dataColumn] {
				admissiblePeersByDataColumn[dataColumn] = append(admissiblePeersByDataColumn[dataColumn], pid)
			}
		}
	}

	return dataColumnsByAdmissiblePeer, admissiblePeersByDataColumn, descriptions, nil
}

// SelectPeersToFetchDataColumnsFrom implements the greedy approximation of the
// set cover problem in order to select peers to fetch data columns from.
// https://en.wikipedia.org/wiki/Set_cover_problem#Greedy_algorithm
//
// It returns a map from each selected peer to the sorted data columns to fetch
// from it. If some needed data columns are custodied by no peer, the partial
// selection is returned along with an error listing the missing columns.
func SelectPeersToFetchDataColumnsFrom(
	neededDataColumns map[uint64]bool,
	dataColumnsByPeer map[peer.ID]map[uint64]bool,
) (map[peer.ID][]uint64, error) {
	// Copy the provided needed data columns into a set that we will remove covered elements from.
	remainingDataColumns := make(map[uint64]bool, len(neededDataColumns))
	for dataColumn := range neededDataColumns {
		remainingDataColumns[dataColumn] = true
	}

	dataColumnsFromSelectedPeers := make(map[peer.ID][]uint64)

	// Filter `dataColumnsByPeer` to only contain needed data columns.
	neededDataColumnsByPeer := make(map[peer.ID]map[uint64]bool, len(dataColumnsByPeer))
	for pid, dataColumns := range dataColumnsByPeer {
		for dataColumn := range dataColumns {
			if remainingDataColumns[dataColumn] {
				if _, ok := neededDataColumnsByPeer[pid]; !ok {
					neededDataColumnsByPeer[pid] = make(map[uint64]bool, len(neededDataColumns))
				}

				neededDataColumnsByPeer[pid][dataColumn] = true
			}
		}
	}

	for len(remainingDataColumns) > 0 {
		// Check if at least one peer remains. If not, it means that we don't have enough peers to fetch all needed data columns.
		if len(neededDataColumnsByPeer) == 0 {
			missingDataColumnsSortedSlice := uint64MapToSortedSlice(remainingDataColumns)
			return dataColumnsFromSelectedPeers, errors.Errorf("no peer to fetch the following data columns: %v", missingDataColumnsSortedSlice)
		}

		// Select the peer that custodies the most needed data columns (greedy selection).
		// Ties are broken by the smallest peer ID so that the selection is
		// deterministic instead of depending on map iteration order.
		var bestPeer peer.ID
		bestCount := 0
		for pid, dataColumns := range neededDataColumnsByPeer {
			count := len(dataColumns)
			if count > bestCount || (count == bestCount && pid < bestPeer) {
				bestPeer, bestCount = pid, count
			}
		}

		dataColumnsSortedSlice := uint64MapToSortedSlice(neededDataColumnsByPeer[bestPeer])
		dataColumnsFromSelectedPeers[bestPeer] = dataColumnsSortedSlice

		// Remove the selected peer from the list of peers.
		delete(neededDataColumnsByPeer, bestPeer)

		// Remove the selected peer's data columns from the list of remaining data columns.
		for _, dataColumn := range dataColumnsSortedSlice {
			delete(remainingDataColumns, dataColumn)
		}

		// Remove the selected peer's data columns from the list of needed data columns by peer,
		// dropping peers that no longer provide any needed column.
		// (Deleting map entries while ranging over the map is well-defined in Go.)
		for _, dataColumn := range dataColumnsSortedSlice {
			for pid, dataColumns := range neededDataColumnsByPeer {
				delete(dataColumns, dataColumn)

				if len(dataColumns) == 0 {
					delete(neededDataColumnsByPeer, pid)
				}
			}
		}
	}

	return dataColumnsFromSelectedPeers, nil
}

// custodyGroupsFromPeer computes, for every peer in `peers`, the set of custody
// groups that the peer custodies, indexed by peer.
func (s *Service) custodyGroupsFromPeer(peers []peer.ID) (map[peer.ID]map[uint64]bool, error) {
	custodyGroupsByPeer := make(map[peer.ID]map[uint64]bool, len(peers))

	for _, pid := range peers {
		// Derive the node ID from the peer ID.
		nodeID, err := ConvertPeerIDToNodeID(pid)
		if err != nil {
			return nil, errors.Wrap(err, "convert peer ID to node ID")
		}

		// Retrieve the custody group count advertised by the peer.
		groupCount := s.CustodyGroupCountFromPeer(pid)

		// Compute the peer's custody groups from its node ID and group count.
		groups, err := peerdas.CustodyGroups(nodeID, groupCount)
		if err != nil {
			return nil, errors.Wrap(err, "custody groups")
		}

		custodyGroupsByPeer[pid] = groups
	}

	return custodyGroupsByPeer, nil
}

// filterPeerWhichCustodyAtLeastOneDataColumn keeps only the peers of
// `inputDataColumnsByPeer` that custody at least one data column specified in
// `neededDataColumns`. It also returns a description of every filtered-out
// peer, for debugging purposes.
func filterPeerWhichCustodyAtLeastOneDataColumn(
	neededDataColumns map[uint64]bool,
	inputDataColumnsByPeer map[peer.ID]map[uint64]bool,
) (map[peer.ID]map[uint64]bool, []string) {
	numberOfColumns := params.BeaconConfig().NumberOfColumns

	// Pretty-print the needed data columns for the descriptions.
	var neededDataColumnsLog interface{} = "all"
	if uint64(len(neededDataColumns)) < numberOfColumns {
		neededDataColumnsLog = uint64MapToSortedSlice(neededDataColumns)
	}

	outputDataColumnsByPeer := make(map[peer.ID]map[uint64]bool, len(inputDataColumnsByPeer))
	descriptions := make([]string, 0)

	for pid, custodyDataColumns := range inputDataColumnsByPeer {
		// Determine whether this peer custodies at least one needed data column.
		custodiesNeededColumn := false
		for neededDataColumn := range neededDataColumns {
			if custodyDataColumns[neededDataColumn] {
				custodiesNeededColumn = true
				break
			}
		}

		if custodiesNeededColumn {
			outputDataColumnsByPeer[pid] = custodyDataColumns
			continue
		}

		// Pretty-print the peer's custody columns for the description.
		var custodyColumnsLog interface{} = "all"
		if uint64(len(custodyDataColumns)) < numberOfColumns {
			custodyColumnsLog = uint64MapToSortedSlice(custodyDataColumns)
		}

		descriptions = append(descriptions, fmt.Sprintf(
			"peer %s: does not custody any needed column, custody columns: %v, needed columns: %v",
			pid, custodyColumnsLog, neededDataColumnsLog,
		))
	}

	return outputDataColumnsByPeer, descriptions
}

// uint64MapToSortedSlice produces a sorted uint64 slice from the keys of a map.
// Note: every key is included, regardless of its boolean value.
func uint64MapToSortedSlice(input map[uint64]bool) []uint64 {
	output := make([]uint64, 0, len(input))
	for idx := range input {
		output = append(output, idx)
	}

	// Type inference makes the explicit instantiation `slices.Sort[[]uint64]` unnecessary.
	slices.Sort(output)
	return output
}
Loading