Skip to content

Commit de1b16c

Browse files
added scaffolding for the req-resp manager
1 parent c00bd81 commit de1b16c

File tree

3 files changed

+61
-12
lines changed

3 files changed

+61
-12
lines changed

command_policy_manager.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package redis
2+
3+
import (
4+
"sync"
5+
6+
"github.com/redis/go-redis/v9/internal/routing"
7+
)
8+
9+
var defaultPolicies = map[string]*routing.CommandPolicy{}
10+
11+
type commandPolicyManager struct {
12+
rwmutex *sync.RWMutex
13+
clientPolicies map[string]*routing.CommandPolicy
14+
overwrittenPolicies map[string]*routing.CommandPolicy
15+
}
16+
17+
func newCommandPolicyManager(overwrites interface{}) *commandPolicyManager {
18+
return &commandPolicyManager{}
19+
}
20+
21+
func (cpm *commandPolicyManager) updateClientPolicies(policies interface{}) {
22+
cpm.rwmutex.Lock()
23+
defer cpm.rwmutex.Unlock()
24+
}
25+
26+
func (cpm *commandPolicyManager) getCmdPolicy(cmd Cmder) *routing.CommandPolicy {
27+
cpm.rwmutex.RLock()
28+
defer cpm.rwmutex.RUnlock()
29+
30+
cmdName := cmd.Name()
31+
32+
if policy, ok := cpm.overwrittenPolicies[cmdName]; ok {
33+
return policy
34+
}
35+
36+
if policy, ok := cpm.clientPolicies[cmdName]; ok {
37+
return policy
38+
}
39+
40+
if policy, ok := defaultPolicies[cmdName]; ok {
41+
return policy
42+
}
43+
44+
return nil
45+
}

osscluster.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -922,10 +922,11 @@ func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, er
922922
// or more underlying connections. It's safe for concurrent use by
923923
// multiple goroutines.
924924
type ClusterClient struct {
925-
opt *ClusterOptions
926-
nodes *clusterNodes
927-
state *clusterStateHolder
928-
cmdsInfoCache *cmdsInfoCache
925+
opt *ClusterOptions
926+
nodes *clusterNodes
927+
state *clusterStateHolder
928+
cmdsInfoCache *cmdsInfoCache
929+
cmdPolicyManager *commandPolicyManager
929930
cmdable
930931
hooksMixin
931932
}
@@ -943,6 +944,7 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
943944
c.state = newClusterStateHolder(c.loadState)
944945
c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
945946
c.cmdable = c.Process
947+
c.cmdPolicyManager = newCommandPolicyManager(nil)
946948
c.initHooks(hooks{
947949
dial: nil,
948950
process: c.process,
@@ -1334,7 +1336,7 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
13341336

13351337
if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
13361338
for _, cmd := range cmds {
1337-
policy := c.getCommandPolicy(ctx, cmd)
1339+
policy := c.cmdPolicyManager.getCmdPolicy(cmd)
13381340
if policy != nil && !policy.CanBeUsedInPipeline() {
13391341
return fmt.Errorf(
13401342
"redis: cannot pipeline command %q with request policy ReqAllNodes/ReqAllShards/ReqMultiShard; Note: This behavior is subject to change in the future", cmd.Name(),
@@ -1351,7 +1353,7 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
13511353
}
13521354

13531355
for _, cmd := range cmds {
1354-
policy := c.getCommandPolicy(ctx, cmd)
1356+
policy := c.cmdPolicyManager.getCmdPolicy(cmd)
13551357
if policy != nil && !policy.CanBeUsedInPipeline() {
13561358
return fmt.Errorf(
13571359
"redis: cannot pipeline command %q with request policy ReqAllNodes/ReqAllShards/ReqMultiShard; Note: This behavior is subject to change in the future", cmd.Name(),

osscluster_router.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type slotResult struct {
2121

2222
// routeAndRun routes a command to the appropriate cluster nodes and executes it
2323
func (c *ClusterClient) routeAndRun(ctx context.Context, cmd Cmder, node *clusterNode) error {
24-
policy := c.getCommandPolicy(ctx, cmd)
24+
policy := c.cmdPolicyManager.getCmdPolicy(cmd)
2525

2626
switch {
2727
case policy != nil && policy.Request == routing.ReqAllNodes:
@@ -38,11 +38,13 @@ func (c *ClusterClient) routeAndRun(ctx context.Context, cmd Cmder, node *cluste
3838
}
3939

4040
// getCommandPolicy retrieves the routing policy for a command
41-
func (c *ClusterClient) getCommandPolicy(ctx context.Context, cmd Cmder) *routing.CommandPolicy {
42-
if cmdInfo := c.cmdInfo(ctx, cmd.Name()); cmdInfo != nil && cmdInfo.Tips != nil {
43-
return cmdInfo.Tips
44-
}
45-
return nil
41+
func (c *ClusterClient) getCommandPolicy(cmd Cmder) *routing.CommandPolicy {
42+
43+
return c.cmdPolicyManager.getCmdPolicy(cmd)
44+
// if cmdInfo := c.cmdInfo(ctx, cmd.Name()); cmdInfo != nil && cmdInfo.Tips != nil {
45+
// return cmdInfo.Tips
46+
// }
47+
// return nil
4648
}
4749

4850
// executeDefault handles standard command routing based on keys

0 commit comments

Comments
 (0)