Skip to content

Commit

Permalink
Merge branch 'main' into flatten-cache
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed Feb 12, 2025
2 parents 35822ea + 98c2e01 commit 14b1b58
Show file tree
Hide file tree
Showing 57 changed files with 2,871 additions and 487 deletions.
2 changes: 1 addition & 1 deletion .circleci/generate_config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
steps:
- checkout
- go/install:
version: 1.21.0
version: 1.22.0
EOF

# Loop through each module and generate job configurations
Expand Down
11 changes: 10 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ name: Go Modules Test

on: [push, pull_request]

# https://docs.github.com/en/actions/learn-github-actions/expressions
# https://docs.github.com/en/actions/learn-github-actions/contexts#github-context
concurrency:
# Use github.run_id on main branch
# Use github.event.pull_request.number on pull requests, so it's unique per pull request
# Use github.ref on other branches, so it's unique per branch
group: ${{ github.workflow }}-${{ github.ref == 'refs/heads/main' && github.run_id || github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
prepare-matrix:
runs-on: ubuntu-latest
Expand All @@ -20,7 +29,7 @@ jobs:
fail-fast: false
matrix:
module: ${{fromJson(needs.prepare-matrix.outputs.matrix)}}
go-version: ['1.21.0', '1.22.0', '1.23.0']
go-version: ['1.22.0', '1.23.0']
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down
29 changes: 28 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ A fast Golang Redis client that does auto pipelining and supports server-assiste
* Pub/Sub, Sharded Pub/Sub, Streams
* Redis Cluster, Sentinel, RedisJSON, RedisBloom, RediSearch, RedisTimeseries, etc.
* [Probabilistic Data Structures without Redis Stack](./rueidisprob)
* [Availability zone affinity routing](#availability-zone-affinity-routing)

---

Expand Down Expand Up @@ -103,7 +104,7 @@ A benchmark result performed on two GCP n2-highcpu-2 machines also shows that ru

While auto pipelining maximizes throughput, it relys on additional goroutines to process requests and responses and may add some latencies due to goroutine scheduling and head of line blocking.

You can avoid this by setting `DisableAutoPipelining` to ture, then it will switch to connection pooling approach and serve each request with dedicated connection on the same goroutine.
You can avoid this by setting `DisableAutoPipelining` to true, then it will switch to connection pooling approach and serve each request with dedicated connection on the same goroutine.

### Manual Pipelining

Expand Down Expand Up @@ -411,6 +412,26 @@ client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:6379/0")
client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:26379/0?master_set=my_master"))
```

### Availability Zone Affinity Routing

Starting from Valkey 8.1, Valkey server provides the `availability-zone` information for clients to know where the server is located.
For using this information to route requests to the replica located in the same availability zone,
set the `EnableReplicaAZInfo` option and your `ReplicaSelector` function. For example:

```go
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"address.example.com:6379"},
EnableReplicaAZInfo: true,
ReplicaSelector: func(slot uint16, replicas []rueidis.ReplicaInfo) int {
for i, replica := range replicas {
if replica.AZ == "us-east-1a" {
return i // return the index of the replica.
}
}
return -1 // send to the primary.
},
})
```

## Arbitrary Command

Expand Down Expand Up @@ -574,6 +595,12 @@ if err := rueidis.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1"
Contributions are welcome, including [issues](https://github.com/redis/rueidis/issues), [pull requests](https://github.com/redis/rueidis/pulls), and [discussions](https://github.com/redis/rueidis/discussions).
Contributions mean a lot to us and help us improve this library and the community!

Thanks to all the people who already contributed!

<a href="https://github.com/redis/rueidis/graphs/contributors">
<img src="https://contributors-img.web.app/image?repo=redis/rueidis" />
</a>

### Generate Command Builders

Command builders are generated based on the definitions in [./hack/cmds](./hack/cmds) by running:
Expand Down
4 changes: 2 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

type singleClient struct {
conn conn
cmd Builder
retryHandler retryHandler
stop uint32
cmd Builder
retry bool
DisableCache bool
}
Expand Down Expand Up @@ -197,9 +197,9 @@ func (c *singleClient) Close() {
type dedicatedSingleClient struct {
conn conn
wire wire
cmd Builder
retryHandler retryHandler
mark uint32
cmd Builder
retry bool
}

Expand Down
8 changes: 8 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type mockConn struct {
DoMultiStreamFn func(cmd ...Completed) MultiRedisResultStream
InfoFn func() map[string]RedisMessage
VersionFn func() int
AZFn func() string
ErrorFn func() error
CloseFn func()
DialFn func() error
Expand Down Expand Up @@ -163,6 +164,13 @@ func (m *mockConn) Version() int {
return 0
}

func (m *mockConn) AZ() string {
if m.AZFn != nil {
return m.AZFn()
}
return ""
}

func (m *mockConn) Error() error {
if m.ErrorFn != nil {
return m.ErrorFn()
Expand Down
31 changes: 23 additions & 8 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ var ErrSendToReplicasNotSet = errors.New("SendToReplicas must be set when Replic

type clusterClient struct {
pslots [16384]conn
rslots []conn
sc call
retryHandler retryHandler
opt *ClientOption
rOpt *ClientOption
conns map[string]connrole
connFn connFn
opt *ClientOption
retryHandler retryHandler
stopCh chan struct{}
cmd Builder
sc call
rslots []conn
mu sync.RWMutex
stop uint32
cmd Builder
retry bool
}

Expand Down Expand Up @@ -259,6 +259,21 @@ func (c *clusterClient) _refresh() (err error) {
}
if len(g.nodes) > 1 {
n := len(g.nodes) - 1

if c.opt.EnableReplicaAZInfo {
var wg sync.WaitGroup
for i := 1; i <= n; i += 4 { // batch AZ() for every 4 connections
for j := i; j <= i+4 && j <= n; j++ {
wg.Add(1)
go func(wg *sync.WaitGroup, conn conn, info *ReplicaInfo) {
info.AZ = conn.AZ()
wg.Done()
}(&wg, conns[g.nodes[j].Addr].conn, &g.nodes[j])
}
wg.Wait()
}
}

for _, slot := range g.slots {
for i := slot[0]; i <= slot[1] && i >= 0 && i < 16384; i++ {
pslots[i] = conns[master].conn
Expand Down Expand Up @@ -1221,15 +1236,15 @@ func (c *clusterClient) shouldRefreshRetry(err error, ctx context.Context) (addr
}

type dedicatedClusterClient struct {
client *clusterClient
conn conn
wire wire
retryHandler retryHandler
client *clusterClient
pshks *pshks
mu sync.Mutex
cmd Builder
retryHandler retryHandler
retry bool
slot uint16
retry bool
mark bool
}

Expand Down
110 changes: 107 additions & 3 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,111 @@ func TestClusterClientInit(t *testing.T) {
t.Fatalf("unexpected node assigned to rslot 16383")
}
})

t.Run("Refresh cluster which has multi replicas with az", func(t *testing.T) {
primaryNodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return slotsMultiRespWithMultiReplicas
}
return RedisResult{
err: errors.New("unexpected call"),
}
},
AZFn: func() string {
return "us-west-1a"
},
}
replicaNodeConn1 := &mockConn{
DoFn: func(cmd Completed) RedisResult {
return RedisResult{
err: errors.New("unexpected call"),
}
},
AZFn: func() string {
return "us-west-1a"
},
}
replicaNodeConn2 := &mockConn{
DoFn: func(cmd Completed) RedisResult {
return RedisResult{
err: errors.New("unexpected call"),
}
},
AZFn: func() string {
return "us-west-1b"
},
}
replicaNodeConn3 := &mockConn{
DoFn: func(cmd Completed) RedisResult {
return RedisResult{
err: errors.New("unexpected call"),
}
},
AZFn: func() string {
return "us-west-1c"
},
}

client, err := newClusterClient(
&ClientOption{
InitAddress: []string{"127.0.0.1:0"},
SendToReplicas: func(cmd Completed) bool {
return true
},
ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int {
for i, replica := range replicas {
if replica.AZ == "us-west-1b" {
return i
}
}
return -1
},
EnableReplicaAZInfo: true,
},
func(dst string, opt *ClientOption) conn {
switch {
case dst == "127.0.0.2:1" || dst == "127.0.1.2:1":
return replicaNodeConn1
case dst == "127.0.0.3:2" || dst == "127.0.1.3:2":
return replicaNodeConn2
case dst == "127.0.0.4:3" || dst == "127.0.1.4:3":
return replicaNodeConn3
default:
return primaryNodeConn
}
},
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}

if client.pslots[0] != primaryNodeConn {
t.Fatalf("unexpected node assigned to pslot 0")
}
if client.pslots[8192] != primaryNodeConn {
t.Fatalf("unexpected node assigned to pslot 8192")
}
if client.pslots[8193] != primaryNodeConn {
t.Fatalf("unexpected node assigned to pslot 8193")
}
if client.pslots[16383] != primaryNodeConn {
t.Fatalf("unexpected node assigned to pslot 16383")
}
if client.rslots[0] != replicaNodeConn2 {
t.Fatalf("unexpected node assigned to rslot 0")
}
if client.rslots[8192] != replicaNodeConn2 {
t.Fatalf("unexpected node assigned to rslot 8192")
}
if client.rslots[8193] != replicaNodeConn2 {
t.Fatalf("unexpected node assigned to rslot 8193")
}
if client.rslots[16383] != replicaNodeConn2 {
t.Fatalf("unexpected node assigned to rslot 16383")
}
})
}

//gocyclo:ignore
Expand Down Expand Up @@ -6738,9 +6843,8 @@ func TestClusterClient_SendReadOperationToReplicaNodeWriteOperationToPrimaryNode
return e
},
}
primaryNodeConn.AcquireFn = func() wire {
return w
}
primaryNodeConn.AcquireFn = func() wire { return w }
replicaNodeConn.AcquireFn = func() wire { return w } // Subscribe can work on replicas
if err := client.Dedicated(func(c DedicatedClient) error {
return c.Receive(context.Background(), c.B().Subscribe().Channel("a").Build(), func(msg PubSubMessage) {})
}); err != e {
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
module github.com/redis/rueidis

go 1.21
go 1.22.0

require (
github.com/onsi/gomega v1.34.1
golang.org/x/sys v0.24.0
github.com/onsi/gomega v1.36.2
golang.org/x/sys v0.29.0
)

require (
github.com/google/go-cmp v0.6.0 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/kr/text v0.2.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/tools v0.29.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
34 changes: 16 additions & 18 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg=
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg=
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k=
github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg=
golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI=
github.com/onsi/ginkgo/v2 v2.22.1 h1:QW7tbJAUDyVDVOM5dFa7qaybo+CRfR7bemlQUN6Z8aM=
github.com/onsi/ginkgo/v2 v2.22.1/go.mod h1:S6aTpoRsSq2cZOd+pssHAlKW/Q/jZt6cPrPlnj4a1xM=
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE=
golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
Loading

0 comments on commit 14b1b58

Please sign in to comment.