From 63b99f7fc657be306dcdf9620c5f4787afb93a6e Mon Sep 17 00:00:00 2001 From: Ryan Dielhenn Date: Sat, 21 Mar 2026 22:51:11 -0700 Subject: [PATCH 1/2] add mutex to node for concurrent reads/writes during gossip --- pkg/gossip/gossip.go | 2 +- pkg/node/etcd_handlers.go | 2 +- pkg/node/gossip_handlers.go | 18 ++++++++++++++---- pkg/node/node.go | 2 ++ pkg/node/utils.go | 2 ++ 5 files changed, 20 insertions(+), 6 deletions(-) diff --git a/pkg/gossip/gossip.go b/pkg/gossip/gossip.go index 60a977b..27f7c9b 100644 --- a/pkg/gossip/gossip.go +++ b/pkg/gossip/gossip.go @@ -25,7 +25,7 @@ const ( PingAck MessageType = "ping_ack" ) -func NewMessage(msgType MessageType, subjectId string, sourceId, originId string, payload *MessagePayload) *Message { +func NewMessage(msgType MessageType, subjectId, sourceId, originId string, payload *MessagePayload) *Message { return &Message{ Type: msgType, SubjectId: subjectId, diff --git a/pkg/node/etcd_handlers.go b/pkg/node/etcd_handlers.go index cc334e8..58df4c7 100644 --- a/pkg/node/etcd_handlers.go +++ b/pkg/node/etcd_handlers.go @@ -46,7 +46,7 @@ func WatchPeers(node *Node, cli *clientv3.Client) { normalizedPeers[id] = NormalizeHostPort(addr, "8080") } node.syncPeers(normalizedPeers) - slog.Info("[WatchPeers Callback] synced %d peers\n", len(peers)) + slog.Info("[WatchPeers Callback] synced", "peers", len(peers)) }) slog.Info("[BOOT] after WatchPeers") } diff --git a/pkg/node/gossip_handlers.go b/pkg/node/gossip_handlers.go index e5ad280..c47220f 100644 --- a/pkg/node/gossip_handlers.go +++ b/pkg/node/gossip_handlers.go @@ -76,7 +76,7 @@ func (n *Node) handlePingAck(msg *gossip.Message) { } payload := n.removeGossip() message := gossip.NewMessage( - gossip.Ping, + gossip.PingAck, msg.SubjectId, n.id, msg.OriginId, @@ -203,10 +203,13 @@ func (n *Node) attemptConnectToCluster(addr string) { func (n *Node) ConnectToCluster(addr string, attemptPeriod time.Duration) { ticker := time.NewTicker(attemptPeriod) for range ticker.C { + n.mu.RLock() if len(n.peers) > 0 { + n.mu.RUnlock() break } n.attemptConnectToCluster(addr) + n.mu.RUnlock() } } @@ -238,8 +241,9 @@ func StartGossipListener(node *Node) { if err := json.Unmarshal(data, &msg); err != nil { continue } - + node.mu.Lock() node.handleGossip(&msg, addr.String()) + node.mu.Unlock() } } @@ -285,6 +289,7 @@ func StartGossipPinger(node *Node, opts ...pingerOption) { for range ticker.C { // declare peer dead if has not been acked since last ping + node.mu.Lock() if node.suspectPeer != "" { peerBody, ok := node.peers[node.suspectPeer] if ok { @@ -303,6 +308,7 @@ func StartGossipPinger(node *Node, opts ...pingerOption) { node.suspectPeer = node.getRandomPeer() peerBody, ok := node.peers[node.suspectPeer] if !ok { + node.mu.Unlock() continue } message := gossip.NewMessage( @@ -314,10 +320,13 @@ func StartGossipPinger(node *Node, opts ...pingerOption) { ) node.sendGossip(message, peerBody.Addr) + suspect := node.suspectPeer // send ping req to k random peers after timeout node.timeout = time.AfterFunc(cfg.timeout, func() { + node.mu.RLock() + defer node.mu.RUnlock() for _, id := range node.getKRandomPeers(cfg.k) { - if id == node.suspectPeer { + if id == suspect { continue } peerBody, ok := node.peers[id] @@ -326,7 +335,7 @@ func StartGossipPinger(node *Node, opts ...pingerOption) { } message := gossip.NewMessage( gossip.PingReq, - id, + suspect, node.id, node.id, payload, @@ -334,5 +343,6 @@ func StartGossipPinger(node *Node, opts ...pingerOption) { node.sendGossip(message, peerBody.Addr) } }) + node.mu.Unlock() } } diff --git a/pkg/node/node.go b/pkg/node/node.go index 7470d85..7fdfd95 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -4,6 +4,7 @@ import ( "log/slog" "math" "math/rand" + "sync" "time" "github.com/ryandielhenn/zephyrcache/pkg/gossip" @@ -24,6 +25,7 @@ type Node struct { incarnation int timeout *time.Timer gossipPort string + mu sync.RWMutex } func NewNode(store *kv.Store, r *ring.HashRing, id string, addr string, gossipPort string) *Node { diff --git a/pkg/node/utils.go b/pkg/node/utils.go index 2bd3458..b78abdd 100644 --- a/pkg/node/utils.go +++ b/pkg/node/utils.go @@ -31,6 +31,8 @@ func OverrideHostPort(addr, port string) string { // ownerForKey looks up the owner for a key and normalizes the address of the owner func (s *Node) OwnerForKey(key string) (ownerHP, selfHP string, ok bool) { + s.mu.RLock() + defer s.mu.RUnlock() ownerID := s.ring.Lookup([]byte(key)) // e.g. "Node3" ownerAddr, ok := s.ring.Addr(ownerID) // e.g. "Node3:8080" (what you stored) if !ok || ownerAddr == "" { From 89100a8146e001f13bfe1a3b01a9bb568fec5108 Mon Sep 17 00:00:00 2001 From: Ryan Dielhenn Date: Sun, 22 Mar 2026 03:21:23 -0700 Subject: [PATCH 2/2] nits --- pkg/node/gossip_handlers.go | 10 +++++----- pkg/node/node.go | 2 +- pkg/node/utils.go | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/node/gossip_handlers.go b/pkg/node/gossip_handlers.go index c47220f..f9bb3d0 100644 --- a/pkg/node/gossip_handlers.go +++ b/pkg/node/gossip_handlers.go @@ -203,13 +203,13 @@ func (n *Node) attemptConnectToCluster(addr string) { func (n *Node) ConnectToCluster(addr string, attemptPeriod time.Duration) { ticker := time.NewTicker(attemptPeriod) for range ticker.C { - n.mu.RLock() + n.mu.Lock() if len(n.peers) > 0 { - n.mu.RUnlock() + n.mu.Unlock() break } n.attemptConnectToCluster(addr) - n.mu.RUnlock() + n.mu.Unlock() } } @@ -323,8 +323,8 @@ func StartGossipPinger(node *Node, opts ...pingerOption) { suspect := node.suspectPeer // send ping req to k random peers after timeout node.timeout = time.AfterFunc(cfg.timeout, func() { - node.mu.RLock() - defer node.mu.RUnlock() + node.mu.Lock() + defer node.mu.Unlock() for _, id := range node.getKRandomPeers(cfg.k) { if id == suspect { continue diff --git a/pkg/node/node.go b/pkg/node/node.go index 7fdfd95..013847d 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -25,7 +25,7 @@ type Node struct { incarnation int timeout *time.Timer gossipPort string - mu sync.RWMutex + mu sync.Mutex } func NewNode(store *kv.Store, r *ring.HashRing, id string, addr string, gossipPort string) *Node { diff --git a/pkg/node/utils.go b/pkg/node/utils.go index b78abdd..6c2e451 100644 --- a/pkg/node/utils.go +++ b/pkg/node/utils.go @@ -31,8 +31,8 @@ func OverrideHostPort(addr, port string) string { // ownerForKey looks up the owner for a key and normalizes the address of the owner func (s *Node) OwnerForKey(key string) (ownerHP, selfHP string, ok bool) { - s.mu.RLock() - defer s.mu.RUnlock() + s.mu.Lock() + defer s.mu.Unlock() ownerID := s.ring.Lookup([]byte(key)) // e.g. "Node3" ownerAddr, ok := s.ring.Addr(ownerID) // e.g. "Node3:8080" (what you stored) if !ok || ownerAddr == "" {