diff --git a/nomad/client_rpc.go b/nomad/client_rpc.go index 056cd549a96..cc8b594d695 100644 --- a/nomad/client_rpc.go +++ b/nomad/client_rpc.go @@ -156,28 +156,15 @@ func (s *Server) serverWithNodeConn(nodeID, region string) (*peers.Parts, error) } // Select the list of servers to check based on what region we are querying - s.peerLock.RLock() - - var rawTargets []*peers.Parts + var targets []*peers.Parts if region == s.Region() { - rawTargets = make([]*peers.Parts, 0, len(s.localPeers)) - for _, srv := range s.localPeers { - rawTargets = append(rawTargets, srv) - } + targets = s.peersCache.LocalPeers() } else { - peers, ok := s.peers[region] - if !ok { - s.peerLock.RUnlock() + targets = s.peersCache.RegionPeers(region) + if targets == nil { return nil, structs.ErrNoRegionPath } - rawTargets = peers - } - - targets := make([]*peers.Parts, 0, len(rawTargets)) - for _, target := range rawTargets { - targets = append(targets, target.Copy()) } - s.peerLock.RUnlock() // connections is used to store the servers that have connections to the // requested node. diff --git a/nomad/encrypter.go b/nomad/encrypter.go index 955f7074d91..10ee2f4dacb 100644 --- a/nomad/encrypter.go +++ b/nomad/encrypter.go @@ -33,7 +33,6 @@ import ( "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/crypto" "github.com/hashicorp/nomad/helper/joseutil" - "github.com/hashicorp/nomad/nomad/peers" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/raft" @@ -1189,7 +1188,7 @@ func (krr *KeyringReplicator) replicateKey(ctx context.Context, wrappedKeys *str cfg := krr.srv.GetConfig() self := fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Region) - for _, peer := range krr.getAllPeers() { + for _, peer := range krr.srv.peersCache.LocalPeers() { if peer.Name == self { continue } @@ -1222,13 +1221,3 @@ func (krr *KeyringReplicator) replicateKey(ctx context.Context, wrappedKeys *str krr.logger.Debug("added key", "key", keyID) return nil } - -func (krr *KeyringReplicator) getAllPeers() []*peers.Parts { - krr.srv.peerLock.RLock() - defer krr.srv.peerLock.RUnlock() - peers := make([]*peers.Parts, 0, len(krr.srv.localPeers)) - for _, peer := range krr.srv.localPeers { - peers = append(peers, peer.Copy()) - } - return peers -} diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 3a3dd728d05..c816b0abee6 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -294,8 +294,6 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp return err } - n.srv.peerLock.RLock() - defer n.srv.peerLock.RUnlock() if err := n.constructNodeServerInfoResponse(args.Node.ID, snap, reply); err != nil { n.logger.Error("failed to populate NodeUpdateResponse", "error", err) return err @@ -467,14 +465,7 @@ func (n *Node) constructNodeServerInfoResponse(nodeID string, snap *state.StateS reply.LeaderRPCAddr = string(leaderAddr) // Reply with config information required for future RPC requests - reply.Servers = make([]*structs.NodeServerInfo, 0, len(n.srv.localPeers)) - for _, v := range n.srv.localPeers { - reply.Servers = append(reply.Servers, - &structs.NodeServerInfo{ - RPCAdvertiseAddr: v.RPCAddr.String(), - Datacenter: v.Datacenter, - }) - } + reply.Servers = n.srv.peersCache.LocalPeersServerInfo() ws := memdb.NewWatchSet() @@ -879,8 +870,6 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct // Set the reply index and leader reply.Index = index - n.srv.peerLock.RLock() - defer n.srv.peerLock.RUnlock() 
if err := n.constructNodeServerInfoResponse(node.GetID(), snap, reply); err != nil { n.logger.Error("failed to populate NodeUpdateResponse", "error", err) return err @@ -1158,8 +1147,6 @@ func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUp // Set the reply index reply.Index = evalIndex - n.srv.peerLock.RLock() - defer n.srv.peerLock.RUnlock() if err := n.constructNodeServerInfoResponse(node.GetID(), snap, reply); err != nil { n.logger.Error("failed to populate NodeUpdateResponse", "error", err) return err diff --git a/nomad/operator_endpoint_test.go b/nomad/operator_endpoint_test.go index a5110856c45..a208579d54a 100644 --- a/nomad/operator_endpoint_test.go +++ b/nomad/operator_endpoint_test.go @@ -327,11 +327,8 @@ func (tc testcluster) anyFollowerRaftServerID() raft.ServerID { var tgtID raft.ServerID - s1.peerLock.Lock() - defer s1.peerLock.Unlock() - // Find the first non-leader server in the list. - for _, sp := range s1.localPeers { + for _, sp := range s1.peersCache.LocalPeers() { tgtID = raft.ServerID(sp.ID) if tgtID != ldrID { break @@ -346,12 +343,9 @@ func (tc testcluster) anyFollowerRaftServerAddress() raft.ServerAddress { var addr raft.ServerAddress - s1.peerLock.Lock() - defer s1.peerLock.Unlock() - // Find the first non-leader server in the list. - for a := range s1.localPeers { - addr = a + for _, a := range s1.peersCache.LocalPeers() { + addr = raft.ServerAddress(a.Addr.String()) if addr != lAddr { break } diff --git a/nomad/peers/peers.go b/nomad/peers/peers.go index 666fa93ab62..50e44857bc6 100644 --- a/nomad/peers/peers.go +++ b/nomad/peers/peers.go @@ -11,6 +11,8 @@ import ( "sync" "github.com/hashicorp/go-version" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" ) @@ -52,6 +54,9 @@ func (p *Parts) String() string { } func (p *Parts) Copy() *Parts { + if p == nil { + return nil + } ns := new(Parts) *ns = *p return ns @@ -133,47 +138,170 @@ func IsNomadServer(m serf.Member) (bool, *Parts) { return true, parts } -// PeerCache is a threadsafe cache of known Nomad server peers parsed from Serf -// members. It avoids the need to re-parse Serf members each time the peers -// need to be inspected. +// PeerCache is a threadsafe cache of known Nomad server peers parsed from Serf +// members. It avoids the need to re-parse Serf members each time the peers need +// to be inspected and can be used for RPC routing, discovery, and server +// version checking. +// +// Returned peer objects are copies of the cached objects, which ensures that a +// cached peer is not mutated while being used by a caller. type PeerCache struct { - // peers is a map of region names to the list of known server peers in that - // region. All access must be protected by peersLock. - peers map[string][]*Parts + // allPeers is a map of region names to the list of known server peers in + // that region. Peers stored here can be in failed states; this map is used + // for server version checking only. + allPeers map[string][]*Parts + + // alivePeers is a map of region names to the list of known server peers in + // that region. Only peers in the Alive state are stored here; this map is + // used for RPC routing and discovery. + alivePeers map[string][]*Parts + + // localPeers is a map of the known server peers in the local region keyed + // by their Raft address. Only peers in the Alive state are stored here; + // this map is used for intra-region RPC routing and discovery.
+ localPeers map[raft.ServerAddress]*Parts + + // peersLock protects access to the maps above. We use a single lock so that + // updates are correctly applied to all maps together. peersLock sync.RWMutex } // NewPeerCache returns a new instance of a PeerCache ready for use. func NewPeerCache() *PeerCache { return &PeerCache{ - peers: make(map[string][]*Parts), + allPeers: make(map[string][]*Parts), + alivePeers: make(map[string][]*Parts), + localPeers: make(map[raft.ServerAddress]*Parts), + } +} + +// LocalPeer returns a copy of the known alive peer in the local region with +// the given Raft address, or nil if no such peer is known. +func (p *PeerCache) LocalPeer(addr raft.ServerAddress) *Parts { + p.peersLock.RLock() + defer p.peersLock.RUnlock() + return p.localPeers[addr].Copy() +} + +// LocalPeers returns a list of known alive peers in the local region. +func (p *PeerCache) LocalPeers() []*Parts { + p.peersLock.RLock() + defer p.peersLock.RUnlock() + + peers := make([]*Parts, 0, len(p.localPeers)) + + for _, peer := range p.localPeers { + peers = append(peers, peer.Copy()) + } + + return peers +} + +// LocalPeersServerInfo returns the known alive peers in the local region as +// NodeServerInfo objects suitable for node RPC responses. +func (p *PeerCache) LocalPeersServerInfo() []*structs.NodeServerInfo { + p.peersLock.RLock() + defer p.peersLock.RUnlock() + + peers := make([]*structs.NodeServerInfo, 0, len(p.localPeers)) + + for _, peer := range p.localPeers { + peers = append(peers, &structs.NodeServerInfo{ + RPCAdvertiseAddr: peer.RPCAddr.String(), + Datacenter: peer.Datacenter, + }) } + + return peers +} + +// RegionNum returns the number of known regions that have at least one alive +// peer and are therefore suitable for RPC routing. +func (p *PeerCache) RegionNum() int { + p.peersLock.RLock() + defer p.peersLock.RUnlock() + return len(p.alivePeers) +} + +// RegionNames returns the names of all known regions that have at least one +// alive peer and are therefore suitable for RPC routing. +func (p *PeerCache) RegionNames() []string { + p.peersLock.RLock() + defer p.peersLock.RUnlock() + + names := make([]string, 0, len(p.alivePeers)) + for name := range p.alivePeers { + names = append(names, name) + } + + return names +} + +// RegionPeers returns the list of known alive peers in the given region. If +// the region is not known or has no alive peers, nil is returned. +func (p *PeerCache) RegionPeers(region string) []*Parts { + p.peersLock.RLock() + defer p.peersLock.RUnlock() + + numPeers := len(p.alivePeers[region]) + if numPeers == 0 { + return nil + } + + peers := make([]*Parts, 0, numPeers) + + for _, peer := range p.alivePeers[region] { + peers = append(peers, peer.Copy()) + } + return peers } // PeerSet adds or updates the given parts in the cache. This should be called // when a new peer is detected or an existing peer changes is status. -func (p *PeerCache) PeerSet(parts *Parts) { +func (p *PeerCache) PeerSet(parts *Parts, localRegion string) { p.peersLock.Lock() defer p.peersLock.Unlock() - existing, ok := p.peers[parts.Region] - if !ok { - p.peers[parts.Region] = []*Parts{parts} - return + // Mirror the update in the all peers mapping, which tracks all known peers + // regardless of status. + p.peerSetLocked(p.allPeers, parts) + + // Now update the alive peers and local peers mappings based on the status. + switch parts.Status { + case serf.StatusAlive: + p.peerSetLocked(p.alivePeers, parts) + p.peerSetLocalLocked(parts, localRegion) + default: + p.peerDeleteLocked(p.alivePeers, parts) + p.peerDeleteLocalLocked(parts, localRegion) } +} + +// peerSetLocalLocked adds or updates the given parts in the local peers map if +// it is in the local region. The caller must hold the peersLock.
+func (p *PeerCache) peerSetLocalLocked(parts *Parts, localRegion string) { + if parts.Region == localRegion { + p.localPeers[raft.ServerAddress(parts.Addr.String())] = parts + } +} + +// peerSetLocked adds or updates the given parts in the passed peers map. The +// caller must hold the peersLock. +func (p *PeerCache) peerSetLocked(peers map[string][]*Parts, parts *Parts) { + + // Track if we found the peer already in the list. + var found bool + + existing := peers[parts.Region] // Replace if already present for i, ep := range existing { if ep.Name == parts.Name { existing[i] = parts - return + found = true + break } } - // If we reached this point then it's a new member, so append it to the - // exiting array. - p.peers[parts.Region] = append(existing, parts) + // Add to the list if not already known + if !found { + peers[parts.Region] = append(existing, parts) + } } // PeerDelete removes the given members from the cache. This should be called @@ -184,22 +312,34 @@ func (p *PeerCache) PeerDelete(event serf.MemberEvent) { for _, m := range event.Members { if ok, parts := IsNomadServer(m); ok { + p.peerDeleteLocked(p.allPeers, parts) + } + } +} - existing := p.peers[parts.Region] +// peerDeleteLocalLocked removes the given parts from the local peers map if it +// is in the local region. The caller must hold the peersLock. +func (p *PeerCache) peerDeleteLocalLocked(parts *Parts, localRegion string) { + if parts.Region == localRegion { + delete(p.localPeers, raft.ServerAddress(parts.Addr.String())) + } +} - existing = slices.DeleteFunc( - existing, - func(member *Parts) bool { return member.Name == parts.Name }, - ) +// peerDeleteLocked removes the given parts from the passed peers map. The +// caller must hold the peersLock. +func (p *PeerCache) peerDeleteLocked(peers map[string][]*Parts, parts *Parts) { - // If all peers in the region are gone, remove the region entry - // entirely. Otherwise, update the list. - if len(existing) < 1 { - delete(p.peers, parts.Region) - } else { - p.peers[parts.Region] = existing - } - } + existing := peers[parts.Region] + + existing = slices.DeleteFunc( + existing, + func(member *Parts) bool { return member.Name == parts.Name }, + ) + + // If all peers in the region are gone, remove the region entry + // entirely. Otherwise, update the list. + if len(existing) < 1 { + delete(peers, parts.Region) + } else { + peers[parts.Region] = existing } } @@ -226,7 +366,7 @@ func (p *PeerCache) ServersMeetMinimumVersion( // the specific region. switch region { case AllRegions: - for _, peerList := range p.peers { + for _, peerList := range p.allPeers { if !regionServersMeetMinimumVersion(peerList, minVersion, checkFailedServers) { return false } @@ -237,7 +377,7 @@ func (p *PeerCache) ServersMeetMinimumVersion( // local region or all regions only. It's not possible that the server // is querying its own region but that region is not known. However, in // the future we may change this, so guard against it here just in case.
- peerList, ok := p.peers[region] + peerList, ok := p.allPeers[region] if !ok { return false } diff --git a/nomad/peers/peers_test.go b/nomad/peers/peers_test.go index 717c03a99cf..dde40c05084 100644 --- a/nomad/peers/peers_test.go +++ b/nomad/peers/peers_test.go @@ -5,12 +5,16 @@ package peers import ( "fmt" + "math/rand/v2" "net" + "strconv" "testing" "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" "github.com/shoenig/test/must" ) @@ -109,11 +113,146 @@ func TestIsNomadServer(t *testing.T) { must.False(t, parts.NonVoter) } +func Test_NewPartsCache(t *testing.T) { + ci.Parallel(t) + + partsCache := NewPeerCache() + must.NotNil(t, partsCache) + must.MapLen(t, 0, partsCache.allPeers) + must.MapLen(t, 0, partsCache.alivePeers) + must.MapLen(t, 0, partsCache.localPeers) +} + +func TestPartsCache_LocalPeer(t *testing.T) { + ci.Parallel(t) + + peerCache := NewPeerCache() + must.Nil(t, peerCache.LocalPeer("127.0.0.1:4647")) + + euw1Peers := []*Parts{ + generateTestParts(t, "1.2.3", "euw1", serf.StatusAlive), + generateTestParts(t, "1.2.3", "euw1", serf.StatusAlive), + generateTestParts(t, "1.2.3", "euw1", serf.StatusAlive), + } + + for _, p := range euw1Peers { + peerCache.PeerSet(p, "euw1") + } + + must.Eq(t, euw1Peers[0], peerCache.LocalPeer(raft.ServerAddress(euw1Peers[0].Addr.String()))) + must.Eq(t, euw1Peers[1], peerCache.LocalPeer(raft.ServerAddress(euw1Peers[1].Addr.String()))) + must.Eq(t, euw1Peers[2], peerCache.LocalPeer(raft.ServerAddress(euw1Peers[2].Addr.String()))) +} + +func TestPartsCache_LocalPeersServerInfo(t *testing.T) { + ci.Parallel(t) + + peerCache := NewPeerCache() + must.Nil(t, peerCache.LocalPeer("127.0.0.1:4647")) + + euw1Peers := []*Parts{ + generateTestParts(t, "1.2.3", "euw1", serf.StatusAlive), + generateTestParts(t, "1.2.3", "euw1", serf.StatusAlive), + generateTestParts(t, "1.2.3", "euw1", serf.StatusAlive), + } + + for _, p := range euw1Peers { + peerCache.PeerSet(p, "euw1") + } + + expected := make([]*structs.NodeServerInfo, 0, len(euw1Peers)) + for _, p := range euw1Peers { + expected = append(expected, &structs.NodeServerInfo{ + RPCAdvertiseAddr: p.RPCAddr.String(), + Datacenter: p.Datacenter, + }) + } + + must.SliceContainsAll(t, expected, peerCache.LocalPeersServerInfo()) +} + +func TestPartsCache_LocalPeers(t *testing.T) { + ci.Parallel(t) + + peerCache := NewPeerCache() + must.SliceEmpty(t, peerCache.LocalPeers()) + + euw1Peers := []*Parts{ + generateTestParts(t, "1.2.3", "euw1", serf.StatusAlive), + generateTestParts(t, "1.2.3", "euw1", serf.StatusAlive), + generateTestParts(t, "1.2.3", "euw1", serf.StatusAlive), + } + + for _, p := range euw1Peers { + peerCache.PeerSet(p, "euw1") + } + + must.SliceLen(t, 3, peerCache.LocalPeers()) +} + +func TestPartsCache_RegionNum(t *testing.T) { + ci.Parallel(t) + + peerCache := NewPeerCache() + must.SliceEmpty(t, peerCache.LocalPeers()) + + peers := []*Parts{ + generateTestParts(t, "1.2.3", "euw1", serf.StatusAlive), + generateTestParts(t, "1.2.3", "euw2", serf.StatusAlive), + generateTestParts(t, "1.2.3", "euw3", serf.StatusAlive), + } + + for _, p := range peers { + peerCache.PeerSet(p, "euw1") + } + + must.Eq(t, 3, peerCache.RegionNum()) +} + +func TestPartsCache_RegionPeers(t *testing.T) { + ci.Parallel(t) + + peerCache := NewPeerCache() + must.SliceEmpty(t, peerCache.LocalPeers()) + must.Nil(t, peerCache.RegionPeers("euw1")) + + euw1Peers := 
[]*Parts{ + generateTestParts(t, "1.2.3", "euw1", serf.StatusAlive), + generateTestParts(t, "1.2.3", "euw1", serf.StatusAlive), + generateTestParts(t, "1.2.3", "euw1", serf.StatusAlive), + } + + for _, p := range euw1Peers { + peerCache.PeerSet(p, "euw1") + } + + must.SliceLen(t, 3, peerCache.RegionPeers("euw1")) +} + +func TestPartsCache_RegionNames(t *testing.T) { + ci.Parallel(t) + + peerCache := NewPeerCache() + must.SliceEmpty(t, peerCache.LocalPeers()) + + peers := []*Parts{ + generateTestParts(t, "1.2.3", "euw1", serf.StatusAlive), + generateTestParts(t, "1.2.3", "euw2", serf.StatusAlive), + generateTestParts(t, "1.2.3", "euw3", serf.StatusAlive), + } + + for _, p := range peers { + peerCache.PeerSet(p, "euw1") + } + + must.SliceContainsAll(t, []string{"euw1", "euw2", "euw3"}, peerCache.RegionNames()) +} + func TestPartsCache_PeerSet(t *testing.T) { ci.Parallel(t) peerCache := NewPeerCache() - must.MapLen(t, 0, peerCache.peers) + must.MapLen(t, 0, peerCache.allPeers) // Add an initial set of peers in the same region. euw1Peers := []*Parts{ @@ -123,11 +262,14 @@ func TestPartsCache_PeerSet(t *testing.T) { } for _, p := range euw1Peers { - peerCache.PeerSet(p) + peerCache.PeerSet(p, "euw1") } - must.MapLen(t, 1, peerCache.peers) - must.Len(t, 3, peerCache.peers["euw1"]) + must.MapLen(t, 1, peerCache.allPeers) + must.Len(t, 3, peerCache.allPeers["euw1"]) + must.MapLen(t, 1, peerCache.alivePeers) + must.Len(t, 3, peerCache.alivePeers["euw1"]) + must.MapLen(t, 3, peerCache.localPeers) // Add a second set of peers in a different region. euw2Peers := []*Parts{ @@ -137,29 +279,43 @@ func TestPartsCache_PeerSet(t *testing.T) { } for _, p := range euw2Peers { - peerCache.PeerSet(p) + peerCache.PeerSet(p, "euw1") } - must.MapLen(t, 2, peerCache.peers) - must.Len(t, 3, peerCache.peers["euw1"]) - must.Len(t, 3, peerCache.peers["euw2"]) + must.MapLen(t, 2, peerCache.allPeers) + must.Len(t, 3, peerCache.allPeers["euw1"]) + must.Len(t, 3, peerCache.allPeers["euw2"]) + must.MapLen(t, 3, peerCache.localPeers) // Update a peer's status and ensure it's updated in the cache. changedPeer := euw2Peers[1].Copy() changedPeer.Status = serf.StatusFailed - peerCache.PeerSet(changedPeer) - must.MapLen(t, 2, peerCache.peers) - must.Len(t, 3, peerCache.peers["euw1"]) - must.Len(t, 3, peerCache.peers["euw2"]) - must.Eq(t, serf.StatusFailed, peerCache.peers["euw2"][1].Status) + peerCache.PeerSet(changedPeer, "euw1") + must.MapLen(t, 2, peerCache.allPeers) + must.Len(t, 3, peerCache.allPeers["euw1"]) + must.Len(t, 3, peerCache.allPeers["euw2"]) + must.Len(t, 2, peerCache.alivePeers["euw2"]) + must.Eq(t, serf.StatusFailed, peerCache.allPeers["euw2"][1].Status) + + // Update a peer's status and ensure it's updated in the cache. + changedPeerEuw1 := euw1Peers[2].Copy() + changedPeerEuw1.Status = serf.StatusFailed + + peerCache.PeerSet(changedPeerEuw1, "euw1") + must.MapLen(t, 2, peerCache.allPeers) + must.Len(t, 3, peerCache.allPeers["euw1"]) + must.Len(t, 3, peerCache.allPeers["euw2"]) + must.Len(t, 2, peerCache.alivePeers["euw1"]) + must.Eq(t, serf.StatusFailed, peerCache.allPeers["euw1"][2].Status) + must.MapLen(t, 2, peerCache.localPeers) } func TestPartsCache_PeerDelete(t *testing.T) { ci.Parallel(t) peerCache := NewPeerCache() - must.MapLen(t, 0, peerCache.peers) + must.MapLen(t, 0, peerCache.allPeers) // Add an initial set of peers in the same region. 
partsList := []*Parts{ @@ -169,11 +325,11 @@ func TestPartsCache_PeerDelete(t *testing.T) { } for _, p := range partsList { - peerCache.PeerSet(p) + peerCache.PeerSet(p, "euw1") } - must.MapLen(t, 1, peerCache.peers) - must.Len(t, 3, peerCache.peers["euw1"]) + must.MapLen(t, 1, peerCache.allPeers) + must.Len(t, 3, peerCache.allPeers["euw1"]) // Create a serf.MemberEvent to delete the second peer. event := serf.MemberEvent{ @@ -194,10 +350,10 @@ func TestPartsCache_PeerDelete(t *testing.T) { } peerCache.PeerDelete(event) - must.MapLen(t, 1, peerCache.peers) - must.Len(t, 2, peerCache.peers["euw1"]) + must.MapLen(t, 1, peerCache.allPeers) + must.Len(t, 2, peerCache.allPeers["euw1"]) - for _, p := range peerCache.peers["euw1"] { + for _, p := range peerCache.allPeers["euw1"] { must.NotEq(t, partsList[1].Name, p.Name) } @@ -232,7 +388,7 @@ func TestPartsCache_PeerDelete(t *testing.T) { } peerCache.PeerDelete(event) - must.MapLen(t, 0, peerCache.peers) + must.MapLen(t, 0, peerCache.allPeers) } func TestPartsCache_ServersMeetMinimumVersion(t *testing.T) { @@ -439,7 +595,7 @@ func TestPartsCache_ServersMeetMinimumVersion(t *testing.T) { peerCache := NewPeerCache() for _, p := range tc.inputParts { - peerCache.PeerSet(p) + peerCache.PeerSet(p, "euw1") } result := peerCache.ServersMeetMinimumVersion( @@ -462,7 +618,7 @@ func generateTestParts(t *testing.T, version, region string, status serf.MemberS "role": "nomad", "region": region, "dc": "east-aws", - "port": "10000", + "port": strconv.Itoa(rand.IntN(10000)), "build": version, "vsn": "1", }, diff --git a/nomad/rpc.go b/nomad/rpc.go index d2e7a359f13..1f83886a123 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -676,13 +676,9 @@ func (s *Server) getLeader() (bool, *peers.Parts) { return false, nil } - // Lookup the server - s.peerLock.RLock() - server := s.localPeers[leader] - s.peerLock.RUnlock() - - // Server could be nil - return false, server + // Lookup the server and return it. We do not check the cache response, so + // the server could be nil. 
+ return false, s.peersCache.LocalPeer(leader) } // forwardLeader is used to forward an RPC call to the leader, or fail if no leader @@ -704,10 +700,8 @@ func (r *rpcHandler) forwardServer(server *peers.Parts, method string, args inte } func (r *rpcHandler) findRegionServer(region string) (*peers.Parts, error) { - r.srv.peerLock.RLock() - defer r.srv.peerLock.RUnlock() - servers := r.srv.peers[region] + servers := r.srv.peersCache.RegionPeers(region) if len(servers) == 0 { r.logger.Warn("no path found to region", "region", region) return nil, structs.ErrNoRegionPath @@ -731,11 +725,8 @@ func (r *rpcHandler) forwardRegion(region, method string, args interface{}, repl } func (r *rpcHandler) getServer(region, serverID string) (*peers.Parts, error) { - // Bail if we can't find any servers - r.srv.peerLock.RLock() - defer r.srv.peerLock.RUnlock() - servers := r.srv.peers[region] + servers := r.srv.peersCache.RegionPeers(region) if len(servers) == 0 { r.logger.Warn("no path found to region", "region", region) return nil, structs.ErrNoRegionPath diff --git a/nomad/rpc_test.go b/nomad/rpc_test.go index 00a0acd71c9..ee9e388f4de 100644 --- a/nomad/rpc_test.go +++ b/nomad/rpc_test.go @@ -338,12 +338,10 @@ func TestRPC_streamingRpcConn_badMethod(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) - s1.peerLock.RLock() ok, parts := peers.IsNomadServer(s2.LocalMember()) require.True(ok) - server := s1.localPeers[raft.ServerAddress(parts.Addr.String())] + server := s1.peersCache.LocalPeer(raft.ServerAddress(parts.Addr.String())) require.NotNil(server) - s1.peerLock.RUnlock() conn, err := s1.streamingRpc(server, "Bogus") require.Nil(conn) @@ -397,12 +395,10 @@ func TestRPC_streamingRpcConn_badMethod_TLS(t *testing.T) { TestJoin(t, s1, s2) testutil.WaitForLeader(t, s1.RPC) - s1.peerLock.RLock() ok, parts := peers.IsNomadServer(s2.LocalMember()) require.True(ok) - server := s1.localPeers[raft.ServerAddress(parts.Addr.String())] + server := s1.peersCache.LocalPeer(raft.ServerAddress(parts.Addr.String())) require.NotNil(server) - s1.peerLock.RUnlock() conn, err := s1.streamingRpc(server, "Bogus") require.Nil(conn) @@ -434,12 +430,10 @@ func TestRPC_streamingRpcConn_goodMethod_Plaintext(t *testing.T) { TestJoin(t, s1, s2) testutil.WaitForLeader(t, s1.RPC) - s1.peerLock.RLock() ok, parts := peers.IsNomadServer(s2.LocalMember()) require.True(ok) - server := s1.localPeers[raft.ServerAddress(parts.Addr.String())] + server := s1.peersCache.LocalPeer(raft.ServerAddress(parts.Addr.String())) require.NotNil(server) - s1.peerLock.RUnlock() conn, err := s1.streamingRpc(server, "FileSystem.Logs") require.NotNil(conn) @@ -507,12 +501,10 @@ func TestRPC_streamingRpcConn_goodMethod_TLS(t *testing.T) { TestJoin(t, s1, s2) testutil.WaitForLeader(t, s1.RPC) - s1.peerLock.RLock() ok, parts := peers.IsNomadServer(s2.LocalMember()) require.True(ok) - server := s1.localPeers[raft.ServerAddress(parts.Addr.String())] + server := s1.peersCache.LocalPeer(raft.ServerAddress(parts.Addr.String())) require.NotNil(server) - s1.peerLock.RUnlock() conn, err := s1.streamingRpc(server, "FileSystem.Logs") require.NotNil(conn) diff --git a/nomad/serf.go b/nomad/serf.go index 2efbfb93f8e..e71c4a2c762 100644 --- a/nomad/serf.go +++ b/nomad/serf.go @@ -65,30 +65,7 @@ func (s *Server) nodeJoin(me serf.MemberEvent) { // A peer is joining, so we should update the cache to reflect its // status. 
- s.peersCache.PeerSet(parts) - - // Check if this server is known - found := false - s.peerLock.Lock() - existing := s.peers[parts.Region] - for idx, e := range existing { - if e.Name == parts.Name { - existing[idx] = parts - found = true - break - } - } - - // Add ot the list if not known - if !found { - s.peers[parts.Region] = append(existing, parts) - } - - // Check if a local peer - if parts.Region == s.config.Region { - s.localPeers[raft.ServerAddress(parts.Addr.String())] = parts - } - s.peerLock.Unlock() + s.peersCache.PeerSet(parts, s.Region()) // If we still expecting to bootstrap, may need to handle this if s.config.BootstrapExpect != 0 && !s.bootstrapped.Load() { @@ -253,33 +230,7 @@ func (s *Server) nodeFailed(me serf.MemberEvent) { // The peer is failed, so we should update the cache to reflect its // status. - s.peersCache.PeerSet(parts) - - // Remove the server if known - s.peerLock.Lock() - existing := s.peers[parts.Region] - n := len(existing) - for i := 0; i < n; i++ { - if existing[i].Name == parts.Name { - existing[i], existing[n-1] = existing[n-1], nil - existing = existing[:n-1] - n-- - break - } - } - - // Trim the list there are no known servers in a region - if n == 0 { - delete(s.peers, parts.Region) - } else { - s.peers[parts.Region] = existing - } - - // Check if local peer - if parts.Region == s.config.Region { - delete(s.localPeers, raft.ServerAddress(parts.Addr.String())) - } - s.peerLock.Unlock() + s.peersCache.PeerSet(parts, s.Region()) } } diff --git a/nomad/serf_test.go b/nomad/serf_test.go index b2cbc51c965..686ed7345f3 100644 --- a/nomad/serf_test.go +++ b/nomad/serf_test.go @@ -41,26 +41,22 @@ func TestNomad_JoinPeer(t *testing.T) { }) testutil.WaitForResult(func() (bool, error) { - s1.peerLock.Lock() - n1 := len(s1.peers) - local1 := len(s1.localPeers) - s1.peerLock.Unlock() + n1 := s1.peersCache.RegionNum() + local1 := len(s1.peersCache.LocalPeers()) if n1 != 2 { - return false, fmt.Errorf("bad: %#v", n1) + return false, fmt.Errorf("bad1: %#v", n1) } if local1 != 1 { - return false, fmt.Errorf("bad: %#v", local1) + return false, fmt.Errorf("bad2: %#v", local1) } - s2.peerLock.Lock() - n2 := len(s2.peers) - local2 := len(s2.localPeers) - s2.peerLock.Unlock() + n2 := s2.peersCache.RegionNum() + local2 := len(s2.peersCache.LocalPeers()) if n2 != 2 { - return false, fmt.Errorf("bad: %#v", n2) + return false, fmt.Errorf("bad3: %#v", n2) } if local2 != 1 { - return false, fmt.Errorf("bad: %#v", local2) + return false, fmt.Errorf("bad4: %#v", local2) } return true, nil }, func(err error) { @@ -96,11 +92,11 @@ func TestNomad_RemovePeer(t *testing.T) { s2.Shutdown() testutil.WaitForResult(func() (bool, error) { - if len(s1.peers) != 1 { - return false, fmt.Errorf("bad: %#v", s1.peers) + if s1Peers := s1.peersCache.RegionPeers(s1.Region()); len(s1Peers) != 1 { + return false, fmt.Errorf("bad: %#v", s1Peers) } - if len(s2.peers) != 1 { - return false, fmt.Errorf("bad: %#v", s2.peers) + if s2Peers := s2.peersCache.RegionPeers(s2.Region()); len(s2Peers) != 1 { + return false, fmt.Errorf("bad: %#v", s2Peers) } return true, nil }, func(err error) { @@ -176,9 +172,7 @@ func TestNomad_ReapPeer(t *testing.T) { s3.reconcileCh <- s2mem testutil.WaitForResult(func() (bool, error) { - s1.peerLock.Lock() - n1 := len(s1.peers["global"]) - s1.peerLock.Unlock() + n1 := len(s1.peersCache.RegionPeers("global")) if n1 != 2 { return false, fmt.Errorf("bad: %#v", n1) } @@ -190,9 +184,7 @@ func TestNomad_ReapPeer(t *testing.T) { return false, fmt.Errorf("bad: %#v", peers) } - 
s3.peerLock.Lock() - n3 := len(s3.peers["global"]) - s3.peerLock.Unlock() + n3 := len(s3.peersCache.RegionPeers("global")) if n3 != 2 { return false, fmt.Errorf("bad: %#v", n3) } @@ -364,8 +356,11 @@ func TestNomad_BootstrapExpect_NonVoter(t *testing.T) { if peers != expect { return false, fmt.Errorf("expected %d peers, got %d", expect, peers) } - if len(s.localPeers) != expect { - return false, fmt.Errorf("expected %d local peers, got %d: %#v", expect, len(s.localPeers), s.localPeers) + if localPeers := s.peersCache.LocalPeers(); len(localPeers) != expect { + return false, fmt.Errorf( + "expected %d local peers, got %d: %#v", + expect, len(localPeers), localPeers, + ) } } @@ -431,9 +426,7 @@ func TestNomad_NonBootstraping_ShouldntBootstap(t *testing.T) { defer cleanupS1() testutil.WaitForResult(func() (bool, error) { - s1.peerLock.Lock() - p := len(s1.localPeers) - s1.peerLock.Unlock() + p := len(s1.peersCache.LocalPeers()) if p != 1 { return false, fmt.Errorf("%d", p) } diff --git a/nomad/server.go b/nomad/server.go index 912bc2c1b22..272d348a834 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -172,19 +172,15 @@ type Server struct { nodeConns map[string][]*nodeConnState nodeConnsLock sync.RWMutex - // peers is used to track the known Nomad servers. This is - // used for region forwarding and clustering. - peers map[string][]*peers.Parts - localPeers map[raft.ServerAddress]*peers.Parts - peerLock sync.RWMutex - // serf is the Serf cluster containing only Nomad // servers. This is used for multi-region federation // and automatic clustering within regions. serf *serf.Serf - // peersCache is used to cache the parsed Nomad server member peer parts. - // This is used to avoid re-parsing the Serf tags on every access. + // peersCache is used to cache the parsed Nomad server member peer + // parts. This is used to avoid re-parsing the Serf tags on every access and + // supports RPC connection management, discovery, and server version + // checking. peersCache *peers.PeerCache // bootstrapped indicates if Server has bootstrapped or not. @@ -357,8 +353,6 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc rpcServer: rpc.NewServer(), streamingRpcs: structs.NewStreamingRpcRegistry(), nodeConns: make(map[string][]*nodeConnState), - peers: make(map[string][]*peers.Parts), - localPeers: make(map[raft.ServerAddress]*peers.Parts), peersCache: peers.NewPeerCache(), bootstrapped: &atomic.Bool{}, reassertLeaderCh: make(chan chan error), @@ -1984,13 +1978,7 @@ func (s *Server) isReadyForConsistentReads() bool { // Regions returns the known regions in the cluster. func (s *Server) Regions() []string { - s.peerLock.RLock() - defer s.peerLock.RUnlock() - - regions := make([]string, 0, len(s.peers)) - for region := range s.peers { - regions = append(regions, region) - } + regions := s.peersCache.RegionNames() sort.Strings(regions) return regions } @@ -2026,7 +2014,7 @@ func (s *Server) Stats() map[string]map[string]string { "leader": fmt.Sprintf("%v", s.IsLeader()), "leader_addr": string(leader), "bootstrap": fmt.Sprintf("%v", s.isSingleServerCluster()), - "known_regions": toString(uint64(len(s.peers))), + "known_regions": toString(uint64(s.peersCache.RegionNum())), }, "raft": s.raft.Stats(), "serf": s.serf.Stats(),
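
The sketch below is not part of the diff above. It is a minimal, hypothetical usage example of the peers.PeerCache API introduced in nomad/peers/peers.go; the member name, address, and tag values are illustrative and simply mirror the tag layout used by generateTestParts in peers_test.go, and the expected outputs in the comments assume the behaviour described by the new code. The point it illustrates is the split the change introduces: a peer that moves to a failed status drops out of the routing maps (alivePeers, localPeers) while allPeers retains it for version checking.

package main

import (
	"fmt"
	"net"

	"github.com/hashicorp/nomad/nomad/peers"
	"github.com/hashicorp/serf/serf"
)

func main() {
	// Build a Serf member the way the package tests do; the tag names follow
	// the existing Nomad server tag convention.
	member := serf.Member{
		Name:   "server-1.global",
		Addr:   net.ParseIP("127.0.0.1"),
		Port:   4648,
		Status: serf.StatusAlive,
		Tags: map[string]string{
			"role":   "nomad",
			"region": "global",
			"dc":     "dc1",
			"port":   "4647",
			"build":  "1.2.3",
			"vsn":    "1",
		},
	}

	// Parse the member into peer parts, as the serf event handlers do before
	// calling PeerSet.
	ok, parts := peers.IsNomadServer(member)
	if !ok {
		panic("member is not a Nomad server")
	}

	cache := peers.NewPeerCache()

	// An alive peer in the local region is tracked for version checking, RPC
	// routing, and local peer lookups.
	cache.PeerSet(parts, "global")
	fmt.Println(len(cache.LocalPeers()), cache.RegionNames()) // 1 [global]

	// Marking the same peer as failed removes it from the routing mappings;
	// the all-peers mapping still retains it for ServersMeetMinimumVersion.
	failed := parts.Copy()
	failed.Status = serf.StatusFailed
	cache.PeerSet(failed, "global")
	fmt.Println(len(cache.LocalPeers()), len(cache.RegionPeers("global"))) // 0 0
}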