
Commit 6e8fc5e

ayush-sarda and rueian authored
feat: read command routing between primary and replica nodes (#894)
Signed-off-by: Rueian <[email protected]>
Co-authored-by: Rueian <[email protected]>
1 parent 8aa9c89 commit 6e8fc5e
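
The new ReadNodeSelector option added by this commit is consumed roughly as in the sketch below. This is an illustration only: the option names (SendToReplicas, ReadNodeSelector) and the NodeInfo type come from the diff, while the selector body, the address, and the use of math/rand/v2 are assumptions.

// Usage sketch (not part of this commit): route read-only commands to a
// randomly chosen node in the shard; an out-of-range index falls back to
// the slot's primary.
client, err := rueidis.NewClient(rueidis.ClientOption{
	InitAddress: []string{"127.0.0.1:7001"}, // hypothetical address
	SendToReplicas: func(cmd rueidis.Completed) bool {
		return cmd.IsReadOnly() // only read-only commands are eligible for rerouting
	},
	ReadNodeSelector: func(slot uint16, nodes []rueidis.NodeInfo) int {
		// nodes[0] is the shard primary; any in-range index picks that node.
		return rand.IntN(len(nodes)) // math/rand/v2
	},
})
if err != nil {
	panic(err)
}
defer client.Close()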

File tree

4 files changed: +2072 -120 lines changed


cluster.go

Lines changed: 96 additions & 47 deletions
@@ -19,18 +19,20 @@ var ErrNoSlot = errors.New("the slot has no redis node")
 var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")
 var ErrInvalidShardsRefreshInterval = errors.New("ShardsRefreshInterval must be greater than or equal to 0")
 var ErrReplicaOnlyConflictWithReplicaSelector = errors.New("ReplicaOnly conflicts with ReplicaSelector option")
+var ErrReplicaOnlyConflictWithReadNodeSelector = errors.New("ReplicaOnly conflicts with ReadNodeSelector option")
+var ErrReplicaSelectorConflictWithReadNodeSelector = errors.New("either set ReplicaSelector or ReadNodeSelector, not both")
 var ErrSendToReplicasNotSet = errors.New("SendToReplicas must be set when ReplicaSelector is set")

 type clusterClient struct {
-	pslots       [16384]conn
+	wslots       [16384]conn
 	retryHandler retryHandler
 	opt          *ClientOption
 	rOpt         *ClientOption
 	conns        map[string]connrole
 	connFn       connFn
 	stopCh       chan struct{}
 	sc           call
-	rslots       []conn
+	rslots       [][]NodeInfo
 	mu           sync.RWMutex
 	stop         uint32
 	cmd          Builder
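
Note that rslots changes from a flat []conn to [][]NodeInfo: each slot now keeps the full list of candidate nodes instead of one pre-resolved connection. The NodeInfo declaration is not part of this diff; from its usage below (Addr, AZ, and an attached conn) it has roughly this shape, sketched here for readability:

// Inferred sketch only; the real definition lives elsewhere in the package
// and may carry more fields.
type NodeInfo struct {
	Addr string // host:port reported by CLUSTER SLOTS / CLUSTER SHARDS
	AZ   string // availability zone, populated when EnableReplicaAZInfo is set
	conn conn   // connection attached during _refresh
}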
@@ -45,7 +47,7 @@ type connrole struct {
 	//replica bool <- this field is removed because a server may have mixed roles at the same time in the future. https://github.com/valkey-io/valkey/issues/1372
 }

-var replicaOnlySelector = func(_ uint16, replicas []ReplicaInfo) int {
+var replicaOnlySelector = func(_ uint16, replicas []NodeInfo) int {
 	return util.FastRand(len(replicas))
 }

@@ -67,11 +69,16 @@ func newClusterClient(opt *ClientOption, connFn connFn, retryer retryHandler) (*
 	if opt.ReplicaOnly && opt.ReplicaSelector != nil {
 		return nil, ErrReplicaOnlyConflictWithReplicaSelector
 	}
+	if opt.ReplicaOnly && opt.ReadNodeSelector != nil {
+		return nil, ErrReplicaOnlyConflictWithReadNodeSelector
+	}
+	if opt.ReplicaSelector != nil && opt.ReadNodeSelector != nil {
+		return nil, ErrReplicaSelectorConflictWithReadNodeSelector
+	}
 	if opt.ReplicaSelector != nil && opt.SendToReplicas == nil {
 		return nil, ErrSendToReplicasNotSet
 	}
-
-	if opt.SendToReplicas != nil && opt.ReplicaSelector == nil {
+	if opt.SendToReplicas != nil && opt.ReplicaSelector == nil && opt.ReadNodeSelector == nil {
 		opt.ReplicaSelector = replicaOnlySelector
 	}
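
The validation above makes ReadNodeSelector mutually exclusive with both ReplicaOnly and ReplicaSelector, and the random replicaOnlySelector default is now installed only when neither selector is provided. A hedged sketch of the conflict, assuming the exported option and error names; the ReplicaSelector parameter is written with the NodeInfo name from this diff, though the public field may keep the earlier ReplicaInfo name:

// Sketch: configuring both selectors fails at construction time.
_, err := rueidis.NewClient(rueidis.ClientOption{
	InitAddress:      []string{"127.0.0.1:7001"},
	SendToReplicas:   func(cmd rueidis.Completed) bool { return cmd.IsReadOnly() },
	ReplicaSelector:  func(slot uint16, replicas []rueidis.NodeInfo) int { return 0 },
	ReadNodeSelector: func(slot uint16, nodes []rueidis.NodeInfo) int { return 0 },
})
// errors.Is(err, rueidis.ErrReplicaSelectorConflictWithReadNodeSelector) == true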

@@ -244,68 +251,77 @@ func (c *clusterClient) _refresh() (err error) {
 	}
 	c.mu.RUnlock()

-	pslots := [16384]conn{}
-	var rslots []conn
-	for master, g := range groups {
+	wslots := [16384]conn{}
+	var rslots [][]NodeInfo
+	for _, g := range groups {
+
+		for i, nodeInfo := range g.nodes {
+			g.nodes[i].conn = conns[nodeInfo.Addr].conn
+		}
+
 		switch {
 		case c.opt.ReplicaOnly && len(g.nodes) > 1:
 			nodesCount := len(g.nodes)
 			for _, slot := range g.slots {
 				for i := slot[0]; i <= slot[1] && i >= 0 && i < 16384; i++ {
-					pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)].Addr].conn
+					wslots[i] = g.nodes[1+util.FastRand(nodesCount-1)].conn
 				}
 			}
 		case c.rOpt != nil:
 			if len(rslots) == 0 { // lazy init
-				rslots = make([]conn, 16384)
+				rslots = make([][]NodeInfo, 16384)
 			}
 			if len(g.nodes) > 1 {
-				n := len(g.nodes) - 1
-
 				if c.opt.EnableReplicaAZInfo {
 					var wg sync.WaitGroup
-					for i := 1; i <= n; i += 4 { // batch AZ() for every 4 connections
-						for j := i; j <= i+4 && j <= n; j++ {
+					for i := 0; i < len(g.nodes); i += 4 { // batch AZ() for every 4 connections
+						for j := i; j < i+4 && j < len(g.nodes); j++ {
 							wg.Add(1)
-							go func(wg *sync.WaitGroup, conn conn, info *ReplicaInfo) {
-								info.AZ = conn.AZ()
+							go func(wg *sync.WaitGroup, info *NodeInfo) {
+								info.AZ = info.conn.AZ()
 								wg.Done()
-							}(&wg, conns[g.nodes[j].Addr].conn, &g.nodes[j])
+							}(&wg, &g.nodes[j])
 						}
 						wg.Wait()
 					}
 				}
-
 				for _, slot := range g.slots {
 					for i := slot[0]; i <= slot[1] && i >= 0 && i < 16384; i++ {
-						pslots[i] = conns[master].conn
-						rIndex := c.opt.ReplicaSelector(uint16(i), g.nodes[1:])
-						if rIndex >= 0 && rIndex < n {
-							rslots[i] = conns[g.nodes[1+rIndex].Addr].conn
+						wslots[i] = g.nodes[0].conn
+						if c.opt.ReadNodeSelector != nil {
+							rslots[i] = g.nodes
 						} else {
-							rslots[i] = conns[master].conn
+							rIndex := c.opt.ReplicaSelector(uint16(i), g.nodes[1:]) // exclude master node
+							if rIndex >= 0 && rIndex < len(g.nodes)-1 {
+								node := g.nodes[1+rIndex]
+								rslots[i] = nodes{node}
+							} else {
+								node := g.nodes[0] // fallback to master
+								rslots[i] = nodes{node}
+							}
 						}
 					}
 				}
 			} else {
 				for _, slot := range g.slots {
 					for i := slot[0]; i <= slot[1] && i >= 0 && i < 16384; i++ {
-						pslots[i] = conns[master].conn
-						rslots[i] = conns[master].conn
+						node := g.nodes[0]
+						wslots[i] = node.conn
+						rslots[i] = nodes{node}
 					}
 				}
 			}
 		default:
 			for _, slot := range g.slots {
 				for i := slot[0]; i <= slot[1] && i >= 0 && i < 16384; i++ {
-					pslots[i] = conns[master].conn
+					wslots[i] = g.nodes[0].conn
 				}
 			}
 		}
 	}

 	c.mu.Lock()
-	c.pslots = pslots
+	c.wslots = wslots
 	c.rslots = rslots
 	c.conns = conns
 	c.mu.Unlock()
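
When ReadNodeSelector is set, _refresh stores the whole shard (primary at index 0 plus replicas) in rslots and defers the choice to request time; with ReplicaSelector the choice is still baked in here as a single-element nodes slice. Since AZ information is now attached to every node, a zone-affinity selector becomes straightforward. A sketch, assuming EnableReplicaAZInfo is turned on and the caller knows its own zone:

// Sketch: prefer a node in the caller's availability zone; returning an
// out-of-range index (here -1) makes the client fall back to the primary.
localAZ := "us-east-1a" // hypothetical; read from instance metadata in practice
readNodeSelector := func(slot uint16, nodes []rueidis.NodeInfo) int {
	for i, n := range nodes {
		if n.AZ == localAZ {
			return i
		}
	}
	return -1
}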
@@ -336,7 +352,7 @@ func (c *clusterClient) nodes() []string {
 	return nodes
 }

-type nodes []ReplicaInfo
+type nodes []NodeInfo

 type group struct {
 	nodes nodes
@@ -368,7 +384,7 @@ func parseSlots(slots RedisMessage, defaultAddr string) map[string]group {
 		g.nodes = make(nodes, 0, len(v.values())-2)
 		for i := 2; i < len(v.values()); i++ {
 			if dst := parseEndpoint(defaultAddr, v.values()[i].values()[0].string(), v.values()[i].values()[1].intlen); dst != "" {
-				g.nodes = append(g.nodes, ReplicaInfo{Addr: dst})
+				g.nodes = append(g.nodes, NodeInfo{Addr: dst})
 			}
 		}
 	}
@@ -411,7 +427,7 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]g
 			if dictRole := dict["role"]; dictRole.string() == "master" {
 				m = len(g.nodes)
 			}
-			g.nodes = append(g.nodes, ReplicaInfo{Addr: dst})
+			g.nodes = append(g.nodes, NodeInfo{Addr: dst})
 		}
 	}
 	if m >= 0 {
@@ -443,9 +459,19 @@ func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) {
 			break
 		}
 	} else if toReplica && c.rslots != nil {
-		p = c.rslots[slot]
+		if c.opt.ReadNodeSelector != nil {
+			nodes := c.rslots[slot]
+			rIndex := c.opt.ReadNodeSelector(slot, nodes)
+			if rIndex >= 0 && rIndex < len(nodes) {
+				p = c.rslots[slot][rIndex].conn
+			} else {
+				p = c.wslots[slot]
+			}
+		} else {
+			p = c.rslots[slot][0].conn
+		}
 	} else {
-		p = c.pslots[slot]
+		p = c.wslots[slot]
 	}
 	c.mu.RUnlock()
 	return p
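
One behavioral difference worth noting here: ReadNodeSelector receives the full candidate slice with the primary at index 0, so returning 0 deliberately keeps the read on the primary, whereas ReplicaSelector is still called with g.nodes[1:] and therefore only ever picks replicas. Any negative or out-of-range return value falls back to wslots, the slot's primary.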
@@ -476,7 +502,7 @@ func (c *clusterClient) redirectOrNew(addr string, prev conn, slot uint16, mode
 		cc = connrole{conn: p}
 		c.conns[addr] = cc
 		if mode == RedirectMove {
-			c.pslots[slot] = p
+			c.wslots[slot] = p
 		}
 	} else if prev == cc.conn {
 		// try reconnection if the MOVED redirects to the same host,
@@ -490,7 +516,7 @@ func (c *clusterClient) redirectOrNew(addr string, prev conn, slot uint16, mode
 		cc = connrole{conn: p}
 		c.conns[addr] = cc
 		if mode == RedirectMove { // MOVED should always point to the primary.
-			c.pslots[slot] = p
+			c.wslots[slot] = p
 		}
 	}
 	c.mu.Unlock()
@@ -575,14 +601,27 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, init

 	if !init && c.rslots != nil && c.opt.SendToReplicas != nil {
 		var bm bitmap
+		itor := make(map[int]int)
 		bm.Init(len(multi))
 		for i, cmd := range multi {
 			var cc conn
+			slot := cmd.Slot()
 			if c.opt.SendToReplicas(cmd) {
 				bm.Set(i)
-				cc = c.rslots[cmd.Slot()]
+				if c.opt.ReadNodeSelector != nil {
+					nodes := c.rslots[slot]
+					rIndex := c.opt.ReadNodeSelector(slot, nodes)
+					if rIndex > 0 && rIndex < len(nodes) {
+						itor[i] = rIndex
+					} else {
+						rIndex = 0 // default itor[i] = 0
+					}
+					cc = nodes[rIndex].conn
+				} else {
+					cc = c.rslots[slot][0].conn
+				}
 			} else {
-				cc = c.pslots[cmd.Slot()]
+				cc = c.wslots[slot]
 			}
 			if cc == nil {
 				return nil, false
@@ -599,9 +638,9 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, init
 		for i, cmd := range multi {
 			var cc conn
 			if bm.Get(i) {
-				cc = c.rslots[cmd.Slot()]
+				cc = c.rslots[cmd.Slot()][itor[i]].conn
 			} else {
-				cc = c.pslots[cmd.Slot()]
+				cc = c.wslots[cmd.Slot()]
 			}
 			re := retries.m[cc]
 			re.commands = append(re.commands, cmd)
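
In the DoMulti path the selector runs once per eligible command while the per-connection counts are collected, and the chosen index is remembered in itor so the second pass above reuses the same node. A usage sketch, assuming a client built with the options shown earlier and a ctx in scope:

// Sketch: within one DoMulti batch, the GET is eligible for replica routing
// (per the SendToReplicas predicate) while the SET stays on the slot's primary.
resps := client.DoMulti(ctx,
	client.B().Set().Key("k").Value("v").Build(),
	client.B().Get().Key("k").Build(),
)
for _, resp := range resps {
	if err := resp.Error(); err != nil {
		// handle per-command errors
	}
}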
@@ -621,7 +660,7 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, init
 		} else if init && last != cmd.Slot() {
 			panic(panicMixCxSlot)
 		}
-		cc := c.pslots[cmd.Slot()]
+		cc := c.wslots[cmd.Slot()]
 		if cc == nil {
 			return nil, false
 		}
@@ -630,7 +669,7 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, init

 	if last == cmds.InitSlot {
 		// if all commands have no slots, such as INFO, we pick a non-nil slot.
-		for i, cc := range c.pslots {
+		for i, cc := range c.wslots {
 			if cc != nil {
 				last = uint16(i)
 				count.m[cc] = inits
@@ -641,7 +680,7 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, init
 			return nil, false
 		}
 	} else if init {
-		cc := c.pslots[last]
+		cc := c.wslots[last]
 		count.m[cc] += inits
 	}

@@ -654,9 +693,9 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, init
 	for i, cmd := range multi {
 		var cc conn
 		if cmd.Slot() != cmds.InitSlot {
-			cc = c.pslots[cmd.Slot()]
+			cc = c.wslots[cmd.Slot()]
 		} else {
-			cc = c.pslots[last]
+			cc = c.wslots[last]
 		}
 		re := retries.m[cc]
 		re.commands = append(re.commands, cmd)
@@ -1013,7 +1052,7 @@ func (c *clusterClient) _pickMultiCache(multi []CacheableTTL) *connretrycache {
 	count := conncountp.Get(len(c.conns), len(c.conns))
 	if c.opt.SendToReplicas == nil || c.rslots == nil {
 		for _, cmd := range multi {
-			p := c.pslots[cmd.Cmd.Slot()]
+			p := c.wslots[cmd.Cmd.Slot()]
 			if p == nil {
 				return nil
 			}
@@ -1027,7 +1066,7 @@ func (c *clusterClient) _pickMultiCache(multi []CacheableTTL) *connretrycache {
 		conncountp.Put(count)

 		for i, cmd := range multi {
-			cc := c.pslots[cmd.Cmd.Slot()]
+			cc := c.wslots[cmd.Cmd.Slot()]
 			re := retries.m[cc]
 			re.commands = append(re.commands, cmd)
 			re.cIndexes = append(re.cIndexes, i)
@@ -1044,10 +1083,20 @@ func (c *clusterClient) _pickMultiCache(multi []CacheableTTL) *connretrycache {
 	}
 	for i, cmd := range multi {
 		var p conn
+		slot := cmd.Cmd.Slot()
 		if c.opt.SendToReplicas(Completed(cmd.Cmd)) {
-			p = c.rslots[cmd.Cmd.Slot()]
+			if c.opt.ReadNodeSelector != nil {
+				rIndex := c.opt.ReadNodeSelector(slot, c.rslots[slot])
+				if rIndex >= 0 && rIndex < len(c.rslots[slot]) {
+					p = c.rslots[slot][rIndex].conn
+				} else {
+					p = c.wslots[slot]
+				}
+			} else {
+				p = c.rslots[slot][0].conn
+			}
 		} else {
-			p = c.pslots[cmd.Cmd.Slot()]
+			p = c.wslots[slot]
 		}
 		if p == nil {
 			return nil

0 commit comments
