Skip to content

Commit

Permalink
Only check latencies once every 10 seconds with routeByLatency (#2795)
Browse files Browse the repository at this point in the history
* Only check latencies once every 10 seconds with `routeByLatency`

`routeByLatency` currently checks latencies any time a server returns
a MOVED or READONLY reply. When a shard is down, the ClusterClient
chooses to issue the request to a random server, which returns a MOVED
reply. This causes a state refresh and a latency update on all servers.
This can lead to significant ping load to clusters with a large number
of clients.

This introduces logic to ping only once every 10 seconds, only
performing a latency update on a node during the `GC` function if the
latency was set later than 10 seconds ago.

Fixes #2782

* use UnixNano instead of Unix for better precision

---------

Co-authored-by: ofekshenawa <[email protected]>
  • Loading branch information
justinmir and ofekshenawa authored Nov 20, 2024
1 parent 080e051 commit f1ffb55
Showing 1 changed file with 24 additions and 1 deletion.
25 changes: 24 additions & 1 deletion osscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"github.com/redis/go-redis/v9/internal/rand"
)

const (
minLatencyMeasurementInterval = 10 * time.Second
)

var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")

// ClusterOptions are used to configure a cluster client and should be
Expand Down Expand Up @@ -316,6 +320,10 @@ type clusterNode struct {
latency uint32 // atomic
generation uint32 // atomic
failing uint32 // atomic

// last time the latency measurement was performed for the node, stored in nanoseconds
// from epoch
lastLatencyMeasurement int64 // atomic
}

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
Expand Down Expand Up @@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
latency = float64(dur) / float64(successes)
}
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
n.SetLastLatencyMeasurement(time.Now())
}

func (n *clusterNode) Latency() time.Duration {
Expand Down Expand Up @@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
return atomic.LoadUint32(&n.generation)
}

func (n *clusterNode) LastLatencyMeasurement() int64 {
return atomic.LoadInt64(&n.lastLatencyMeasurement)
}

func (n *clusterNode) SetGeneration(gen uint32) {
for {
v := atomic.LoadUint32(&n.generation)
Expand All @@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
}
}

func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
for {
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
break
}
}
}

//------------------------------------------------------------------------------

type clusterNodes struct {
Expand Down Expand Up @@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
c.mu.Lock()

c.activeAddrs = c.activeAddrs[:0]
now := time.Now()
for addr, node := range c.nodes {
if node.Generation() >= generation {
c.activeAddrs = append(c.activeAddrs, addr)
if c.opt.RouteByLatency {
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
go node.updateLatency()
}
continue
Expand Down

0 comments on commit f1ffb55

Please sign in to comment.