Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
PingAck MessageType = "ping_ack"
)

func NewMessage(msgType MessageType, subjectId string, sourceId, originId string, payload *MessagePayload) *Message {
func NewMessage(msgType MessageType, subjectId, sourceId, originId string, payload *MessagePayload) *Message {
return &Message{
Type: msgType,
SubjectId: subjectId,
Expand Down
2 changes: 1 addition & 1 deletion pkg/node/etcd_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func WatchPeers(node *Node, cli *clientv3.Client) {
normalizedPeers[id] = NormalizeHostPort(addr, "8080")
}
node.syncPeers(normalizedPeers)
slog.Info("[WatchPeers Callback] synced %d peers\n", len(peers))
slog.Info("[WatchPeers Callback] synced", "peers", len(peers))
})
slog.Info("[BOOT] after WatchPeers")
}
18 changes: 14 additions & 4 deletions pkg/node/gossip_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (n *Node) handlePingAck(msg *gossip.Message) {
}
payload := n.removeGossip()
message := gossip.NewMessage(
gossip.Ping,
gossip.PingAck,
msg.SubjectId,
n.id,
msg.OriginId,
Expand Down Expand Up @@ -203,10 +203,13 @@ func (n *Node) attemptConnectToCluster(addr string) {
func (n *Node) ConnectToCluster(addr string, attemptPeriod time.Duration) {
ticker := time.NewTicker(attemptPeriod)
for range ticker.C {
n.mu.Lock()
if len(n.peers) > 0 {
n.mu.Unlock()
break
}
n.attemptConnectToCluster(addr)
n.mu.Unlock()
}
}

Expand Down Expand Up @@ -238,8 +241,9 @@ func StartGossipListener(node *Node) {
if err := json.Unmarshal(data, &msg); err != nil {
continue
}

node.mu.Lock()
node.handleGossip(&msg, addr.String())
node.mu.Unlock()
}
}

Expand Down Expand Up @@ -285,6 +289,7 @@ func StartGossipPinger(node *Node, opts ...pingerOption) {

for range ticker.C {
// declare peer dead if has not been acked since last ping
node.mu.Lock()
if node.suspectPeer != "" {
peerBody, ok := node.peers[node.suspectPeer]
if ok {
Expand All @@ -303,6 +308,7 @@ func StartGossipPinger(node *Node, opts ...pingerOption) {
node.suspectPeer = node.getRandomPeer()
peerBody, ok := node.peers[node.suspectPeer]
if !ok {
node.mu.Unlock()
continue
}
message := gossip.NewMessage(
Expand All @@ -314,10 +320,13 @@ func StartGossipPinger(node *Node, opts ...pingerOption) {
)
node.sendGossip(message, peerBody.Addr)

suspect := node.suspectPeer
// send ping req to k random peers after timeout
node.timeout = time.AfterFunc(cfg.timeout, func() {
node.mu.Lock()
defer node.mu.Unlock()
for _, id := range node.getKRandomPeers(cfg.k) {
if id == node.suspectPeer {
if id == suspect {
continue
}
peerBody, ok := node.peers[id]
Expand All @@ -326,13 +335,14 @@ func StartGossipPinger(node *Node, opts ...pingerOption) {
}
message := gossip.NewMessage(
gossip.PingReq,
id,
suspect,
node.id,
node.id,
payload,
)
node.sendGossip(message, peerBody.Addr)
}
})
node.mu.Unlock()
}
}
2 changes: 2 additions & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log/slog"
"math"
"math/rand"
"sync"
"time"

"github.com/ryandielhenn/zephyrcache/pkg/gossip"
Expand All @@ -24,6 +25,7 @@ type Node struct {
incarnation int
timeout *time.Timer
gossipPort string
mu sync.Mutex
}

func NewNode(store *kv.Store, r *ring.HashRing, id string, addr string, gossipPort string) *Node {
Expand Down
2 changes: 2 additions & 0 deletions pkg/node/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func OverrideHostPort(addr, port string) string {

// ownerForKey looks up the owner for a key and normalizes the address of the owner
func (s *Node) OwnerForKey(key string) (ownerHP, selfHP string, ok bool) {
s.mu.Lock()
defer s.mu.Unlock()
ownerID := s.ring.Lookup([]byte(key)) // e.g. "Node3"
ownerAddr, ok := s.ring.Addr(ownerID) // e.g. "Node3:8080" (what you stored)
if !ok || ownerAddr == "" {
Expand Down
Loading