From 65b90abdda93d714da6652440b158c8d5d1fae18 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Sat, 30 Mar 2024 22:54:11 +0800 Subject: [PATCH] Maximize Peer Capacity When Syncing (#13820) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * maximize it * fix it * lint * add test * Update beacon-chain/sync/initial-sync/blocks_fetcher.go Co-authored-by: Radosław Kapka * logs * kasey's review * kasey's review --------- Co-authored-by: Radosław Kapka --- .../sync/initial-sync/blocks_fetcher.go | 65 ++++++++++++++++--- .../sync/initial-sync/blocks_fetcher_test.go | 20 ++++++ .../sync/initial-sync/blocks_fetcher_utils.go | 4 +- 3 files changed, 79 insertions(+), 10 deletions(-) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 0390a92438c4..429fa833ec3d 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -312,7 +312,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot response.bwb, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers) if response.err == nil { - bwb, err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid) + bwb, err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid, peers) if err != nil { response.err = err } @@ -336,6 +336,11 @@ func (f *blocksFetcher) fetchBlocksFromPeer( Count: count, Step: 1, } + bestPeers := f.hasSufficientBandwidth(peers, req.Count) + // We append the best peers to the front so that higher capacity + // peers are dialed first. + peers = append(bestPeers, peers...) + peers = dedupPeers(peers) for i := 0; i < len(peers); i++ { p := peers[i] blocks, err := f.requestBlocks(ctx, req, p) @@ -472,7 +477,7 @@ func missingCommitError(root [32]byte, slot primitives.Slot, missing [][]byte) e } // fetchBlobsFromPeer fetches blocks from a single randomly selected peer. 
-func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.BlockWithROBlobs, pid peer.ID) ([]blocks2.BlockWithROBlobs, error) { +func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.BlockWithROBlobs, pid peer.ID, peers []peer.ID) ([]blocks2.BlockWithROBlobs, error) { ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlobsFromPeer") defer span.End() if slots.ToEpoch(f.clock.CurrentSlot()) < params.BeaconConfig().DenebForkEpoch { @@ -487,13 +492,30 @@ func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.Bl if req == nil { return bwb, nil } - // Request blobs from the same peer that gave us the blob batch. - blobs, err := f.requestBlobs(ctx, req, pid) - if err != nil { - return nil, errors.Wrap(err, "could not request blobs by range") + peers = f.filterPeers(ctx, peers, peersPercentagePerRequest) + // We dial the initial peer first to ensure that we get the desired set of blobs. + wantedPeers := append([]peer.ID{pid}, peers...) + bestPeers := f.hasSufficientBandwidth(wantedPeers, req.Count) + // We append the best peers to the front so that higher capacity + // peers are dialed first. If all of them fail, we fall back to the + // initial peer we wanted to request blobs from. 
+ peers = append(bestPeers, pid) + for i := 0; i < len(peers); i++ { + p := peers[i] + blobs, err := f.requestBlobs(ctx, req, p) + if err != nil { + log.WithField("peer", p).WithError(err).Debug("Could not request blobs by range from peer") + continue + } + f.p2p.Peers().Scorers().BlockProviderScorer().Touch(p) + robs, err := verifyAndPopulateBlobs(bwb, blobs, blobWindowStart) + if err != nil { + log.WithField("peer", p).WithError(err).Debug("Invalid BeaconBlobsByRange response") + continue + } + return robs, err } - f.p2p.Peers().Scorers().BlockProviderScorer().Touch(pid) - return verifyAndPopulateBlobs(bwb, blobs, blobWindowStart) + return nil, errNoPeersAvailable } // requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams. @@ -606,6 +628,18 @@ func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64) error { return nil } +func (f *blocksFetcher) hasSufficientBandwidth(peers []peer.ID, count uint64) []peer.ID { + filteredPeers := []peer.ID{} + for _, p := range peers { + if uint64(f.rateLimiter.Remaining(p.String())) < count { + continue + } + copiedP := p + filteredPeers = append(filteredPeers, copiedP) + } + return filteredPeers +} + // Determine how long it will take for us to have the required number of blocks allowed by our rate limiter. // We do this by calculating the duration till the rate limiter can request these blocks without exceeding // the provided bandwidth limits per peer. @@ -626,3 +660,18 @@ func timeToWait(wanted, rem, capacity int64, timeTillEmpty time.Duration) time.D expectedTime := int64(timeTillEmpty) * blocksNeeded / currentNumBlks return time.Duration(expectedTime) } + +// dedupPeers deduplicates the provided peer list, keeping the first occurrence of each peer in its original order. 
+func dedupPeers(peers []peer.ID) []peer.ID { + newPeerList := make([]peer.ID, 0, len(peers)) + peerExists := make(map[peer.ID]bool) + + for i := range peers { + if peerExists[peers[i]] { + continue + } + newPeerList = append(newPeerList, peers[i]) + peerExists[peers[i]] = true + } + return newPeerList +} diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index 43e062127b5c..0852d8f7f5c5 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -12,6 +12,7 @@ import ( libp2pcore "github.com/libp2p/go-libp2p/core" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing" p2pm "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" @@ -1166,3 +1167,22 @@ func TestBatchLimit(t *testing.T) { assert.Equal(t, params.BeaconConfig().MaxRequestBlocksDeneb, uint64(maxBatchLimit())) } + +func TestBlockFetcher_HasSufficientBandwidth(t *testing.T) { + bf := newBlocksFetcher(context.Background(), &blocksFetcherConfig{}) + currCap := bf.rateLimiter.Capacity() + wantedAmt := currCap - 100 + bf.rateLimiter.Add(peer.ID("a").String(), wantedAmt) + bf.rateLimiter.Add(peer.ID("c").String(), wantedAmt) + bf.rateLimiter.Add(peer.ID("f").String(), wantedAmt) + bf.rateLimiter.Add(peer.ID("d").String(), wantedAmt) + + receivedPeers := bf.hasSufficientBandwidth([]peer.ID{"a", "b", "c", "d", "e", "f"}, 110) + for _, p := range receivedPeers { + switch p { + case "a", "c", "f", "d": + t.Errorf("peer has exceeded capacity: %s", p) + } + } + assert.Equal(t, 2, len(receivedPeers)) +} diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go index 5610a7ed0adc..dcdaf161ab94 100644 --- 
a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go @@ -280,7 +280,7 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot } // We need to fetch the blobs for the given alt-chain if any exist, so that we can try to verify and import // the blocks. - bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid) + bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid, []peer.ID{pid}) if err != nil { return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer") } @@ -302,7 +302,7 @@ func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfa if err != nil { return nil, errors.Wrap(err, "received invalid blocks in findAncestor") } - bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid) + bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid, []peer.ID{pid}) if err != nil { return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findAncestor") }