Skip to content

Commit

Permalink
Eliminate redundant dial mutex causing unbounded connection queue con…
Browse files Browse the repository at this point in the history
…tention (#3088)

* Eliminate redundant dial mutex causing unbounded connection queue contention

* Dialer connection timeouts unit test

---------

Co-authored-by: ofekshenawa <[email protected]>
  • Loading branch information
LINKIWI and ofekshenawa authored Nov 20, 2024
1 parent 930d904 commit 080e051
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 2 deletions.
2 changes: 0 additions & 2 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,6 @@ func (hs *hooksMixin) withProcessPipelineHook(
}

func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
hs.hooksMu.Lock()
defer hs.hooksMu.Unlock()
return hs.current.dial(ctx, network, addr)
}

Expand Down
65 changes: 65 additions & 0 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -633,3 +634,67 @@ var _ = Describe("Hook with MinIdleConns", func() {
}))
})
})

var _ = Describe("Dialer connection timeouts", func() {
var client *redis.Client

const dialSimulatedDelay = 1 * time.Second

BeforeEach(func() {
options := redisOptions()
options.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
// Simulated slow dialer.
// Note that the following sleep is deliberately not context-aware.
time.Sleep(dialSimulatedDelay)
return net.Dial("tcp", options.Addr)
}
options.MinIdleConns = 1
client = redis.NewClient(options)
})

AfterEach(func() {
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})

It("does not contend on connection dial for concurrent commands", func() {
var wg sync.WaitGroup

const concurrency = 10

durations := make(chan time.Duration, concurrency)
errs := make(chan error, concurrency)

start := time.Now()
wg.Add(concurrency)

for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()

start := time.Now()
err := client.Ping(ctx).Err()
durations <- time.Since(start)
errs <- err
}()
}

wg.Wait()
close(durations)
close(errs)

// All commands should eventually succeed, after acquiring a connection.
for err := range errs {
Expect(err).NotTo(HaveOccurred())
}

// Each individual command should complete within the simulated dial duration bound.
for duration := range durations {
Expect(duration).To(BeNumerically("<", 2*dialSimulatedDelay))
}

// Due to concurrent execution, the entire test suite should also complete within
// the same dial duration bound applied for individual commands.
Expect(time.Since(start)).To(BeNumerically("<", 2*dialSimulatedDelay))
})
})

0 comments on commit 080e051

Please sign in to comment.