Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi+refactor: persistent peer manager #5700

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
252c36a
peer+server: move persistentPeers to PersistentPeerManager
ellemouton May 11, 2022
8e772db
server+peer: move persistentPeerAddrs to PersistentPeerManager
ellemouton May 11, 2022
082bc98
peer+server: move persistentConnReqs to PersistentPeerManager
ellemouton May 11, 2022
1f6da30
peer+server: move persistentPeersBackoff to PersistentPeerManager
ellemouton May 23, 2022
417991c
peer+server: move persistentRetryCancels to PersistentPeerManager
ellemouton May 23, 2022
5221aaf
peer+server: move ConnectPeer logic to PersistentPeerManager
ellemouton May 23, 2022
a5f03dc
peer+server: move all connMgr Connect calls to PersistentPeerManager
ellemouton May 24, 2022
6756734
peer+server: move cancelConnReqs to PersistentPeerManager
ellemouton May 24, 2022
194de63
peer+server: deleting the peer should always cancel conn reqs
ellemouton May 24, 2022
73c105d
peer+server: move node addr update logic to PersistentPeerManager
ellemouton May 24, 2022
b7e26cf
peer+server: add addrs on persistent peer creation
ellemouton May 24, 2022
cdcdfb6
peer+server: only add supported addresses
ellemouton May 24, 2022
e226516
peer+server: allow PersistentPeerMgr to fetch advertised addrs
ellemouton May 24, 2022
a41637e
peer+server: let PersistentPeerManager do addr conversions
ellemouton May 24, 2022
71b397d
server: remove redundant address lookups
ellemouton May 24, 2022
c11367e
peer+server: Connect with backoff to PersistentPeerManager
ellemouton May 24, 2022
b7bd6f9
peer: always cancel any previous retry cancellers
ellemouton May 24, 2022
077d9ca
docs/release-notes: update with entry for 5700
ellemouton May 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 15 additions & 26 deletions peer/persistentpeer_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ type PersistentPeerMgrConfig struct {
// AddrTypeIsSupported returns true if we can connect to this type of
// address.
AddrTypeIsSupported func(addr net.Addr) bool

// FetchNodeAdvertisedAddrs can be used to fetch the advertised
// addresses of a node that we have persisted. This should only ge used
// to fetch an initial set of addresses for the peer.
FetchNodeAdvertisedAddrs func(route.Vertex) ([]net.Addr, error)
}

// PersistentPeerManager manages persistent peers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a basic description of what is/makes a peer persistent?

Expand Down Expand Up @@ -193,11 +198,18 @@ func (m *PersistentPeerManager) Stop() {
func (m *PersistentPeerManager) AddPeer(pubKey *btcec.PublicKey, perm bool,
addrs ...*lnwire.NetAddress) {

peerKey := route.NewVertex(pubKey)

// Fetch any stored addresses we may have for this peer.
advertisedAddrs, err := m.cfg.FetchNodeAdvertisedAddrs(peerKey)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to miss a graph update in between this fetch and adding the peer to the list of connections? Maybe safer to swap the order?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it's safer to add it to the map and then update it with the fetched addrs

otherwise i think it's possible that:

  • we call ConnectPeer in the server
  • AddPeer is called and this ends up racing

if err != nil {
peerLog.Errorf("Unable to retrieve advertised address for "+
"node %s: %v", peerKey, err)
}

m.Lock()
defer m.Unlock()

peerKey := route.NewVertex(pubKey)

backoff := m.cfg.MinBackoff
if peer, ok := m.conns[peerKey]; ok {
backoff = peer.backoff
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the peer is already being tracked, does the fetch call above still add something? Perhaps setPeerAddrsUnsafe can just append new addrs?

Expand All @@ -209,7 +221,7 @@ func (m *PersistentPeerManager) AddPeer(pubKey *btcec.PublicKey, perm bool,
backoff: backoff,
}

m.setPeerAddrsUnsafe(peerKey, nil, addrs)
m.setPeerAddrsUnsafe(peerKey, advertisedAddrs, addrs)
}

// IsPersistentPeer returns true if the given peer is a peer that the
Expand Down Expand Up @@ -263,29 +275,6 @@ func (m *PersistentPeerManager) PersistentPeers() []*btcec.PublicKey {
return peers
}

// AddPeerAddresses is used to add addresses to a peers existing list of
// addresses.
func (m *PersistentPeerManager) AddPeerAddresses(pubKey *btcec.PublicKey,
addrs ...*lnwire.NetAddress) {

m.Lock()
defer m.Unlock()

peerKey := route.NewVertex(pubKey)

peer, ok := m.conns[peerKey]
if !ok {
return
}

peerAddrs := make([]*lnwire.NetAddress, 0, len(peer.addrs))
for _, addr := range peer.addrs {
peerAddrs = append(peerAddrs, addr)
}

m.setPeerAddrsUnsafe(peerKey, nil, append(peerAddrs, addrs...))
}

// NumPeerConnReqs returns the number of connection requests for the given peer.
func (m *PersistentPeerManager) NumPeerConnReqs(pubKey *btcec.PublicKey) int {
m.RLock()
Expand Down
86 changes: 32 additions & 54 deletions peer/persistentpeer_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func TestPersistentPeerManagerBasics(t *testing.T) {
AddrTypeIsSupported: func(addr net.Addr) bool {
return true
},
FetchNodeAdvertisedAddrs: func(vertex route.Vertex) ([]net.Addr,
error) {

return nil, nil
},
})
defer m.Stop()

Expand Down Expand Up @@ -82,31 +87,6 @@ func TestPersistentPeerManagerBasics(t *testing.T) {
require.Len(t, peers, 1)
require.True(t, peers[0].IsEqual(bobPubKey))

// Add an address for Bob.
m.AddPeerAddresses(bobPubKey, &lnwire.NetAddress{
IdentityKey: bobPubKey,
Address: testAddr1,
})

// Add another address for Bob.
m.AddPeerAddresses(bobPubKey, &lnwire.NetAddress{
IdentityKey: bobPubKey,
Address: testAddr2,
})

// Both addresses should appear in Bob's address list.
var addrs []*lnwire.NetAddress
for _, addr := range m.conns[route.NewVertex(bobPubKey)].addrs {
addrs = append(addrs, addr)
}
require.Len(t, addrs, 2)
if addrs[0].Address.String() == testAddr1.String() {
require.Equal(t, addrs[1].Address.String(), testAddr2.String())
} else {
require.Equal(t, addrs[0].Address.String(), testAddr2.String())
require.Equal(t, addrs[1].Address.String(), testAddr1.String())
}

// Delete Bob.
m.DelPeer(bobPubKey)
peers = m.PersistentPeers()
Expand All @@ -119,6 +99,11 @@ func TestRetryCanceller(t *testing.T) {
m := NewPersistentPeerManager(&PersistentPeerMgrConfig{
MinBackoff: time.Millisecond * 10,
MaxBackoff: time.Millisecond * 100,
FetchNodeAdvertisedAddrs: func(_ route.Vertex) ([]net.Addr,
error) {

return nil, nil
},
})
defer m.Stop()

Expand Down Expand Up @@ -196,6 +181,8 @@ func TestConnectionLogic(t *testing.T) {
cm := newMockConnMgr(t)
defer cm.stop()

_, alicePubKey := btcec.PrivKeyFromBytes(channels.AlicesPrivKey)

// Create a new PersistentPeerManager.
m := NewPersistentPeerManager(&PersistentPeerMgrConfig{
ConnMgr: cm,
Expand All @@ -206,37 +193,37 @@ func TestConnectionLogic(t *testing.T) {
AddrTypeIsSupported: func(addr net.Addr) bool {
return true
},
FetchNodeAdvertisedAddrs: func(vertex route.Vertex) ([]net.Addr,
error) {

switch vertex {
case route.NewVertex(alicePubKey):
return []net.Addr{testAddr1}, nil
default:
return nil, fmt.Errorf("unknown node")
}
},
})
require.NoError(t, m.Start())
defer m.Stop()

_, alicePubKey := btcec.PrivKeyFromBytes(channels.AlicesPrivKey)

// Add Alice as a persistent peer.
m.AddPeer(alicePubKey, false)

// There are currently no addresses stored for Alice, so calling
// ConnectPeer should not result in any connection requests.
// Since an address from FetchNodeAdvertisedAddrs would have been found
// for Alice, calling ConnectPeer should result in a connection request
// for that address.
m.ConnectPeer(alicePubKey)
require.Equal(t, cm.totalNumConnReqs(), 0)
assertOneConnReqPerAddress(t, cm, testAddr1)

// Now we add an address for Alice and attempt to connect again. This
// should result in 1 connection request for the given address.
m.AddPeerAddresses(alicePubKey, &lnwire.NetAddress{
IdentityKey: alicePubKey,
Address: testAddr1,
})
m.ConnectPeer(alicePubKey)
// Advertise the same address for Alice in a new NodeAnnouncement.
// There should still only be one connection request for this address.
addUpdate(alicePubKey, testAddr1)
assertOneConnReqPerAddress(t, cm, testAddr1)

// If we now add a second address for Alice, calling ConnectPeer again
// should result in one more connection request for the new address.
// The connection for the first address should remain intact.
m.AddPeerAddresses(alicePubKey, &lnwire.NetAddress{
IdentityKey: alicePubKey,
Address: testAddr2,
})
m.ConnectPeer(alicePubKey)
// Advertise new addresses for Alice, one being the same as what she
// had before and the other being a new one.
addUpdate(alicePubKey, testAddr1, testAddr2)
assertOneConnReqPerAddress(t, cm, testAddr1, testAddr2)

// If addresses come through from NodeAnnouncement updates, they should
Expand Down Expand Up @@ -296,15 +283,6 @@ func (m *mockConnMgr) stop() {
m.cm.Stop()
}

// totalNumConnReqs returns the number of connection requests that the
// mockConnMgr is keeping track of.
func (m *mockConnMgr) totalNumConnReqs() int {
m.Lock()
defer m.Unlock()

return len(m.reqs)
}

// numConnReqs returns the number of active connection requests to the given
// address.
func (m *mockConnMgr) numConnReqs(addr net.Addr) int {
Expand Down
90 changes: 10 additions & 80 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,16 @@ func newServer(cfg *Config, listenAddrs []net.Addr,

return false
},
FetchNodeAdvertisedAddrs: func(
peer route.Vertex) ([]net.Addr, error) {

node, err := s.graphDB.FetchLightningNode(peer)
if err != nil {
return nil, err
}

return node.Addresses, nil
},
},
)

Expand Down Expand Up @@ -3660,63 +3670,6 @@ func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
return
}

// Get the last address that we used to connect to the peer.
addrs := []net.Addr{
p.NetAddress().Address,
}

// We'll ensure that we locate all the peers advertised addresses for
// reconnection purposes.
advertisedAddrs, err := s.fetchNodeAdvertisedAddrs(pubKey)
switch {
// We found advertised addresses, so use them.
case err == nil:
addrs = advertisedAddrs

// The peer doesn't have an advertised address.
case err == errNoAdvertisedAddr:
// If it is an outbound peer then we fall back to the existing
// peer address.
if !p.Inbound() {
break
}

// Fall back to the existing peer address if
// we're not accepting connections over Tor.
if s.torController == nil {
break
}

// If we are, the peer's address won't be known
Copy link
Collaborator

@Crypt-iQ Crypt-iQ Nov 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this changes the behavior since now the peermgr will attempt to connect to the incoming address - before this change, the attempt wasn't made.

also, the original logic is slightly off. If we get an incoming connection and don't have any other addresses for the peer, we'll attempt to connect to the addr+port, but they may not actually be listening on the port given how tcp works. The issue seems to be that when we create a link node, we'll use the remote address as the address to store and expect to be able to connect to it

// to us (we'll see a private address, which is
// the address used by our onion service to dial
// to lnd), so we don't have enough information
// to attempt a reconnect.
srvrLog.Debugf("Ignoring reconnection attempt "+
"to inbound peer %v without "+
"advertised address", p)
return

// We came across an error retrieving an advertised
// address, log it, and fall back to the existing peer
// address.
default:
srvrLog.Errorf("Unable to retrieve advertised "+
"address for node %x: %v", p.PubKey(),
err)
}

// Add any missing addresses for this peer to persistentPeerAddr.
for _, addr := range addrs {
s.persistentPeerMgr.AddPeerAddresses(
p.IdentityKey(), &lnwire.NetAddress{
IdentityKey: p.IdentityKey(),
Address: addr,
ChainNet: p.NetAddress().ChainNet,
},
)
}

// Record the computed backoff in the backoff map.
backoff := s.persistentPeerMgr.NextPeerBackoff(pubKey, p.StartTime())

Expand Down Expand Up @@ -4000,29 +3953,6 @@ func (s *server) Peers() []*peer.Brontide {
return peers
}

// errNoAdvertisedAddr is an error returned when we attempt to retrieve the
// advertised address of a node, but they don't have one.
var errNoAdvertisedAddr = errors.New("no advertised address found")

// fetchNodeAdvertisedAddrs attempts to fetch the advertised addresses of a node.
func (s *server) fetchNodeAdvertisedAddrs(pub *btcec.PublicKey) ([]net.Addr, error) {
vertex, err := route.NewVertexFromBytes(pub.SerializeCompressed())
if err != nil {
return nil, err
}

node, err := s.graphDB.FetchLightningNode(vertex)
if err != nil {
return nil, err
}

if len(node.Addresses) == 0 {
return nil, errNoAdvertisedAddr
}

return node.Addresses, nil
}

// fetchLastChanUpdate returns a function which is able to retrieve our latest
// channel update for a target channel.
func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) (
Expand Down