searchForPeers: Replace batchSize by batchPeriod. (#14704)
Rationale:
Before this commit, the internal loop exited if:
- the expected number of peers is found, or
- the iterator returns `false` (exhaustion), or
- `batchSize` iterations are done.

The issue with the iteration count is that, if not enough
peers are found AND `iterator.Next` always returns `true`,
we don't control WHEN the loop is going to stop.

The root cause is that we don't control the time needed to
run the `iterator.Next` function, which is provided by
`devp2p` (geth).

The value of `batchSize` (2000) was chosen arbitrarily.
It turns out the time needed to run `iterator.Next` can range from a few microseconds to a few hundred milliseconds.

==> In small networks (example: E2E tests), it was possible for the loop not to exit for several dozen seconds.

With this commit, we replace the `batchSize` by a `batchPeriod`, ensuring the loop will never
run longer than `batchPeriod`, even in a small network.

Co-authored-by: Nishant Das <[email protected]>
nalepae and nisdas authored Dec 10, 2024
1 parent 63bc965 commit 1f2d8cf
Showing 5 changed files with 11 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -115,6 +115,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Fixed another rollback bug due to a context deadline.
- Fix checkpoint sync bug on holesky. [pr](https://github.com/prysmaticlabs/prysm/pull/14689)
- Fix segmentation fault in E2E when light-client feature flag is enabled. [PR](https://github.com/prysmaticlabs/prysm/pull/14699)
+- Fix `searchForPeers` infinite loop in small networks.


### Security
9 changes: 1 addition & 8 deletions beacon-chain/p2p/broadcaster_test.go
@@ -225,13 +225,6 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
require.NoError(t, err)
defer bootListener.Close()

-// Use smaller batch size for testing.
-currentBatchSize := batchSize
-batchSize = 2
-defer func() {
-batchSize = currentBatchSize
-}()
-
bootNode := bootListener.Self()
subnet := uint64(5)

@@ -240,7 +233,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
// setup other nodes.
cfg = &Config{
Discv5BootStrapAddrs: []string{bootNode.String()},
-MaxPeers: 30,
+MaxPeers: 2,
}
// Setup 2 different hosts
for i := 1; i <= 2; i++ {
2 changes: 1 addition & 1 deletion beacon-chain/p2p/discovery.go
@@ -329,7 +329,7 @@ func (s *Service) listenForNewNodes() {
}

// Search for new peers.
-wantedNodes := searchForPeers(iterator, batchSize, missingPeerCount, s.filterPeer)
+wantedNodes := searchForPeers(iterator, batchPeriod, missingPeerCount, s.filterPeer)

wg := new(sync.WaitGroup)
for i := 0; i < len(wantedNodes); i++ {
4 changes: 2 additions & 2 deletions beacon-chain/p2p/service.go
@@ -44,8 +44,8 @@ var _ runtime.Service = (*Service)(nil)
var pollingPeriod = 6 * time.Second

// When looking for new nodes, if not enough nodes are found,
-// we stop after this amount of iterations.
-var batchSize = 2_000
+// we stop after this spent time.
+var batchPeriod = 2 * time.Second

// Refresh rate of ENR set at twice per slot.
var refreshRate = slots.DivideSlotBy(2)
10 changes: 6 additions & 4 deletions beacon-chain/p2p/subnets.go
@@ -66,12 +66,14 @@ func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node)
// - Iterator is exhausted.
func searchForPeers(
iterator enode.Iterator,
-batchSize int,
+batchPeriod time.Duration,
peersToFindCount uint,
filter func(node *enode.Node) bool,
) []*enode.Node {
-nodeFromNodeID := make(map[enode.ID]*enode.Node, batchSize)
-for i := 0; i < batchSize && uint(len(nodeFromNodeID)) <= peersToFindCount && iterator.Next(); i++ {
+nodeFromNodeID := make(map[enode.ID]*enode.Node)
+start := time.Now()
+
+for time.Since(start) < batchPeriod && uint(len(nodeFromNodeID)) < peersToFindCount && iterator.Next() {
node := iterator.Node()

// Filter out nodes that do not meet the criteria.
@@ -191,7 +193,7 @@ func (s *Service) FindPeersWithSubnet(
}

// Search for new peers in the network.
-nodes := searchForPeers(iterator, batchSize, uint(missingPeerCountForTopic), filter)
+nodes := searchForPeers(iterator, batchPeriod, uint(missingPeerCountForTopic), filter)

// Restrict dials if limit is applied.
maxConcurrentDials := math.MaxInt