-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
multi+refactor: persistent peer manager #5700
Changes from 1 commit
252c36a
8e772db
082bc98
1f6da30
417991c
5221aaf
a5f03dc
6756734
194de63
73c105d
b7e26cf
cdcdfb6
e226516
a41637e
71b397d
c11367e
b7bd6f9
077d9ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
package peer | ||
|
||
import ( | ||
"sync" | ||
|
||
"github.com/btcsuite/btcd/btcec/v2" | ||
"github.com/lightningnetwork/lnd/routing/route" | ||
) | ||
|
||
// PersistentPeerManager manages persistent peers. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe add a basic description of what is/makes a peer persistent? |
||
type PersistentPeerManager struct { | ||
// conns maps a peer's public key to a persistentPeer object. | ||
conns map[route.Vertex]*persistentPeer | ||
|
||
sync.RWMutex | ||
} | ||
|
||
// persistentPeer holds all the info about a peer that the | ||
// PersistentPeerManager needs. | ||
type persistentPeer struct { | ||
// pubKey is the public key identifier of the peer. | ||
pubKey *btcec.PublicKey | ||
|
||
// perm indicates if a connection to the peer should be maintained even | ||
// if there are no channels with the peer. | ||
perm bool | ||
} | ||
|
||
// NewPersistentPeerManager creates a new PersistentPeerManager instance. | ||
func NewPersistentPeerManager() *PersistentPeerManager { | ||
return &PersistentPeerManager{ | ||
conns: make(map[route.Vertex]*persistentPeer), | ||
} | ||
} | ||
|
||
// AddPeer adds a new persistent peer for the PersistentPeerManager to keep | ||
// track of. | ||
func (m *PersistentPeerManager) AddPeer(pubKey *btcec.PublicKey, perm bool) { | ||
m.Lock() | ||
defer m.Unlock() | ||
|
||
m.conns[route.NewVertex(pubKey)] = &persistentPeer{ | ||
pubKey: pubKey, | ||
perm: perm, | ||
} | ||
} | ||
|
||
// IsPersistentPeer returns true if the given peer is a peer that the | ||
// PersistentPeerManager manages. | ||
func (m *PersistentPeerManager) IsPersistentPeer(pubKey *btcec.PublicKey) bool { | ||
m.RLock() | ||
defer m.RUnlock() | ||
|
||
_, ok := m.conns[route.NewVertex(pubKey)] | ||
return ok | ||
} | ||
|
||
// IsNonPermPersistentPeer returns true if the peer is a persistent peer but | ||
// has been marked as non-permanent. | ||
func (m *PersistentPeerManager) IsNonPermPersistentPeer( | ||
pubKey *btcec.PublicKey) bool { | ||
|
||
m.RLock() | ||
defer m.RUnlock() | ||
|
||
peer, ok := m.conns[route.NewVertex(pubKey)] | ||
if !ok { | ||
return false | ||
} | ||
|
||
return !peer.perm | ||
} | ||
|
||
// DelPeer removes a peer from the list of persistent peers that the | ||
// PersistentPeerManager will manage. | ||
func (m *PersistentPeerManager) DelPeer(pubKey *btcec.PublicKey) { | ||
m.Lock() | ||
defer m.Unlock() | ||
|
||
delete(m.conns, route.NewVertex(pubKey)) | ||
} | ||
|
||
// PersistentPeers returns the list of public keys of the peers it is currently | ||
// keeping track of. | ||
func (m *PersistentPeerManager) PersistentPeers() []*btcec.PublicKey { | ||
m.RLock() | ||
defer m.RUnlock() | ||
|
||
peers := make([]*btcec.PublicKey, 0, len(m.conns)) | ||
for _, p := range m.conns { | ||
peers = append(peers, p.pubKey) | ||
} | ||
|
||
return peers | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package peer | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/btcsuite/btcd/btcec/v2" | ||
"github.com/lightningnetwork/lnd/lntest/channels" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
// TestPersistentPeerManager tests that the PersistentPeerManager correctly | ||
// manages the persistent peers. | ||
func TestPersistentPeerManager(t *testing.T) { | ||
_, alicePubKey := btcec.PrivKeyFromBytes(channels.AlicesPrivKey) | ||
_, bobPubKey := btcec.PrivKeyFromBytes(channels.BobsPrivKey) | ||
|
||
m := NewPersistentPeerManager() | ||
|
||
// Alice should not initially be a persistent peer. | ||
require.False(t, m.IsPersistentPeer(alicePubKey)) | ||
|
||
// Now add Alice as a non-permanent persistent peer. | ||
m.AddPeer(alicePubKey, false) | ||
require.True(t, m.IsPersistentPeer(alicePubKey)) | ||
require.True(t, m.IsNonPermPersistentPeer(alicePubKey)) | ||
|
||
// Bob should not yet be a persistent peer. | ||
require.False(t, m.IsPersistentPeer(bobPubKey)) | ||
|
||
// Now add Bob as a permanent persistent peer. | ||
m.AddPeer(bobPubKey, true) | ||
require.True(t, m.IsPersistentPeer(bobPubKey)) | ||
require.False(t, m.IsNonPermPersistentPeer(bobPubKey)) | ||
|
||
// Both Alice and Bob should be listed as persistent peers. | ||
peers := m.PersistentPeers() | ||
require.Len(t, peers, 2) | ||
|
||
if peers[0].IsEqual(alicePubKey) { | ||
require.True(t, peers[1].IsEqual(bobPubKey)) | ||
} else { | ||
require.True(t, peers[0].IsEqual(bobPubKey)) | ||
require.True(t, peers[1].IsEqual(alicePubKey)) | ||
} | ||
|
||
// Delete Alice. | ||
m.DelPeer(alicePubKey) | ||
require.False(t, m.IsPersistentPeer(alicePubKey)) | ||
|
||
peers = m.PersistentPeers() | ||
require.Len(t, peers, 1) | ||
require.True(t, peers[0].IsEqual(bobPubKey)) | ||
|
||
// Delete Bob. | ||
m.DelPeer(bobPubKey) | ||
peers = m.PersistentPeers() | ||
require.Len(t, peers, 0) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -194,11 +194,8 @@ type server struct { | |
peerConnectedListeners map[string][]chan<- lnpeer.Peer | ||
peerDisconnectedListeners map[string][]chan<- struct{} | ||
|
||
// TODO(yy): the Brontide.Start doesn't know this value, which means it | ||
// will continue to send messages even if there are no active channels | ||
// and the value below is false. Once it's pruned, all its connections | ||
// will be closed, thus the Brontide.Start will return an error. | ||
persistentPeers map[string]bool | ||
persistentPeerMgr *peer.PersistentPeerManager | ||
|
||
persistentPeersBackoff map[string]time.Duration | ||
persistentPeerAddrs map[string][]*lnwire.NetAddress | ||
persistentConnReqs map[string][]*connmgr.ConnReq | ||
|
@@ -359,7 +356,9 @@ func (s *server) updatePersistentPeerAddrs() error { | |
// We only care about updates from | ||
// our persistentPeers. | ||
s.mu.RLock() | ||
_, ok := s.persistentPeers[pubKeyStr] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's possible to remove the server mutex (un)locking calls, since those seem to be handled by the new manager. Though maybe it's best to not change any assumptions here and keep it how you did it |
||
ok := s.persistentPeerMgr.IsPersistentPeer( | ||
update.IdentityKey, | ||
) | ||
s.mu.RUnlock() | ||
if !ok { | ||
continue | ||
|
@@ -574,7 +573,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, | |
|
||
torController: torController, | ||
|
||
persistentPeers: make(map[string]bool), | ||
persistentPeerMgr: peer.NewPersistentPeerManager(), | ||
persistentPeersBackoff: make(map[string]time.Duration), | ||
persistentConnReqs: make(map[string][]*connmgr.ConnReq), | ||
persistentPeerAddrs: make(map[string][]*lnwire.NetAddress), | ||
|
@@ -1280,9 +1279,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr, | |
// to connect to this peer even if the number of | ||
// channels with this peer is zero. | ||
s.mu.Lock() | ||
pubStr := string(peerKey.SerializeCompressed()) | ||
if _, ok := s.persistentPeers[pubStr]; !ok { | ||
s.persistentPeers[pubStr] = false | ||
if !s.persistentPeerMgr.IsPersistentPeer(peerKey) { | ||
s.persistentPeerMgr.AddPeer(peerKey, false) | ||
} | ||
s.mu.Unlock() | ||
|
||
|
@@ -2416,9 +2414,9 @@ func (s *server) createBootstrapIgnorePeers() map[autopilot.NodeID]struct{} { | |
|
||
// Ignore all persistent peers as they have a dedicated reconnecting | ||
// process. | ||
for pubKeyStr := range s.persistentPeers { | ||
for _, pubKey := range s.persistentPeerMgr.PersistentPeers() { | ||
var nID autopilot.NodeID | ||
copy(nID[:], []byte(pubKeyStr)) | ||
copy(nID[:], pubKey.SerializeCompressed()) | ||
ignore[nID] = struct{}{} | ||
} | ||
|
||
|
@@ -2967,7 +2965,7 @@ func (s *server) establishPersistentConnections() error { | |
// indicate that we should not continue to reconnect if the | ||
// number of channels returns to zero, since this peer has not | ||
// been requested as perm by the user. | ||
s.persistentPeers[pubStr] = false | ||
s.persistentPeerMgr.AddPeer(nodeAddr.pubKey, false) | ||
if _, ok := s.persistentPeersBackoff[pubStr]; !ok { | ||
s.persistentPeersBackoff[pubStr] = s.cfg.MinBackoff | ||
} | ||
|
@@ -3023,22 +3021,28 @@ func (s *server) delayInitialReconnect(pubStr string) { | |
// persistent connections to a peer within the server. This is used to avoid | ||
// persistent connection retries to peers we do not have any open channels with. | ||
func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) { | ||
pubKeyStr := string(compressedPubKey[:]) | ||
|
||
s.mu.Lock() | ||
if perm, ok := s.persistentPeers[pubKeyStr]; ok && !perm { | ||
delete(s.persistentPeers, pubKeyStr) | ||
defer s.mu.Unlock() | ||
|
||
pubKey, err := btcec.ParsePubKey(compressedPubKey[:]) | ||
if err != nil { | ||
srvrLog.Errorf("could not convert pubKey bytes (%x) to "+ | ||
"btcec.PublicKey: %v", compressedPubKey, err) | ||
return | ||
} | ||
|
||
pubKeyStr := string(compressedPubKey[:]) | ||
if s.persistentPeerMgr.IsNonPermPersistentPeer(pubKey) { | ||
delete(s.persistentPeersBackoff, pubKeyStr) | ||
delete(s.persistentPeerAddrs, pubKeyStr) | ||
s.cancelConnReqs(pubKeyStr, nil) | ||
s.mu.Unlock() | ||
s.persistentPeerMgr.DelPeer(pubKey) | ||
|
||
srvrLog.Infof("Pruned peer %x from persistent connections, "+ | ||
"peer has no open channels", compressedPubKey) | ||
|
||
return | ||
} | ||
s.mu.Unlock() | ||
} | ||
|
||
// BroadcastMessage sends a request to the server to broadcast a set of | ||
|
@@ -3830,7 +3834,7 @@ func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) { | |
s.removePeer(p) | ||
|
||
// Next, check to see if this is a persistent peer or not. | ||
if _, ok := s.persistentPeers[pubStr]; !ok { | ||
if !s.persistentPeerMgr.IsPersistentPeer(pubKey) { | ||
return | ||
} | ||
|
||
|
@@ -4132,7 +4136,7 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, | |
// the entry to true which will tell the server to continue | ||
// reconnecting even if the number of channels with this peer is | ||
// zero. | ||
s.persistentPeers[targetPub] = true | ||
s.persistentPeerMgr.AddPeer(addr.IdentityKey, true) | ||
if _, ok := s.persistentPeersBackoff[targetPub]; !ok { | ||
s.persistentPeersBackoff[targetPub] = s.cfg.MinBackoff | ||
} | ||
|
@@ -4214,7 +4218,7 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error { | |
// If this peer was formerly a persistent connection, then we'll remove | ||
// them from this map so we don't attempt to re-connect after we | ||
// disconnect. | ||
delete(s.persistentPeers, pubStr) | ||
s.persistentPeerMgr.DelPeer(pubKey) | ||
delete(s.persistentPeersBackoff, pubStr) | ||
|
||
// Remove the current peer from the server's internal state and signal | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think
route.Vertex
needs to be introduced