Skip to content

Commit

Permalink
Merge branch 'master' into connection_issue
Browse files Browse the repository at this point in the history
  • Loading branch information
EXPEbdodla authored Nov 22, 2024
2 parents 7c445ab + e63669e commit 06a4de2
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/spellcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Check Spelling
uses: rojopolis/spellcheck-github-actions@0.40.0
uses: rojopolis/spellcheck-github-actions@0.45.0
with:
config_path: .github/spellcheck-settings.yml
task_name: Markdown
2 changes: 2 additions & 0 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ func (cmd *baseCmd) stringArg(pos int) string {
switch v := arg.(type) {
case string:
return v
case []byte:
return string(v)
default:
// TODO: consider using appendArg
return fmt.Sprint(v)
Expand Down
25 changes: 24 additions & 1 deletion osscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"github.com/redis/go-redis/v9/internal/rand"
)

const (
minLatencyMeasurementInterval = 10 * time.Second
)

var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")

// ClusterOptions are used to configure a cluster client and should be
Expand Down Expand Up @@ -316,6 +320,10 @@ type clusterNode struct {
latency uint32 // atomic
generation uint32 // atomic
failing uint32 // atomic

// last time the latency measurement was performed for the node, stored in nanoseconds
// from epoch
lastLatencyMeasurement int64 // atomic
}

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
Expand Down Expand Up @@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
latency = float64(dur) / float64(successes)
}
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
n.SetLastLatencyMeasurement(time.Now())
}

func (n *clusterNode) Latency() time.Duration {
Expand Down Expand Up @@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
return atomic.LoadUint32(&n.generation)
}

func (n *clusterNode) LastLatencyMeasurement() int64 {
return atomic.LoadInt64(&n.lastLatencyMeasurement)
}

func (n *clusterNode) SetGeneration(gen uint32) {
for {
v := atomic.LoadUint32(&n.generation)
Expand All @@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
}
}

func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
for {
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
break
}
}
}

//------------------------------------------------------------------------------

type clusterNodes struct {
Expand Down Expand Up @@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
c.mu.Lock()

c.activeAddrs = c.activeAddrs[:0]
now := time.Now()
for addr, node := range c.nodes {
if node.Generation() >= generation {
c.activeAddrs = append(c.activeAddrs, addr)
if c.opt.RouteByLatency {
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
go node.updateLatency()
}
continue
Expand Down
26 changes: 26 additions & 0 deletions osscluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,32 @@ var _ = Describe("ClusterClient", func() {
Expect(client.Close()).NotTo(HaveOccurred())
})

It("determines hash slots correctly for generic commands", func() {
opt := redisClusterOptions()
opt.MaxRedirects = -1
client := cluster.newClusterClient(ctx, opt)

err := client.Do(ctx, "GET", "A").Err()
Expect(err).To(Equal(redis.Nil))

err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
Expect(err).To(Equal(redis.Nil))

Eventually(func() error {
return client.SwapNodes(ctx, "A")
}, 30*time.Second).ShouldNot(HaveOccurred())

err = client.Do(ctx, "GET", "A").Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))

err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))

Expect(client.Close()).NotTo(HaveOccurred())
})

It("follows node redirection immediately", func() {
// Configure retry backoffs far in excess of the expected duration of redirection
opt := redisClusterOptions()
Expand Down
2 changes: 0 additions & 2 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,6 @@ func (hs *hooksMixin) withProcessPipelineHook(
}

func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
hs.hooksMu.Lock()
defer hs.hooksMu.Unlock()
return hs.current.dial(ctx, network, addr)
}

Expand Down
65 changes: 65 additions & 0 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -633,3 +634,67 @@ var _ = Describe("Hook with MinIdleConns", func() {
}))
})
})

var _ = Describe("Dialer connection timeouts", func() {
var client *redis.Client

const dialSimulatedDelay = 1 * time.Second

BeforeEach(func() {
options := redisOptions()
options.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
// Simulated slow dialer.
// Note that the following sleep is deliberately not context-aware.
time.Sleep(dialSimulatedDelay)
return net.Dial("tcp", options.Addr)
}
options.MinIdleConns = 1
client = redis.NewClient(options)
})

AfterEach(func() {
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})

It("does not contend on connection dial for concurrent commands", func() {
var wg sync.WaitGroup

const concurrency = 10

durations := make(chan time.Duration, concurrency)
errs := make(chan error, concurrency)

start := time.Now()
wg.Add(concurrency)

for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()

start := time.Now()
err := client.Ping(ctx).Err()
durations <- time.Since(start)
errs <- err
}()
}

wg.Wait()
close(durations)
close(errs)

// All commands should eventually succeed, after acquiring a connection.
for err := range errs {
Expect(err).NotTo(HaveOccurred())
}

// Each individual command should complete within the simulated dial duration bound.
for duration := range durations {
Expect(duration).To(BeNumerically("<", 2*dialSimulatedDelay))
}

// Due to concurrent execution, the entire test suite should also complete within
// the same dial duration bound applied for individual commands.
Expect(time.Since(start)).To(BeNumerically("<", 2*dialSimulatedDelay))
})
})

0 comments on commit 06a4de2

Please sign in to comment.