Skip to content

Commit

Permalink
Maximize Peer Capacity When Syncing (#13820)
Browse files Browse the repository at this point in the history
* maximize it

* fix it

* lint

* add test

* Update beacon-chain/sync/initial-sync/blocks_fetcher.go

Co-authored-by: Radosław Kapka <[email protected]>

* logs

* kasey's review

* kasey's review

---------

Co-authored-by: Radosław Kapka <[email protected]>
  • Loading branch information
nisdas and rkapka authored Mar 30, 2024
1 parent f3b49d4 commit 65b90ab
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 10 deletions.
65 changes: 57 additions & 8 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot

response.bwb, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
if response.err == nil {
bwb, err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid)
bwb, err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid, peers)
if err != nil {
response.err = err
}
Expand All @@ -336,6 +336,11 @@ func (f *blocksFetcher) fetchBlocksFromPeer(
Count: count,
Step: 1,
}
bestPeers := f.hasSufficientBandwidth(peers, req.Count)
// We append the best peers to the front so that higher capacity
// peers are dialed first.
peers = append(bestPeers, peers...)
peers = dedupPeers(peers)
for i := 0; i < len(peers); i++ {
p := peers[i]
blocks, err := f.requestBlocks(ctx, req, p)
Expand Down Expand Up @@ -472,7 +477,7 @@ func missingCommitError(root [32]byte, slot primitives.Slot, missing [][]byte) e
}

// fetchBlobsFromPeer fetches blocks from a single randomly selected peer.
func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.BlockWithROBlobs, pid peer.ID) ([]blocks2.BlockWithROBlobs, error) {
func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.BlockWithROBlobs, pid peer.ID, peers []peer.ID) ([]blocks2.BlockWithROBlobs, error) {
ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlobsFromPeer")
defer span.End()
if slots.ToEpoch(f.clock.CurrentSlot()) < params.BeaconConfig().DenebForkEpoch {
Expand All @@ -487,13 +492,30 @@ func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.Bl
if req == nil {
return bwb, nil
}
// Request blobs from the same peer that gave us the blob batch.
blobs, err := f.requestBlobs(ctx, req, pid)
if err != nil {
return nil, errors.Wrap(err, "could not request blobs by range")
peers = f.filterPeers(ctx, peers, peersPercentagePerRequest)
// We dial the initial peer first to ensure that we get the desired set of blobs.
wantedPeers := append([]peer.ID{pid}, peers...)
bestPeers := f.hasSufficientBandwidth(wantedPeers, req.Count)
// We append the best peers to the front so that higher capacity
// peers are dialed first. If all of them fail, we fallback to the
// initial peer we wanted to request blobs from.
peers = append(bestPeers, pid)
for i := 0; i < len(peers); i++ {
p := peers[i]
blobs, err := f.requestBlobs(ctx, req, p)
if err != nil {
log.WithField("peer", p).WithError(err).Debug("Could not request blobs by range from peer")
continue
}
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(p)
robs, err := verifyAndPopulateBlobs(bwb, blobs, blobWindowStart)
if err != nil {
log.WithField("peer", p).WithError(err).Debug("Invalid BeaconBlobsByRange response")
continue
}
return robs, err
}
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(pid)
return verifyAndPopulateBlobs(bwb, blobs, blobWindowStart)
return nil, errNoPeersAvailable
}

// requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams.
Expand Down Expand Up @@ -606,6 +628,18 @@ func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64) error {
return nil
}

func (f *blocksFetcher) hasSufficientBandwidth(peers []peer.ID, count uint64) []peer.ID {
filteredPeers := []peer.ID{}
for _, p := range peers {
if uint64(f.rateLimiter.Remaining(p.String())) < count {
continue
}
copiedP := p
filteredPeers = append(filteredPeers, copiedP)
}
return filteredPeers
}

// Determine how long it will take for us to have the required number of blocks allowed by our rate limiter.
// We do this by calculating the duration till the rate limiter can request these blocks without exceeding
// the provided bandwidth limits per peer.
Expand All @@ -626,3 +660,18 @@ func timeToWait(wanted, rem, capacity int64, timeTillEmpty time.Duration) time.D
expectedTime := int64(timeTillEmpty) * blocksNeeded / currentNumBlks
return time.Duration(expectedTime)
}

// deduplicates the provided peer list.
func dedupPeers(peers []peer.ID) []peer.ID {
newPeerList := make([]peer.ID, 0, len(peers))
peerExists := make(map[peer.ID]bool)

for i := range peers {
if peerExists[peers[i]] {
continue
}
newPeerList = append(newPeerList, peers[i])
peerExists[peers[i]] = true
}
return newPeerList
}
20 changes: 20 additions & 0 deletions beacon-chain/sync/initial-sync/blocks_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
p2pm "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
Expand Down Expand Up @@ -1166,3 +1167,22 @@ func TestBatchLimit(t *testing.T) {

assert.Equal(t, params.BeaconConfig().MaxRequestBlocksDeneb, uint64(maxBatchLimit()))
}

func TestBlockFetcher_HasSufficientBandwidth(t *testing.T) {
bf := newBlocksFetcher(context.Background(), &blocksFetcherConfig{})
currCap := bf.rateLimiter.Capacity()
wantedAmt := currCap - 100
bf.rateLimiter.Add(peer.ID("a").String(), wantedAmt)
bf.rateLimiter.Add(peer.ID("c").String(), wantedAmt)
bf.rateLimiter.Add(peer.ID("f").String(), wantedAmt)
bf.rateLimiter.Add(peer.ID("d").String(), wantedAmt)

receivedPeers := bf.hasSufficientBandwidth([]peer.ID{"a", "b", "c", "d", "e", "f"}, 110)
for _, p := range receivedPeers {
switch p {
case "a", "c", "f", "d":
t.Errorf("peer has exceeded capacity: %s", p)
}
}
assert.Equal(t, 2, len(receivedPeers))
}
4 changes: 2 additions & 2 deletions beacon-chain/sync/initial-sync/blocks_fetcher_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
}
// We need to fetch the blobs for the given alt-chain if any exist, so that we can try to verify and import
// the blocks.
bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid)
bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid, []peer.ID{pid})
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer")
}
Expand All @@ -302,7 +302,7 @@ func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfa
if err != nil {
return nil, errors.Wrap(err, "received invalid blocks in findAncestor")
}
bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid)
bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid, []peer.ID{pid})
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findAncestor")
}
Expand Down

0 comments on commit 65b90ab

Please sign in to comment.