Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi+refactor: persistent peer manager #5700

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
252c36a
peer+server: move persistentPeers to PersistentPeerManager
ellemouton May 11, 2022
8e772db
server+peer: move persistentPeerAddrs to PersistentPeerManager
ellemouton May 11, 2022
082bc98
peer+server: move persistentConnReqs to PersistentPeerManager
ellemouton May 11, 2022
1f6da30
peer+server: move persistentPeersBackoff to PersistentPeerManager
ellemouton May 23, 2022
417991c
peer+server: move persistentRetryCancels to PersistentPeerManager
ellemouton May 23, 2022
5221aaf
peer+server: move ConnectPeer logic to PersistentPeerManager
ellemouton May 23, 2022
a5f03dc
peer+server: move all connMgr Connect calls to PersistentPeerManager
ellemouton May 24, 2022
6756734
peer+server: move cancelConnReqs to PersistentPeerManager
ellemouton May 24, 2022
194de63
peer+server: deleting the peer should always cancel conn reqs
ellemouton May 24, 2022
73c105d
peer+server: move node addr update logic to PersistentPeerManager
ellemouton May 24, 2022
b7e26cf
peer+server: add addrs on persistent peer creation
ellemouton May 24, 2022
cdcdfb6
peer+server: only add supported addresses
ellemouton May 24, 2022
e226516
peer+server: allow PersistentPeerMgr to fetch advertised addrs
ellemouton May 24, 2022
a41637e
peer+server: let PersistentPeerManager do addr conversions
ellemouton May 24, 2022
71b397d
server: remove redundant address lookups
ellemouton May 24, 2022
c11367e
peer+server: Connect with backoff to PersistentPeerManager
ellemouton May 24, 2022
b7bd6f9
peer: always cancel any previous retry cancellers
ellemouton May 24, 2022
077d9ca
docs/release-notes: update with entry for 5700
ellemouton May 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 132 additions & 5 deletions peer/persistentpeer_mgr.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,41 @@
package peer

import (
"crypto/rand"
"math/big"
"sync"
"time"

"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/connmgr"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think route.Vertex needs to be introduced

)

const (
// defaultStableConnDuration is a floor under which all reconnection
// attempts will apply exponential randomized backoff. Connections
// durations exceeding this value will be eligible to have their
// backoffs reduced.
defaultStableConnDuration = 10 * time.Minute
)

// PersistentPeerMgrConfig holds the config of the PersistentPeerManager.
type PersistentPeerMgrConfig struct {
// MinBackoff is the shortest backoff when reconnecting to a persistent
// peer.
MinBackoff time.Duration

// MaxBackoff is the longest backoff when reconnecting to a persistent
// peer.
MaxBackoff time.Duration
}

// PersistentPeerManager manages persistent peers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a basic description of what is/makes a peer persistent?

type PersistentPeerManager struct {
// cfg holds the config of the manager.
cfg *PersistentPeerMgrConfig

// conns maps a peer's public key to a persistentPeer object.
conns map[route.Vertex]*persistentPeer

Expand All @@ -34,11 +59,17 @@ type persistentPeer struct {
// connReqs holds all the active connection requests that we have for
// the peer.
connReqs []*connmgr.ConnReq

// backoff is the time to wait before trying to reconnect to a peer.
backoff time.Duration
}

// NewPersistentPeerManager creates a new PersistentPeerManager instance.
func NewPersistentPeerManager() *PersistentPeerManager {
func NewPersistentPeerManager(
cfg *PersistentPeerMgrConfig) *PersistentPeerManager {

return &PersistentPeerManager{
cfg: cfg,
conns: make(map[route.Vertex]*persistentPeer),
}
}
Expand All @@ -49,10 +80,18 @@ func (m *PersistentPeerManager) AddPeer(pubKey *btcec.PublicKey, perm bool) {
m.Lock()
defer m.Unlock()

m.conns[route.NewVertex(pubKey)] = &persistentPeer{
pubKey: pubKey,
perm: perm,
addrs: make(map[string]*lnwire.NetAddress),
peerKey := route.NewVertex(pubKey)

backoff := m.cfg.MinBackoff
if peer, ok := m.conns[peerKey]; ok {
backoff = peer.backoff
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the peer is already being tracked, does the fetch call above still add something? Perhaps setPeerAddrsUnsafe can just append new addrs?

}

m.conns[peerKey] = &persistentPeer{
pubKey: pubKey,
perm: perm,
addrs: make(map[string]*lnwire.NetAddress),
backoff: backoff,
}
}

Expand Down Expand Up @@ -236,3 +275,91 @@ func (m *PersistentPeerManager) NumPeerConnReqs(pubKey *btcec.PublicKey) int {

return len(peer.connReqs)
}

// NextPeerBackoff calculates, sets and returns the next backoff duration that
// should be used before attempting to reconnect to the peer.
func (m *PersistentPeerManager) NextPeerBackoff(pubKey *btcec.PublicKey,
	startTime time.Time) time.Duration {

	m.Lock()
	defer m.Unlock()

	p, ok := m.conns[route.NewVertex(pubKey)]
	if !ok {
		// We are not tracking this peer, so fall back to the shortest
		// configured backoff.
		return m.cfg.MinBackoff
	}

	// Derive the next backoff from the peer's current one and persist it
	// so that subsequent retries keep ramping up (or relaxing) correctly.
	backoff := nextPeerBackoff(
		p.backoff, m.cfg.MinBackoff, m.cfg.MaxBackoff, startTime,
	)
	p.backoff = backoff

	return backoff
}

// nextPeerBackoff computes the next backoff duration for a peer using
// exponential backoff. If no previous backoff was known, the default is
// returned.
func nextPeerBackoff(currentBackoff, minBackoff, maxBackoff time.Duration,
startTime time.Time) time.Duration {

// If the peer failed to start properly, we'll just use the previous
// backoff to compute the subsequent randomized exponential backoff
// duration. This will roughly double on average.
if startTime.IsZero() {
return computeNextBackoff(currentBackoff, maxBackoff)
}

// The peer succeeded in starting. If the connection didn't last long
// enough to be considered stable, we'll continue to back off retries
// with this peer.
connDuration := time.Since(startTime)
if connDuration < defaultStableConnDuration {
return computeNextBackoff(currentBackoff, maxBackoff)
}

// The peer succeed in starting and this was stable peer, so we'll
// reduce the timeout duration by the length of the connection after
// applying randomized exponential backoff. We'll only apply this in the
// case that:
// reb(curBackoff) - connDuration > cfg.MinBackoff
relaxedBackoff := computeNextBackoff(currentBackoff, maxBackoff)
relaxedBackoff -= connDuration

if relaxedBackoff > maxBackoff {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be minBackoff

return relaxedBackoff
}

// Lastly, if reb(currBackoff) - connDuration <= cfg.MinBackoff, meaning
// the stable connection lasted much longer than our previous backoff.
// To reward such good behavior, we'll reconnect after the default
// timeout.
return minBackoff
}

// computeNextBackoff uses a truncated exponential backoff to compute the next
// backoff using the value of the exiting backoff. The returned duration is
// randomized in either direction by 1/20 to prevent tight loops from
// stabilizing.
func computeNextBackoff(currBackoff, maxBackoff time.Duration) time.Duration {
// Double the current backoff, truncating if it exceeds our maximum.
nextBackoff := 2 * currBackoff
if nextBackoff > maxBackoff {
nextBackoff = maxBackoff
}

// Using 1/10 of our duration as a margin, compute a random offset to
// avoid the nodes entering connection cycles.
margin := nextBackoff / 10

var wiggle big.Int
wiggle.SetUint64(uint64(margin))
if _, err := rand.Int(rand.Reader, &wiggle); err != nil {
// Randomizing is not mission critical, so we'll just return the
// current backoff.
return nextBackoff
}

// Otherwise add in our wiggle, but subtract out half of the margin so
// that the backoff can tweaked by 1/20 in either direction.
return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2)
}
6 changes: 5 additions & 1 deletion peer/persistentpeer_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package peer
import (
"net"
"testing"
"time"

"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/connmgr"
Expand Down Expand Up @@ -32,7 +33,10 @@ func TestPersistentPeerManager(t *testing.T) {
_, alicePubKey := btcec.PrivKeyFromBytes(channels.AlicesPrivKey)
_, bobPubKey := btcec.PrivKeyFromBytes(channels.BobsPrivKey)

m := NewPersistentPeerManager()
m := NewPersistentPeerManager(&PersistentPeerMgrConfig{
MinBackoff: time.Millisecond * 10,
MaxBackoff: time.Millisecond * 100,
})

// Alice should not initially be a persistent peer.
require.False(t, m.IsPersistentPeer(alicePubKey))
Expand Down
103 changes: 8 additions & 95 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"crypto/rand"
"encoding/hex"
"fmt"
"math/big"
prand "math/rand"
"net"
"strconv"
Expand Down Expand Up @@ -77,12 +76,6 @@ const (
// connected to.
defaultMinPeers = 3

// defaultStableConnDuration is a floor under which all reconnection
// attempts will apply exponential randomized backoff. Connections
// durations exceeding this value will be eligible to have their
// backoffs reduced.
defaultStableConnDuration = 10 * time.Minute

// numInstantInitReconnect specifies how many persistent peers we should
// always attempt outbound connections to immediately. After this value
// is surpassed, the remaining peers will be randomly delayed using
Expand Down Expand Up @@ -196,7 +189,6 @@ type server struct {

persistentPeerMgr *peer.PersistentPeerManager

persistentPeersBackoff map[string]time.Duration
persistentRetryCancels map[string]chan struct{}

// peerErrors keeps a set of peer error buffers for peers that have
Expand Down Expand Up @@ -573,8 +565,12 @@ func newServer(cfg *Config, listenAddrs []net.Addr,

torController: torController,

persistentPeerMgr: peer.NewPersistentPeerManager(),
persistentPeersBackoff: make(map[string]time.Duration),
persistentPeerMgr: peer.NewPersistentPeerManager(
&peer.PersistentPeerMgrConfig{
MinBackoff: cfg.MinBackoff,
MaxBackoff: cfg.MaxBackoff,
},
),
persistentRetryCancels: make(map[string]chan struct{}),
peerErrors: make(map[string]*queue.CircularBuffer),
ignorePeerTermination: make(map[*peer.Brontide]struct{}),
Expand Down Expand Up @@ -2957,16 +2953,13 @@ func (s *server) establishPersistentConnections() error {
// Iterate through the combined list of addresses from prior links and
// node announcements and attempt to reconnect to each node.
var numOutboundConns int
for pubStr, nodeAddr := range nodeAddrsMap {
for _, nodeAddr := range nodeAddrsMap {
// Add this peer to the set of peers we should maintain a
// persistent connection with. We set the value to false to
// indicate that we should not continue to reconnect if the
// number of channels returns to zero, since this peer has not
// been requested as perm by the user.
s.persistentPeerMgr.AddPeer(nodeAddr.pubKey, false)
if _, ok := s.persistentPeersBackoff[pubStr]; !ok {
s.persistentPeersBackoff[pubStr] = s.cfg.MinBackoff
}

for _, address := range nodeAddr.addresses {
// Create a wrapper address which couples the IP and
Expand Down Expand Up @@ -3030,9 +3023,7 @@ func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) {
return
}

pubKeyStr := string(compressedPubKey[:])
if s.persistentPeerMgr.IsNonPermPersistentPeer(pubKey) {
delete(s.persistentPeersBackoff, pubKeyStr)
s.cancelConnReqs(pubKey, nil)
s.persistentPeerMgr.DelPeer(pubKey)

Expand Down Expand Up @@ -3193,51 +3184,6 @@ func (s *server) findPeerByPubStr(pubStr string) (*peer.Brontide, error) {
return peer, nil
}

// nextPeerBackoff computes the next backoff duration for a peer's pubkey using
// exponential backoff. If no previous backoff was known, the default is
// returned.
func (s *server) nextPeerBackoff(pubStr string,
startTime time.Time) time.Duration {

// Now, determine the appropriate backoff to use for the retry.
backoff, ok := s.persistentPeersBackoff[pubStr]
if !ok {
// If an existing backoff was unknown, use the default.
return s.cfg.MinBackoff
}

// If the peer failed to start properly, we'll just use the previous
// backoff to compute the subsequent randomized exponential backoff
// duration. This will roughly double on average.
if startTime.IsZero() {
return computeNextBackoff(backoff, s.cfg.MaxBackoff)
}

// The peer succeeded in starting. If the connection didn't last long
// enough to be considered stable, we'll continue to back off retries
// with this peer.
connDuration := time.Since(startTime)
if connDuration < defaultStableConnDuration {
return computeNextBackoff(backoff, s.cfg.MaxBackoff)
}

// The peer succeed in starting and this was stable peer, so we'll
// reduce the timeout duration by the length of the connection after
// applying randomized exponential backoff. We'll only apply this in the
// case that:
// reb(curBackoff) - connDuration > cfg.MinBackoff
relaxedBackoff := computeNextBackoff(backoff, s.cfg.MaxBackoff) - connDuration
if relaxedBackoff > s.cfg.MinBackoff {
return relaxedBackoff
}

// Lastly, if reb(currBackoff) - connDuration <= cfg.MinBackoff, meaning
// the stable connection lasted much longer than our previous backoff.
// To reward such good behavior, we'll reconnect after the default
// timeout.
return s.cfg.MinBackoff
}

// shouldDropConnection determines if our local connection to a remote peer
// should be dropped in the case of concurrent connection establishment. In
// order to deterministically decide which connection should be dropped, we'll
Expand Down Expand Up @@ -3897,8 +3843,7 @@ func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
}

// Record the computed backoff in the backoff map.
backoff := s.nextPeerBackoff(pubStr, p.StartTime())
s.persistentPeersBackoff[pubStr] = backoff
backoff := s.persistentPeerMgr.NextPeerBackoff(pubKey, p.StartTime())

// Initialize a retry canceller for this peer if one does not
// exist.
Expand Down Expand Up @@ -4127,9 +4072,6 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress,
// reconnecting even if the number of channels with this peer is
// zero.
s.persistentPeerMgr.AddPeer(addr.IdentityKey, true)
if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
s.persistentPeersBackoff[targetPub] = s.cfg.MinBackoff
}
s.persistentPeerMgr.AddPeerConnReq(addr.IdentityKey, connReq)
s.mu.Unlock()

Expand Down Expand Up @@ -4207,7 +4149,6 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
// them from this map so we don't attempt to re-connect after we
// disconnect.
s.persistentPeerMgr.DelPeer(pubKey)
delete(s.persistentPeersBackoff, pubStr)

// Remove the current peer from the server's internal state and signal
// that the peer termination watcher does not need to execute for this
Expand Down Expand Up @@ -4293,34 +4234,6 @@ func (s *server) Peers() []*peer.Brontide {
return peers
}

// computeNextBackoff uses a truncated exponential backoff to compute the next
// backoff using the value of the exiting backoff. The returned duration is
// randomized in either direction by 1/20 to prevent tight loops from
// stabilizing.
func computeNextBackoff(currBackoff, maxBackoff time.Duration) time.Duration {
// Double the current backoff, truncating if it exceeds our maximum.
nextBackoff := 2 * currBackoff
if nextBackoff > maxBackoff {
nextBackoff = maxBackoff
}

// Using 1/10 of our duration as a margin, compute a random offset to
// avoid the nodes entering connection cycles.
margin := nextBackoff / 10

var wiggle big.Int
wiggle.SetUint64(uint64(margin))
if _, err := rand.Int(rand.Reader, &wiggle); err != nil {
// Randomizing is not mission critical, so we'll just return the
// current backoff.
return nextBackoff
}

// Otherwise add in our wiggle, but subtract out half of the margin so
// that the backoff can tweaked by 1/20 in either direction.
return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2)
}

// errNoAdvertisedAddr is an error returned when we attempt to retrieve the
// advertised address of a node, but they don't have one.
var errNoAdvertisedAddr = errors.New("no advertised address found")
Expand Down