Skip to content
This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

add random nodes support when call cluster slots #482

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion rediscluster/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ class RedisCluster(Redis):

def __init__(self, host=None, port=None, startup_nodes=None, max_connections=None, max_connections_per_node=False, init_slot_cache=True,
readonly_mode=False, reinitialize_steps=None, skip_full_coverage_check=False, nodemanager_follow_cluster=False,
connection_class=None, read_from_replicas=False, cluster_down_retry_attempts=3, host_port_remap=None, **kwargs):
nodemanager_random_cluster_nodes=False, connection_class=None, read_from_replicas=False, cluster_down_retry_attempts=3,
host_port_remap=None, **kwargs):
"""
:startup_nodes:
List of nodes that initial bootstrapping can be done from
Expand All @@ -319,6 +320,9 @@ def __init__(self, host=None, port=None, startup_nodes=None, max_connections=Non
The node manager will during initialization try the last set of nodes that
it was operating on. This will allow the client to drift along side the cluster
if the cluster nodes move around alot.
:nodemanager_random_cluster_nodes:
The node manager will use disorder nodes during initialization, avoid all
cluster slots command are send to first node.
:**kwargs:
Extra arguments that will be sent into Redis instance when created
(See Official redis-py doc for supported kwargs
Expand Down Expand Up @@ -373,6 +377,7 @@ def __init__(self, host=None, port=None, startup_nodes=None, max_connections=Non
max_connections_per_node=max_connections_per_node,
skip_full_coverage_check=skip_full_coverage_check,
nodemanager_follow_cluster=nodemanager_follow_cluster,
nodemanager_random_cluster_nodes=nodemanager_random_cluster_nodes,
connection_class=connection_class,
host_port_remap=host_port_remap,
**kwargs
Expand Down
14 changes: 11 additions & 3 deletions rediscluster/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ class ClusterConnectionPool(ConnectionPool):

def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=None,
max_connections=None, max_connections_per_node=False, reinitialize_steps=None,
skip_full_coverage_check=False, nodemanager_follow_cluster=False, host_port_remap=None,
skip_full_coverage_check=False, nodemanager_follow_cluster=False,
nodemanager_random_cluster_nodes=False, host_port_remap=None,
**connection_kwargs):
"""
:skip_full_coverage_check:
Expand All @@ -122,6 +123,9 @@ def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=No
The node manager will during initialization try the last set of nodes that
it was operating on. This will allow the client to drift along side the cluster
if the cluster nodes move around a lot.
:nodemanager_random_cluster_nodes:
The node manager will use disorder nodes during initialization, avoid all
cluster slots command are send to first node.
"""
log.debug("Creating new ClusterConnectionPool instance")

Expand Down Expand Up @@ -152,6 +156,7 @@ def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=No
skip_full_coverage_check=skip_full_coverage_check,
max_connections=self.max_connections,
nodemanager_follow_cluster=nodemanager_follow_cluster,
nodemanager_random_cluster_nodes=nodemanager_random_cluster_nodes,
host_port_remap=host_port_remap,
**connection_kwargs
)
Expand Down Expand Up @@ -422,7 +427,7 @@ class ClusterBlockingConnectionPool(ClusterConnectionPool):
def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=None,
max_connections=50, max_connections_per_node=False, reinitialize_steps=None,
skip_full_coverage_check=False, nodemanager_follow_cluster=False,
timeout=20, **connection_kwargs):
nodemanager_random_cluster_nodes=False, timeout=20, **connection_kwargs):
self.timeout = timeout

super(ClusterBlockingConnectionPool, self).__init__(
Expand All @@ -434,6 +439,7 @@ def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=No
reinitialize_steps=reinitialize_steps,
skip_full_coverage_check=skip_full_coverage_check,
nodemanager_follow_cluster=nodemanager_follow_cluster,
nodemanager_random_cluster_nodes=nodemanager_random_cluster_nodes,
**connection_kwargs
)

Expand Down Expand Up @@ -566,7 +572,8 @@ class ClusterReadOnlyConnectionPool(ClusterConnectionPool):
"""

def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=None,
max_connections=None, nodemanager_follow_cluster=False, **connection_kwargs):
max_connections=None, nodemanager_follow_cluster=False,
nodemanager_random_cluster_nodes=False, **connection_kwargs):
"""
"""
if connection_class is None:
Expand All @@ -578,6 +585,7 @@ def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=No
max_connections=max_connections,
readonly=True,
nodemanager_follow_cluster=nodemanager_follow_cluster,
nodemanager_random_cluster_nodes=nodemanager_random_cluster_nodes,
**connection_kwargs
)

Expand Down
10 changes: 8 additions & 2 deletions rediscluster/nodemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class NodeManager(object):
"""
REDIS_CLUSTER_HASH_SLOTS = 16384

def __init__(self, startup_nodes=None, reinitialize_steps=None, skip_full_coverage_check=False, nodemanager_follow_cluster=False,
def __init__(self, startup_nodes=None, reinitialize_steps=None, skip_full_coverage_check=False,
nodemanager_follow_cluster=False, nodemanager_random_cluster_nodes=False,
host_port_remap=None, **connection_kwargs):
"""
:skip_full_coverage_check:
Expand All @@ -33,6 +34,9 @@ def __init__(self, startup_nodes=None, reinitialize_steps=None, skip_full_covera
The node manager will during initialization try the last set of nodes that
it was operating on. This will allow the client to drift along side the cluster
if the cluster nodes move around alot.
:nodemanager_random_cluster_nodes:
The node manager will use disorder nodes during initialization, avoid all
cluster slots command are send to first node.
"""
log.debug("Creating new NodeManager instance")

Expand All @@ -45,6 +49,7 @@ def __init__(self, startup_nodes=None, reinitialize_steps=None, skip_full_covera
self.reinitialize_steps = reinitialize_steps or 25
self._skip_full_coverage_check = skip_full_coverage_check
self.nodemanager_follow_cluster = nodemanager_follow_cluster
self.nodemanager_random_cluster_nodes = nodemanager_random_cluster_nodes
self.encoder = Encoder(
connection_kwargs.get('encoding', 'utf-8'),
connection_kwargs.get('encoding_errors', 'strict'),
Expand Down Expand Up @@ -219,7 +224,8 @@ def initialize(self):
# With this option the client will attempt to connect to any of the previous set of nodes instead of the original set of nodes
if self.nodemanager_follow_cluster:
nodes = self.startup_nodes

if self.nodemanager_random_cluster_nodes:
random.shuffle(nodes)
for node in nodes:
try:
r = self.get_redis_link(host=node["host"], port=node["port"], decode_responses=True)
Expand Down