Skip to content
3 changes: 2 additions & 1 deletion cmd/bench/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func startNode(id, seedGossipAddr string) cacheNode {
go node.StartGossipListener(n)
go node.StartGossipPinger(n,
node.WithPeriod(50*time.Millisecond),
node.WithTimeout(50*time.Millisecond),
node.WithPingTimeout(50*time.Millisecond),
node.WithSuspectedTimeout(150*time.Millisecond),
)
if seedGossipAddr != "" {
go n.ConnectToCluster(seedGossipAddr, 50*time.Millisecond)
Expand Down
3 changes: 2 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ func main() {
go node.StartGossipPinger(
n,
node.WithPeriod(200*time.Millisecond),
node.WithTimeout(100*time.Millisecond),
node.WithPingTimeout(100*time.Millisecond),
node.WithSuspectedTimeout(600*time.Millisecond),
)
default:
slog.Info("DISCOVERY must be set.")
Expand Down
7 changes: 7 additions & 0 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,10 @@ func NewPayload(peers map[string]peer.Peer, retransmit bool) *MessagePayload {
TransmitCount: transmitCount,
}
}

func NewPayloadWithTransmitCount(peers map[string]peer.Peer, transmitCount int) *MessagePayload {
return &MessagePayload{
Peers: peers,
TransmitCount: transmitCount,
}
}
244 changes: 167 additions & 77 deletions pkg/node/gossip_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,28 @@ func (n *Node) handleGossip(msg *gossip.Message, addr string) {

slog.Debug("Received Message", "message", *msg)

n.mu.Lock()
defer n.mu.Unlock()

if msg.Payload != nil {
n.handlePayload(msg.Payload, msg.SourceId)
}

switch msg.Type {
case gossip.Ping:
n.handlePing(msg, addr)
n.handlePing(msg)
case gossip.PingReq:
n.handlePingReq(msg)
case gossip.PingAck:
n.handlePingAck(msg)
}
}

func (n *Node) handlePing(msg *gossip.Message, addr string) {
func (n *Node) handlePing(msg *gossip.Message) {
peerBody, ok := n.peers[msg.SourceId]
if !ok {
return
}
payload := n.removeGossip()
message := gossip.NewMessage(
gossip.PingAck,
Expand All @@ -40,7 +47,7 @@ func (n *Node) handlePing(msg *gossip.Message, addr string) {
msg.OriginId,
payload,
)
n.sendGossip(message, addr)
n.sendGossip(message, peerBody.Addr)
}

func (n *Node) handlePingReq(msg *gossip.Message) {
Expand All @@ -60,23 +67,26 @@ func (n *Node) handlePingReq(msg *gossip.Message) {
}

func (n *Node) handlePingAck(msg *gossip.Message) {
// when a ping ack for suspected node is received
// we stop the ping req timeout and reset the suspected peer
if msg.OriginId == n.id && n.suspectPeer == msg.SubjectId {
if n.timeout != nil {
n.timeout.Stop()
n.timeout = nil
// handle ack at node that requested it
if msg.OriginId == n.id {
if n.targetPeer == msg.SubjectId {
if n.timeout != nil {
n.timeout.Stop()
n.timeout = nil
}
n.targetPeer = ""
}
n.suspectPeer = ""
return
}

// handle forwarding ack when ping req
peerBody, ok := n.peers[msg.OriginId]
if !ok {
return
}
payload := n.removeGossip()
message := gossip.NewMessage(
gossip.Ping,
gossip.PingAck,
msg.SubjectId,
n.id,
msg.OriginId,
Expand All @@ -96,6 +106,8 @@ func (n *Node) handlePayload(msg *gossip.MessagePayload, sourceId string) {
switch updatedPeer.Status {
case peer.Alive:
n.handleAliveStatus(id, updatedPeer, sourceId)
case peer.Suspected:
n.handleSuspectedStatus(id, updatedPeer)
case peer.Dead:
n.handleDeadStatus(id, updatedPeer)
}
Expand Down Expand Up @@ -125,7 +137,7 @@ func (n *Node) handleAliveStatus(id string, updatedPeer peer.Peer, sourceId stri

// determine whether message is stale or not
// update peer status if not stale and propagate update to other nodes
shouldUpdate := !ok || (updatedPeer.Incarnation > currentPeer.Incarnation)
shouldUpdate := !ok || updatedPeer.Supersedes(currentPeer)
if shouldUpdate {
n.setPeer(id, updatedPeer)
peers := map[string]peer.Peer{
Expand All @@ -136,18 +148,49 @@ func (n *Node) handleAliveStatus(id string, updatedPeer peer.Peer, sourceId stri
}
}

func (n *Node) handleDeadStatus(id string, updatedPeer peer.Peer) {
func (n *Node) handleSuspectedStatus(id string, updatedPeer peer.Peer) {
// drop payloads about yourself
if id == n.id {
// refute updates saying you are suspected
if updatedPeer.Incarnation == n.incarnation {
n.incarnation += 1
peers := map[string]peer.Peer{
n.id: {
Addr: n.addr,
Status: peer.Alive,
Incarnation: n.incarnation,
},
}
payload := gossip.NewPayload(peers, true)
n.addGossip(payload)
Copy link
Copy Markdown
Owner

@ryandielhenn ryandielhenn Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we just send the refutation immediately? I think refuting immediately resolves the uncertainty about the comment on line 186 of this file.

}
return
}

// determine whether message is stale or not
// update peer status if not stale and propagate update to other nodes
currentPeer, ok := n.peers[id]
shouldUpdate := !ok || updatedPeer.Supersedes(currentPeer)
if shouldUpdate {
n.setPeer(id, updatedPeer)
peers := map[string]peer.Peer{
id: updatedPeer,
}
payload := gossip.NewPayload(peers, true)
n.addGossip(payload)
}
}

func (n *Node) handleDeadStatus(id string, updatedPeer peer.Peer) {
// drop payloads about yourself
if id == n.id {
Copy link
Copy Markdown
Owner

@ryandielhenn ryandielhenn Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder for us: This is a potential bug that's being covered up, we should look into other parts of the codebase.

return
}

// determine whether message is stale or not
// update peer status if not stale and propagate update to other nodes
// dead status has precedence over alive messages for equal incarnation
shouldUpdate := !ok || (updatedPeer.Incarnation > currentPeer.Incarnation ||
updatedPeer.Incarnation == currentPeer.Incarnation && currentPeer.Status == peer.Alive)
currentPeer, ok := n.peers[id]
shouldUpdate := !ok || updatedPeer.Supersedes(currentPeer)
if shouldUpdate {
n.setPeer(id, updatedPeer)
peers := map[string]peer.Peer{
Expand Down Expand Up @@ -181,7 +224,14 @@ func (n *Node) sendGossip(msg *gossip.Message, addr string) {
}
}

func (n *Node) attemptConnectToCluster(addr string) {
func (n *Node) attemptConnectToCluster(addr string) bool {
n.mu.Lock()
defer n.mu.Unlock()

if len(n.peers) > 0 {
return true
}

peers := map[string]peer.Peer{
n.id: {
Addr: n.addr,
Expand All @@ -198,15 +248,16 @@ func (n *Node) attemptConnectToCluster(addr string) {
payload,
)
n.sendGossip(message, addr)

return false
}

func (n *Node) ConnectToCluster(addr string, attemptPeriod time.Duration) {
ticker := time.NewTicker(attemptPeriod)
for range ticker.C {
if len(n.peers) > 0 {
if n.attemptConnectToCluster(addr) {
break
}
n.attemptConnectToCluster(addr)
}
}

Expand Down Expand Up @@ -244,9 +295,10 @@ func StartGossipListener(node *Node) {
}

type pingerConfig struct {
period time.Duration
timeout time.Duration
k int
period time.Duration
pingTimeout time.Duration
suspectedTimeout time.Duration
k int
}

type pingerOption func(*pingerConfig)
Expand All @@ -257,9 +309,15 @@ func WithPeriod(period time.Duration) pingerOption {
}
}

func WithTimeout(timeout time.Duration) pingerOption {
func WithPingTimeout(timeout time.Duration) pingerOption {
return func(c *pingerConfig) {
c.timeout = timeout
c.pingTimeout = timeout
}
}

func WithSuspectedTimeout(timeout time.Duration) pingerOption {
return func(c *pingerConfig) {
c.suspectedTimeout = timeout
}
}

Expand All @@ -269,11 +327,92 @@ func WithK(k int) pingerOption {
}
}

func runGossipPing(node *Node, cfg *pingerConfig) {
node.mu.Lock()
defer node.mu.Unlock()

// propagate SUSPECTED if ALIVE target not been acked since last ping
if node.targetPeer != "" {
peerBody, ok := node.peers[node.targetPeer]
if ok && peerBody.Status == peer.Alive {
peerBody.Status = peer.Suspected
node.setPeer(node.targetPeer, peerBody)
peers := map[string]peer.Peer{
node.targetPeer: peerBody,
}
payload := gossip.NewPayload(peers, true)
node.addGossip(payload)

// set timeout to declare dead if SUSPECTED for long enough
targetPeer := node.targetPeer
time.AfterFunc(cfg.suspectedTimeout, func() {
node.mu.Lock()
defer node.mu.Unlock()

peerBody, ok := node.peers[targetPeer]
if !ok || peerBody.Status != peer.Suspected {
return
}
peerBody.Status = peer.Dead
node.setPeer(targetPeer, peerBody)
peers := map[string]peer.Peer{
targetPeer: peerBody,
}
payload := gossip.NewPayload(peers, true)
node.addGossip(payload)
})
}
}

// send ping to new random target peer
node.targetPeer = node.getRandomPeer()
peerBody, ok := node.peers[node.targetPeer]
if !ok {
return
}
payload := node.removeGossip()
message := gossip.NewMessage(
gossip.Ping,
node.targetPeer,
node.id,
node.id,
payload,
)
node.sendGossip(message, peerBody.Addr)

// send ping req to k random peers after timeout
targetPeer := node.targetPeer
node.timeout = time.AfterFunc(cfg.pingTimeout, func() {
node.mu.Lock()
defer node.mu.Unlock()

for _, id := range node.getKRandomPeers(cfg.k) {
if id == targetPeer {
continue
}
peerBody, ok := node.peers[id]
if !ok {
continue
}
payload := node.removeGossip()
message := gossip.NewMessage(
gossip.PingReq,
id,
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't the subject be targetPeer here?

node.id,
node.id,
payload,
)
node.sendGossip(message, peerBody.Addr)
}
})
}

func StartGossipPinger(node *Node, opts ...pingerOption) {
cfg := &pingerConfig{
period: 1 * time.Second,
timeout: 500 * time.Millisecond,
k: 3,
period: 1 * time.Second,
pingTimeout: 500 * time.Millisecond,
suspectedTimeout: 3 * time.Second,
k: 3,
}

for _, opt := range opts {
Expand All @@ -284,55 +423,6 @@ func StartGossipPinger(node *Node, opts ...pingerOption) {
defer ticker.Stop()

for range ticker.C {
// declare peer dead if has not been acked since last ping
if node.suspectPeer != "" {
peerBody, ok := node.peers[node.suspectPeer]
if ok {
peerBody.Status = peer.Dead
peers := map[string]peer.Peer{
node.suspectPeer: peerBody,
}
payload := gossip.NewPayload(peers, true)
node.addGossip(payload)
node.setPeer(node.suspectPeer, peerBody)
}
}

// send ping to new random suspected peer
payload := node.removeGossip()
node.suspectPeer = node.getRandomPeer()
peerBody, ok := node.peers[node.suspectPeer]
if !ok {
continue
}
message := gossip.NewMessage(
gossip.Ping,
node.suspectPeer,
node.id,
node.id,
payload,
)
node.sendGossip(message, peerBody.Addr)

// send ping req to k random peers after timeout
node.timeout = time.AfterFunc(cfg.timeout, func() {
for _, id := range node.getKRandomPeers(cfg.k) {
if id == node.suspectPeer {
continue
}
peerBody, ok := node.peers[id]
if !ok {
continue
}
message := gossip.NewMessage(
gossip.PingReq,
id,
node.id,
node.id,
payload,
)
node.sendGossip(message, peerBody.Addr)
}
})
runGossipPing(node, cfg)
}
}
Loading
Loading