diff --git a/.travis.yml b/.travis.yml
index 14c52791..3a397203 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -13,22 +13,23 @@ install:
   - "if [[ $REDIS_VERSION == '3.0' ]]; then REDIS_VERSION=3.0 make redis-install; fi"
   - "if [[ $REDIS_VERSION == '3.2' ]]; then REDIS_VERSION=3.2 make redis-install; fi"
   - "if [[ $REDIS_VERSION == '4.0' ]]; then REDIS_VERSION=4.0 make redis-install; fi"
+  - "if [[ $REDIS_VERSION == '5.0' ]]; then REDIS_VERSION=5.0 make redis-install; fi"
   - pip install -r dev-requirements.txt
   - pip install -e .
   - "if [[ $HIREDIS == '1' ]]; then pip install hiredis; fi"
 env:
-  # Redis 3.0
+  # Redis 3.0 & HIREDIS
   - HIREDIS=0 REDIS_VERSION=3.0
-  # Redis 3.0 and HIREDIS
   - HIREDIS=1 REDIS_VERSION=3.0
-  # Redis 3.2
+  # Redis 3.2 & HIREDIS
   - HIREDIS=0 REDIS_VERSION=3.2
-  # Redis 3.2 and HIREDIS
   - HIREDIS=1 REDIS_VERSION=3.2
-  # Redis 4.0
+  # Redis 4.0 & HIREDIS
   - HIREDIS=0 REDIS_VERSION=4.0
-  # Redis 4.0 and HIREDIS
   - HIREDIS=1 REDIS_VERSION=4.0
+  # Redis 5.0 & HIREDIS
+  - HIREDIS=0 REDIS_VERSION=5.0
+  - HIREDIS=1 REDIS_VERSION=5.0
 script:
   - make start
   - coverage erase
diff --git a/Makefile b/Makefile
index 1a5e1f0e..e0fbac7a 100644
--- a/Makefile
+++ b/Makefile
@@ -216,7 +216,7 @@ ifndef REDIS_TRIB_RB
 endif
 
 ifndef REDIS_VERSION
-	REDIS_VERSION=3.0.7
+	REDIS_VERSION=4.0.10
 endif
 
 export REDIS_CLUSTER_NODE1_CONF
diff --git a/README.md b/README.md
index d1def896..1d3b6f23 100644
--- a/README.md
+++ b/README.md
@@ -54,7 +54,7 @@ True
 
 ## License & Authors
 
-Copyright (c) 2013-2017 Johan Andersson
+Copyright (c) 2013-2018 Johan Andersson
 
 MIT (See docs/License.txt file)
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 0d60c0e0..7f686527 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -1,9 +1,9 @@
 -r requirements.txt
 
 coverage>=4.0,<5.0
-pytest>=2.8.3,<3.0.0
-testfixtures>=4.5.0,<5.0.0
-mock>=1.3.0,<2.0.0
+pytest>=2.8.3,<4.0.0
+testfixtures>=4.5.0,<5.5.0
+mock>=1.3.0,<2.1.0
 docopt>=0.6.2,<1.0.0
 tox>=2.2.0,<3.0.0
 python-coveralls>=2.5.0,<3.0.0
diff --git a/docs/authors.rst b/docs/authors.rst
index ee4cb8d4..e2464a2a 100644
--- a/docs/authors.rst
+++ b/docs/authors.rst
@@ -24,3 +24,6 @@ Authors who contributed code or testing:
  - AngusP - https://github.com/AngusP
  - Doug Kent - https://github.com/dkent
  - VascoVisser - https://github.com/VascoVisser
+ - astrohsy - https://github.com/astrohsy
+ - Artur Stawiarski - https://github.com/astawiarski
+ - Matthew Anderson - https://github.com/mc3ander
diff --git a/docs/license.rst b/docs/license.rst
index f3f84ca6..305c9087 100644
--- a/docs/license.rst
+++ b/docs/license.rst
@@ -1,7 +1,7 @@
 Licensing
 ---------
 
-Copyright (c) 2013-2016 Johan Andersson
+Copyright (c) 2013-2018 Johan Andersson
 
 MIT (See docs/License.txt file)
diff --git a/docs/pipelines.rst b/docs/pipelines.rst
index 32c53067..d1120c4d 100644
--- a/docs/pipelines.rst
+++ b/docs/pipelines.rst
@@ -38,7 +38,7 @@ An `ASK` error means the slot is only partially migrated and that the client can
 
 The philosophy on pipelines
 ---------------------------
 
-After playing around with pipelines and thinking about possible solutions that could be used in a cluster setting this document will describe how pipelines work, strengths and weaknesses with the implementation that was chosen.
+After playing around with pipelines and thinking about possible solutions that could be used in a cluster setting, this document describes how pipelines work, along with the strengths and weaknesses of the implementation that was chosen.
 
 Why can't we reuse the pipeline code in `redis-py`? In short it is almost the same reason why code from the normal redis client can't be reused in a cluster environment and that is because of the slots system. Redis cluster consist of a number of slots that is distributed across a number of servers and each key belongs in one of these slots.
@@ -62,9 +62,16 @@ Consider the following example. Create a pipeline and issue 6 commands `A`, `B`,
 If we look back at the order we executed the commands we get `[A, F]` for the first node and `[B, E, C, D]` for the second node. At first glance this looks like it is out of order because command `E` is executed before `C` & `D`. Why is this not matter? Because no multi key operations can be done in a pipeline, we only have to care the execution order is correct for each slot and in this case it was because `B` & `E` belongs to the same slot and `C` & `D` belongs to the same slot. There should be no possible way to corrupt any data between slots if multi key commands are blocked by the code.
 
-What is good with this pipeline solution? First we can actually have a pipeline solution that will work in most cases with few commands blocked (only multi key commands). Secondly we can run it in parralel to increase the performance of the pipeline even further, making the benefits even greater.
+What is good with this pipeline solution? First we can actually have a pipeline solution that will work in most cases with few commands blocked (only multi key commands). Secondly we can run it in parallel to increase the performance of the pipeline even further, making the benefits even greater.
 
+Packing Commands
+----------------
+
+When issuing only a single command, there is only one network round trip to be made. But what if you issue 100 pipelined commands? In a single-instance redis configuration, you still only need to make one network hop. The commands are packed into a single request and the server responds with all the data for those requests in a single response. But with redis cluster, those keys could be spread out over many different nodes.
+
+The client is responsible for figuring out which commands map to which nodes. Say, for example, that your 100 pipelined commands need to route to 3 different nodes. The first thing the client does is break out the commands that go to each node, so it only has 3 network requests to make instead of 100.
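+
+As a rough illustration, the grouping step looks something like the sketch below. This is a minimal sketch rather than the actual client code: `slot_to_node` stands in for the cluster's slot table, and `keyslot()` skips the `{...}` hash tag handling that the real calculation honours.
+
+.. code-block:: python
+
+    import binascii
+    from collections import defaultdict
+
+    def keyslot(key):
+        # Redis maps a key to a slot with CRC16 (XModem) modulo 16384;
+        # binascii.crc_hqx implements that same CRC16 variant.
+        return binascii.crc_hqx(key.encode(), 0) % 16384
+
+    def group_by_node(commands, slot_to_node):
+        # commands:     [("SET", "foo", "1"), ("GET", "bar"), ...]
+        # slot_to_node: {slot number: "host:port" of the serving master}
+        batches = defaultdict(list)
+        for command in commands:
+            node = slot_to_node[keyslot(command[1])]
+            batches[node].append(command)
+        # One network request per node instead of one per command.
+        return batches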
 
 Transactions and WATCH
 ----------------------
 
@@ -135,7 +142,7 @@ This section will describe different types of pipeline solutions. It will list t
 Suggestion one
 **************
 
-Simple but yet sequential pipeline. This solution acts more like an interface for the already existing pipeline implementation and only provides a simple backwards compatible interface to ensure that code that sexists still will work withouth any major modifications. The good this with this implementation is that because all commands is runned in sequence it will handle `MOVED` or `ASK` redirections very good and withouth any problems. The major downside to this solution is that no commands is ever batched and runned in parralell and thus you do not get any major performance boost from this approach. Other plus is that execution order is preserved across the entire cluster but a major downside is that thte commands is no longer atomic on the cluster scale because they are sent in multiple commands to different nodes.
+Simple yet sequential pipeline. This solution acts more like an interface for the already existing pipeline implementation and only provides a simple backwards compatible interface to ensure that existing code still works without any major modifications. The good thing with this implementation is that because all commands are run in sequence, it handles `MOVED` or `ASK` redirections very well and without any problems. The major downside to this solution is that no command is ever batched and run in parallel, so you do not get any major performance boost from this approach. Another plus is that execution order is preserved across the entire cluster, but a major downside is that the commands are no longer atomic on the cluster scale because they are sent in multiple requests to different nodes.
 
 **Good**
 
@@ -151,8 +158,8 @@ Simple but yet sequential pipeline. This solution acts more like an interface fo
 Suggestion two
 **************
 
-Current pipeline implementation. This implementation is rather good and works well because it combines the existing pipeline interface and functionality and it also provides a basic handling of `ASK` or `MOVED` errors inside the client. One major downside to this is that execution order is not preserved across the cluster. Altho the execution order is somewhat broken if you look at the entire cluster level becuase commands can be splitted so that cmd1, cmd3, cmd5 get sent to one server and cmd2, cmd4 gets sent to another server. The order is then broken globally but locally for each server it is preserved and maintained correctly. On the other hand i guess that there can't be any commands that can affect different hashslots within the same command so it maybe do not really matter if the execution order is not correct because for each slot/key the order is valid.
-There might be some issues with rebuilding the correct response ordering from the scattered data because each command might be in different sub pipelines. But i think that our current code still handles this correctly. I think i have to figure out some wierd case where the execution order acctually matters. There might be some issues with the nonsupported mget/mset commands that acctually performs different sub commands then it currently supports.
+Current pipeline implementation. This implementation is rather good and works well because it combines the existing pipeline interface and functionality and it also provides basic handling of `ASK` or `MOVED` errors inside the client. One major downside to this is that execution order is not preserved across the cluster: commands can be split so that cmd1, cmd3 and cmd5 get sent to one server while cmd2 and cmd4 get sent to another server. The order is then broken globally, but locally for each server it is preserved and maintained correctly. On the other hand, no single command can affect more than one hashslot, so maybe it does not really matter that the global execution order is not preserved, because for each slot/key the order is valid.
+There might be some issues with rebuilding the correct response ordering from the scattered data because each command might be in different sub pipelines. But I think that our current code still handles this correctly. I think I have to figure out some weird case where the execution order actually matters. There might be some issues with the unsupported mget/mset commands that actually perform different sub commands than are currently supported.
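+
+A sketch of how the ordering can be rebuilt (simplified, not the actual implementation; `send_to_node` is a stand-in for executing one sub pipeline): remember each command's original position before splitting, then write every reply back into that position.
+
+.. code-block:: python
+
+    def execute_split(batches, total):
+        # batches: {node_name: [(position, command), ...]} built during the split
+        replies = [None] * total
+        for node, commands in batches.items():
+            results = send_to_node(node, [cmd for _, cmd in commands])
+            for (position, _), result in zip(commands, results):
+                replies[position] = result
+        # replies is now ordered exactly as the caller issued the commands
+        return replies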
 
 **Good**
 
@@ -160,7 +167,7 @@ There might be some issues with rebuilding the correct response ordering from th
 
 **Bad**
 
- - Not sequential execution on the entire pipeline
+ - Non-sequential execution of the entire pipeline
  - Medium difficult `ASK` or `MOVED` handling
 
@@ -168,7 +175,7 @@ There might be some issues with rebuilding the correct response ordering from th
 Suggestion three
 ****************
 
-There is a even simpler form of pipelines that can be made where all commands is supported as long as they conform to the same hashslot because redis supports that mode of operation. The good thing with this is that sinc all keys must belong to the same slot there can't be very few `ASK` or `MOVED` errors that happens and if they happen they will be very easy to handle because the entire pipeline is kinda atomic because you talk to the same server and only 1 server. There can't be any multiple server communication happening.
+There is an even simpler form of pipelines that can be made where all commands are supported, as long as they conform to the same hashslot, because Redis supports that mode of operation. The good thing with this is that since all keys must belong to the same slot, there can only be very few `ASK` or `MOVED` errors, and if they do happen they are very easy to handle: the entire pipeline is more or less atomic because you talk to one and the same server. There can't be any multiple server communication happening.
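+
+A handy way to get several keys into one hashslot is a key hash tag: when a key contains a `{...}` part, only that part is hashed when the slot is calculated. For example (key names made up for illustration; `rc` is a cluster client as in the earlier examples):
+
+.. code-block:: python
+
+    pipe = rc.pipeline(transaction=False)
+
+    # All three keys hash on "user:123" only, so they share one slot and
+    # the whole pipeline can be sent to a single node in one request.
+    pipe.set("{user:123}:name", "foo")
+    pipe.set("{user:123}:age", 42)
+    pipe.incr("{user:123}:visits")
+
+    pipe.execute()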
 
 **Good**
 
@@ -184,7 +191,7 @@ There is a even simpler form of pipelines is
 Suggestion four
 **************
 
-One other solution is the 2 step commit solution where you send for each server 2 batches of commands. The first command should somehow establish that each keyslot is in the correct state and able to handle the data. After the client have recieved OK from all nodes that all data slots is good to use then it will acctually send the real pipeline with all data and commands. The big problem with this approach is that ther eis a gap between the checking of the slots and the acctual sending of the data where things can happen to the already established slots setup. But at the same time there is no possibility of merging these 2 steps because if step 2 is automatically runned if step 1 is Ok then the pipeline for the first node that will fail will fail but for the other nodes it will suceed but when it should not because if one command gets `ASK` or `MOVED` redirection then all pipeline objects must be rebuilt to match the new specs/setup and then reissued by the client. The major advantage of this solution is that if you have total controll of the redis server and do controlled upgrades when no clients is talking to the server then it can acctually work really well because there is no possibility that `ASK` or `MOVED` will triggered by migrations in between the 2 batches.
+One other solution is the 2-step commit solution where you send 2 batches of commands to each server. The first batch should somehow establish that each keyslot is in the correct state and able to handle the data. After the client has received OK from all nodes that all data slots are good to use, it will actually send the real pipeline with all data and commands. The big problem with this approach is that there is a gap between the checking of the slots and the actual sending of the data, where things can happen to the already established slots setup. At the same time there is no way to merge these 2 steps, because if step 2 runs automatically when step 1 is OK, then the pipeline will fail for the first node that fails but succeed for the other nodes when it should not: if one command gets an `ASK` or `MOVED` redirection, all pipeline objects must be rebuilt to match the new specs/setup and then be reissued by the client. The major advantage of this solution is that if you have total control of the redis server and do controlled upgrades when no clients are talking to the server, it can actually work really well, because there is no possibility that `ASK` or `MOVED` will be triggered by migrations in between the 2 batches.
 
 **Good**
diff --git a/docs/release-notes.rst b/docs/release-notes.rst
index c7d56f35..6d40994b 100644
--- a/docs/release-notes.rst
+++ b/docs/release-notes.rst
@@ -1,6 +1,26 @@
 Release Notes
 =============
 
+1.3.5 (July 22, 2018)
+---------------------
+
+ * Add Redis 4 compatibility fix to CLUSTER NODES command (See issue #217)
+ * Fixed bug with command "CLUSTER GETKEYSINSLOT" that was throwing exceptions
+ * Added new method cluster_get_keys_in_slot() to client
+ * Fixed bug with `StrictRedisCluster.from_url` that was ignoring the `readonly_mode` parameter
+ * NodeManager will now ignore nodes showing cluster errors when initializing the cluster
+ * Fix bug where RedisCluster wouldn't refresh the cluster table when executing commands on specific nodes
+ * Add redis 5.0 to travis-ci tests
+ * Change default redis version from 3.0.7 to 4.0.10
+ * Increase accepted ranges of dependencies specified in dev-requirements.txt
+ * Several major and minor documentation updates and tweaks
+ * Add example script "from_url_password_protected.py"
+ * Command "CLUSTER GETKEYSINSLOT" now returns a list instead of an int
+ * Improve support for ssl connections
+ * Retry on Timeout errors when doing cluster discovery
+ * Added new error class "MasterDownError"
+ * Updated requirements for dependency of redis-py to latest version
+
 1.3.4 (Mar 5, 2017)
 -------------------
 
@@ -79,7 +99,7 @@ Release Notes
 * Implement all "CLUSTER ..." commands as methods in the client class
 * Client now follows the service side setting 'cluster-require-full-coverage=yes/no' (baranbartu)
 * Change the pubsub implementation (PUBLISH/SUBSCRIBE commands) from using one single node to now determine the hashslot for the channel name and use that to connect to
-  a node in the cluster. Other clients that do not use this pattern will not be fully compatible with this client. Known limitations is pattern
+  a node in the cluster. Other clients that do not use this pattern will not be fully compatible with this client. Known limitations is pattern
   subscription that do not work properly because a pattern can't know all the possible channel names in advance.
 * Convert all docs to ReadTheDocs
 * Rework connection pool logic to be more similar to redis-py. This also fixes an issue with pubsub and that connections
diff --git a/docs/threads.rst b/docs/threads.rst
index c5af63dc..d6f2d869 100644
--- a/docs/threads.rst
+++ b/docs/threads.rst
@@ -13,15 +13,6 @@ The advantage to this design is that a smart client can communicate with the clu
 
 
 
-Packing Commands
-----------------
-
-When issuing only a single command, there is only one network round trip to be made. But what if you issue 100 pipelined commands? In a single-instance redis configuration, you still only need to make one network hop. The commands are packed into a single request and the server responds with all the data for those requests in a single response. But with redis cluster, those keys could be spread out over many different nodes.
-
-The client is responsible for figuring out which commands map to which nodes. Let's say for example that your 100 pipelined commands need to route to 3 different nodes? The first thing the client does is break out the commands that go to each node, so it only has 3 network requests to make instead of 100.
-
-
-
 Parallel network i/o using threads
 ----------------------------------
diff --git a/docs/upgrading.rst b/docs/upgrading.rst
index 70096e29..66d20c7d 100644
--- a/docs/upgrading.rst
+++ b/docs/upgrading.rst
@@ -3,11 +3,16 @@ Upgrading redis-py-cluster
 
 This document describes what must be done when upgrading between different versions to ensure that code still works.
 
+1.3.2 --> Next Release
+----------------------
+
+If you created the `StrictRedisCluster` (or `RedisCluster`) instance via the `from_url` method and were passing `readonly_mode` to it, the connection pool created will now properly allow selecting read-only slaves from the pool. Previously it always used master nodes only, even in the case of `readonly_mode=True`. Make sure your code doesn't attempt any write commands over connections with `readonly_mode=True`.
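+
+A minimal sketch of the new behaviour (the node address is just an example)::
+
+    from rediscluster import StrictRedisCluster
+
+    # The pool behind this client may now hand out slave connections for
+    # read commands, so keep write traffic off of it.
+    rc = StrictRedisCluster.from_url("redis://127.0.0.1:7000/0", readonly_mode=True)
+    print(rc.get("foo"))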
+
 1.3.1 --> 1.3.2
 ---------------
 
-If your redis instance is configured to not have the `CONFIG ...` comannds enabled due to security reasons you need to pass this into the client object `skip_full_coverage_check=True`. Benefits is that the client class no longer requires the `CONFIG ...` commands to be enabled on the server. Downsides is that you can't use the option in your redis server and still use the same feature in this client.
+If your redis instance is configured to not have the `CONFIG ...` commands enabled due to security reasons, you need to pass `skip_full_coverage_check=True` into the client object. The benefit is that the client class no longer requires the `CONFIG ...` commands to be enabled on the server. The downside is that you can't use the option in your redis server and still use the same feature in this client.
diff --git a/examples/from_url_password_protected.py b/examples/from_url_password_protected.py
new file mode 100644
index 00000000..e908b024
--- /dev/null
+++ b/examples/from_url_password_protected.py
@@ -0,0 +1,9 @@
+from rediscluster import StrictRedisCluster
+
+url = "redis://:R1NFTBWTE1@10.127.91.90:6572/0"
+
+rc = StrictRedisCluster.from_url(url, skip_full_coverage_check=True)
+
+rc.set("foo", "bar")
+
+print(rc.get("foo"))
diff --git a/examples/pipeline-incrby.py b/examples/pipeline-incrby.py
index f6478b2f..a2fc4afe 100644
--- a/examples/pipeline-incrby.py
+++ b/examples/pipeline-incrby.py
@@ -12,10 +12,10 @@
     pipe.execute()
 
     pipe = r.pipeline(transaction=False)
-    pipe.set("foo-{0}".format(d, d))
-    pipe.incrby("foo-{0}".format(d, 1))
-    pipe.set("bar-{0}".format(d, d))
-    pipe.incrby("bar-{0}".format(d, 1))
-    pipe.set("bazz-{0}".format(d, d))
-    pipe.incrby("bazz-{0}".format(d, 1))
+    pipe.set("foo-{0}".format(d), d)
+    pipe.incrby("foo-{0}".format(d), 1)
+    pipe.set("bar-{0}".format(d), d)
+    pipe.incrby("bar-{0}".format(d), 1)
+    pipe.set("bazz-{0}".format(d), d)
+    pipe.incrby("bazz-{0}".format(d), 1)
     pipe.execute()
diff --git a/rediscluster/__init__.py b/rediscluster/__init__.py
index c7d0a6d3..7bf3c76e 100644
--- a/rediscluster/__init__.py
+++ b/rediscluster/__init__.py
@@ -16,7 +16,7 @@
 setattr(redis, "StrictClusterPipeline", StrictClusterPipeline)
 
 # Major, Minor, Fix version
-__version__ = (1, 3, 4)
+__version__ = (1, 3, 5)
 
 if sys.version_info[0:3] == (3, 4, 0):
     raise RuntimeError("CRITICAL: rediscluster do not work with python 3.4.0. Please use 3.4.1 or higher.")
diff --git a/rediscluster/client.py b/rediscluster/client.py
index 65122ec4..773c1cfd 100644
--- a/rediscluster/client.py
+++ b/rediscluster/client.py
@@ -7,7 +7,10 @@
 import time
 
 # rediscluster imports
-from .connection import ClusterConnectionPool, ClusterReadOnlyConnectionPool
+from .connection import (
+    ClusterConnectionPool, ClusterReadOnlyConnectionPool,
+    SSLClusterConnection,
+)
 from .exceptions import (
     RedisClusterException, AskError, MovedError,
     ClusterDownError, ClusterError, TryAgainError,
@@ -64,6 +67,7 @@ class StrictRedisCluster(StrictRedis):
         ], 'random'),
         string_keys_to_dict([
             "CLUSTER COUNTKEYSINSLOT",
+            "CLUSTER GETKEYSINSLOT",
         ], 'slot-id'),
     )
 
@@ -108,7 +112,7 @@ class StrictRedisCluster(StrictRedis):
         'CLUSTER DELSLOTS': bool_ok,
         'CLUSTER FAILOVER': bool_ok,
         'CLUSTER FORGET': bool_ok,
-        'CLUSTER GETKEYSINSLOT': int,
+        'CLUSTER GETKEYSINSLOT': list,
         'CLUSTER INFO': parse_info,
         'CLUSTER KEYSLOT': int,
         'CLUSTER MEET': bool_ok,
@@ -125,8 +129,9 @@ class StrictRedisCluster(StrictRedis):
         'READWRITE': bool_ok,
     }
 
-    def __init__(self, host=None, port=None, startup_nodes=None, max_connections=32, max_connections_per_node=False, init_slot_cache=True,
-                 readonly_mode=False, reinitialize_steps=None, skip_full_coverage_check=False, nodemanager_follow_cluster=False, **kwargs):
+    def __init__(self, host=None, port=None, startup_nodes=None, max_connections=None, max_connections_per_node=False, init_slot_cache=True,
+                 readonly_mode=False, reinitialize_steps=None, skip_full_coverage_check=False, nodemanager_follow_cluster=False,
+                 connection_class=None, **kwargs):
         """
         :startup_nodes:
             List of nodes that initial bootstrapping can be done from
@@ -156,6 +161,9 @@ def __init__(self, host=None, port=None, startup_nodes=None, max_connections=32,
         if "db" in kwargs:
             raise RedisClusterException("Argument 'db' is not possible to use in cluster mode")
 
+        if kwargs.get('ssl', False):
+            connection_class = SSLClusterConnection
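+            # ssl=True only swaps in the SSL-aware connection class here;
+            # any other ssl_* options travel along in **kwargs as before.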
"connection_pool" in kwargs: pool = kwargs.pop('connection_pool') else: @@ -178,6 +186,7 @@ def __init__(self, host=None, port=None, startup_nodes=None, max_connections=32, max_connections_per_node=max_connections_per_node, skip_full_coverage_check=skip_full_coverage_check, nodemanager_follow_cluster=nodemanager_follow_cluster, + connection_class=connection_class, **kwargs ) @@ -190,7 +199,7 @@ def __init__(self, host=None, port=None, startup_nodes=None, max_connections=32, self.response_callbacks = dict_merge(self.response_callbacks, self.CLUSTER_COMMANDS_RESPONSE_CALLBACKS) @classmethod - def from_url(cls, url, db=None, skip_full_coverage_check=False, **kwargs): + def from_url(cls, url, db=None, skip_full_coverage_check=False, readonly_mode=False, **kwargs): """ Return a Redis client object configured from the given URL, which must use either `the ``redis://`` scheme @@ -210,7 +219,12 @@ def from_url(cls, url, db=None, skip_full_coverage_check=False, **kwargs): passed along to the ConnectionPool class's initializer. In the case of conflicting arguments, querystring arguments always win. """ - connection_pool = ClusterConnectionPool.from_url(url, db=db, **kwargs) + if readonly_mode: + connection_pool_cls = ClusterReadOnlyConnectionPool + else: + connection_pool_cls = ClusterConnectionPool + + connection_pool = connection_pool_cls.from_url(url, db=db, skip_full_coverage_check=skip_full_coverage_check, **kwargs) return cls(connection_pool=connection_pool, skip_full_coverage_check=skip_full_coverage_check) def __repr__(self): @@ -314,15 +328,15 @@ def execute_command(self, *args, **kwargs): command = args[0] - node = self.determine_node(*args, **kwargs) - if node: - return self._execute_command_on_nodes(node, *args, **kwargs) - # If set externally we must update it before calling any commands if self.refresh_table_asap: self.connection_pool.nodes.initialize() self.refresh_table_asap = False + node = self.determine_node(*args, **kwargs) + if node: + return self._execute_command_on_nodes(node, *args, **kwargs) + redirect_addr = None asking = False @@ -409,6 +423,12 @@ def _execute_command_on_nodes(self, nodes, *args, **kwargs): connection.send_command(*args) res[node["name"]] = self.parse_response(connection, command, **kwargs) + except ClusterDownError as e: + self.connection_pool.disconnect() + self.connection_pool.reset() + self.refresh_table_asap = True + + raise finally: self.connection_pool.release(connection) @@ -556,6 +576,12 @@ def cluster_save_config(self): """ return self.execute_command('CLUSTER SAVECONFIG') + def cluster_get_keys_in_slot(self, slot, num_keys): + """ + Returns the number of keys in the specefied cluster slot + """ + return self.execute_command('CLUSTER GETKEYSINSLOT', slot, num_keys) + def cluster_set_config_epoch(self, node_id, epoch): """ Set the configuration epoch in a new node diff --git a/rediscluster/connection.py b/rediscluster/connection.py index 9f2b707c..4a773486 100644 --- a/rediscluster/connection.py +++ b/rediscluster/connection.py @@ -12,6 +12,7 @@ from .exceptions import ( RedisClusterException, AskError, MovedError, TryAgainError, ClusterDownError, ClusterCrossSlotError, + MasterDownError, ) # 3rd party imports @@ -31,6 +32,7 @@ class ClusterParser(DefaultParser): 'MOVED': MovedError, 'CLUSTERDOWN': ClusterDownError, 'CROSSSLOT': ClusterCrossSlotError, + 'MASTERDOWN': MasterDownError, }) @@ -98,7 +100,7 @@ class ClusterConnectionPool(ConnectionPool): """ RedisClusterDefaultTimeout = None - def __init__(self, startup_nodes=None, 
+        """
+        return self.execute_command('CLUSTER GETKEYSINSLOT', slot, num_keys)
+
     def cluster_set_config_epoch(self, node_id, epoch):
         """
         Set the configuration epoch in a new node
diff --git a/rediscluster/connection.py b/rediscluster/connection.py
index 9f2b707c..4a773486 100644
--- a/rediscluster/connection.py
+++ b/rediscluster/connection.py
@@ -12,6 +12,7 @@
 from .exceptions import (
     RedisClusterException, AskError, MovedError,
     TryAgainError, ClusterDownError, ClusterCrossSlotError,
+    MasterDownError,
 )
 
 # 3rd party imports
@@ -31,6 +32,7 @@ class ClusterParser(DefaultParser):
         'MOVED': MovedError,
         'CLUSTERDOWN': ClusterDownError,
         'CROSSSLOT': ClusterCrossSlotError,
+        'MASTERDOWN': MasterDownError,
     })
 
 
@@ -98,7 +100,7 @@ class ClusterConnectionPool(ConnectionPool):
     """
     RedisClusterDefaultTimeout = None
 
-    def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=ClusterConnection,
+    def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=None,
                  max_connections=None, max_connections_per_node=False, reinitialize_steps=None,
                  skip_full_coverage_check=False, nodemanager_follow_cluster=False, **connection_kwargs):
         """
@@ -110,6 +112,8 @@ def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=Cl
            it was operating on. This will allow the client to drift along side the cluster
            if the cluster nodes move around alot.
         """
+        if connection_class is None:
+            connection_class = ClusterConnection
         super(ClusterConnectionPool, self).__init__(connection_class=connection_class, max_connections=max_connections)
 
         # Special case to make from_url method compliant with cluster setting.
@@ -287,7 +291,7 @@ def get_random_connection(self):
 
         raise Exception("Cant reach a single startup node.")
 
-    def get_connection_by_key(self, key):
+    def get_connection_by_key(self, key, command):
         """
         """
         if not key:
@@ -339,10 +343,12 @@ class ClusterReadOnlyConnectionPool(ClusterConnectionPool):
     Readonly connection pool for rediscluster
     """
 
-    def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=ClusterConnection,
+    def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=None,
                  max_connections=None, nodemanager_follow_cluster=False, **connection_kwargs):
         """
         """
+        if connection_class is None:
+            connection_class = ClusterConnection
         super(ClusterReadOnlyConnectionPool, self).__init__(
             startup_nodes=startup_nodes,
             init_slot_cache=init_slot_cache,
@@ -352,8 +358,43 @@ def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=Cl
             nodemanager_follow_cluster=nodemanager_follow_cluster,
             **connection_kwargs)
 
-    def get_node_by_slot(self, slot):
+        self.master_node_commands = ('SCAN', 'SSCAN', 'HSCAN', 'ZSCAN')
+
+    def get_connection_by_key(self, key, command):
+        """
+        """
+        if not key:
+            raise RedisClusterException("No way to dispatch this command to Redis Cluster.")
+
+        if command in self.master_node_commands:
+            return self.get_master_connection_by_slot(self.nodes.keyslot(key))
+        else:
+            return self.get_random_master_slave_connection_by_slot(self.nodes.keyslot(key))
+
+    def get_master_connection_by_slot(self, slot):
+        """
+        Returns a connection for the master node for the specified slot.
+
+        Do not return a random node if the master node is not available for any reason.
+        """
+        self._checkpid()
+        return self.get_connection_by_node(self.get_node_by_slot(slot))
+
+    def get_random_master_slave_connection_by_slot(self, slot):
+        """
+        Returns a random connection from the set of (master + slaves) for the
+        specified slot. If that node is not reachable then return a random connection.
+        """
+        self._checkpid()
+
+        try:
+            return self.get_connection_by_node(self.get_node_by_slot_random(slot))
+        except KeyError:
+            return self.get_random_connection()
+
+    def get_node_by_slot_random(self, slot):
         """
+        Return a random node (master or slave) for the specified slot.
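+
+        For example, with a slot served by a master on port 7000 and a slave
+        on port 7003 (the layout used in the test suite), repeated calls pick
+        either node::
+
+            >>> pool.get_node_by_slot_random(0)['port'] in (7000, 7003)
+            True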
""" return random.choice(self.nodes.slots[slot]) diff --git a/rediscluster/exceptions.py b/rediscluster/exceptions.py index 5019c98e..ea4a60a0 100644 --- a/rediscluster/exceptions.py +++ b/rediscluster/exceptions.py @@ -74,3 +74,8 @@ class MovedError(AskError): """ """ pass + +class MasterDownError(ClusterDownError): + """ + """ + pass diff --git a/rediscluster/nodemanager.py b/rediscluster/nodemanager.py index d97c35d2..b16877d1 100644 --- a/rediscluster/nodemanager.py +++ b/rediscluster/nodemanager.py @@ -10,7 +10,7 @@ # 3rd party imports from redis import StrictRedis from redis._compat import b, unicode, bytes, long, basestring -from redis import ConnectionError +from redis import ConnectionError, TimeoutError, ResponseError class NodeManager(object): @@ -154,10 +154,6 @@ def get_redis_link(self, host, port, decode_responses=False): def initialize(self): """ Init the slots cache by asking all startup nodes what the current cluster configuration is - - TODO: Currently the last node will have the last say about how the configuration is setup. - Maybe it should stop to try after it have correctly covered all slots or when one node is reached - and it could execute CLUSTER SLOTS command. """ nodes_cache = {} tmp_slots = {} @@ -177,8 +173,14 @@ def initialize(self): r = self.get_redis_link(host=node["host"], port=node["port"], decode_responses=True) cluster_slots = r.execute_command("cluster", "slots") startup_nodes_reachable = True - except ConnectionError: + except (ConnectionError, TimeoutError): continue + except ResponseError as e: + # Isn't a cluster connection, so it won't parse these exceptions automatically + if 'CLUSTERDOWN' in e.message or 'MASTERDOWN' in e.message: + continue + else: + raise RedisClusterException("ERROR sending 'cluster slots' command to redis server: {0}".format(node)) except Exception: raise RedisClusterException("ERROR sending 'cluster slots' command to redis server: {0}".format(node)) @@ -263,8 +265,13 @@ def cluster_require_full_coverage(self, nodes_cache): nodes = nodes_cache or self.nodes def node_require_full_coverage(node): - r_node = self.get_redis_link(host=node["host"], port=node["port"], decode_responses=True) - return "yes" in r_node.config_get("cluster-require-full-coverage").values() + try: + r_node = self.get_redis_link(host=node["host"], port=node["port"], decode_responses=True) + return "yes" in r_node.config_get("cluster-require-full-coverage").values() + except ConnectionError: + return False + except Exception: + raise RedisClusterException("ERROR sending 'config get cluster-require-full-coverage' command to redis server: {0}".format(node)) # at least one node should have cluster-require-full-coverage yes return any(node_require_full_coverage(node) for node in nodes.values()) diff --git a/rediscluster/pipeline.py b/rediscluster/pipeline.py index 4fe4fb88..9b1d0283 100644 --- a/rediscluster/pipeline.py +++ b/rediscluster/pipeline.py @@ -430,6 +430,6 @@ def read(self): except (ConnectionError, TimeoutError) as e: for c in self.commands: c.result = e - return + return except RedisError: c.result = sys.exc_info()[1] diff --git a/rediscluster/utils.py b/rediscluster/utils.py index ad12fd2f..ce038d93 100644 --- a/rediscluster/utils.py +++ b/rediscluster/utils.py @@ -182,12 +182,14 @@ def parse_slots(s): self_id, addr, flags, master_id, ping_sent, \ pong_recv, config_epoch, link_state = parts[:8] - host, port = addr.rsplit(':', 1) + host, ports = addr.rsplit(':', 1) + port, _, cluster_port = ports.partition('@') node = { 'id': self_id, 'host': host 
 
         node = {
             'id': self_id,
             'host': host or current_host,
             'port': int(port),
+            'cluster-bus-port': int(cluster_port) if cluster_port else 10000 + int(port),
             'flags': tuple(flags.split(',')),
             'master': master_id if master_id != '-' else None,
             'ping-sent': int(ping_sent),
diff --git a/requirements.txt b/requirements.txt
index a4d27eb0..345a0b84 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1 @@
-redis>=2.10.2
+redis>=2.10.6
diff --git a/setup.py b/setup.py
index f1bba3fb..22ee03a9 100644
--- a/setup.py
+++ b/setup.py
@@ -20,7 +20,7 @@
 
 setup(
     name="redis-py-cluster",
-    version="1.3.4",
+    version="1.3.5",
     description="Cluster library for redis 3.0.0 built on top of redis-py lib",
     long_description=readme + '\n\n' + history,
     author="Johan Andersson",
@@ -31,13 +31,13 @@
     url='http://github.com/grokzen/redis-py-cluster',
     license='MIT',
     install_requires=[
-        'redis>=2.10.2'
+        'redis>=2.10.6'
     ],
     keywords=[
         'redis',
         'redis cluster',
     ],
-    classifiers=(
+    classifiers=[
         # As from https://pypi.python.org/pypi?%3Aaction=list_classifiers
         # 'Development Status :: 1 - Planning',
         # 'Development Status :: 2 - Pre-Alpha',
@@ -53,8 +53,9 @@
         'Programming Language :: Python :: 3.3',
         'Programming Language :: Python :: 3.4',
         'Programming Language :: Python :: 3.5',
+        'Programming Language :: Python :: 3.6',
         'Environment :: Web Environment',
         'Operating System :: POSIX',
         'License :: OSI Approved :: MIT License',
-    )
+    ]
 )
diff --git a/tests/test_cluster_connection_pool.py b/tests/test_cluster_connection_pool.py
index bb69bb01..45efe007 100644
--- a/tests/test_cluster_connection_pool.py
+++ b/tests/test_cluster_connection_pool.py
@@ -131,11 +131,11 @@ def side_effect(self, *args, **kwargs):
                 return DummyConnection(port=1337)
             pool_mock.side_effect = side_effect
 
-            connection = pool.get_connection_by_key("foo")
+            connection = pool.get_connection_by_key("foo", 'GET')
             assert connection.port == 1337
 
         with pytest.raises(RedisClusterException) as ex:
-            pool.get_connection_by_key(None)
+            pool.get_connection_by_key(None, None)
         assert unicode(ex.value).startswith("No way to dispatch this command to Redis Cluster."), True
 
     def test_get_connection_by_slot(self):
@@ -209,7 +209,35 @@ def test_max_connections(self):
         with pytest.raises(RedisClusterException):
             pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000})
 
-    def test_get_node_by_slot(self):
+    def test_get_connection_by_slot(self):
+        """
+        """
+        pool = self.get_pool(connection_kwargs={})
+
+        # Patch the call that is made inside the method to allow control of the returned connection object
+        with patch.object(ClusterReadOnlyConnectionPool, 'get_master_connection_by_slot', autospec=True) as pool_mock:
+            def side_effect(self, *args, **kwargs):
+                return DummyConnection(port=1337)
+            pool_mock.side_effect = side_effect
+
+            # Try a master only command
+            connection = pool.get_connection_by_key("foo", 'ZSCAN')
+            assert connection.port == 1337
+
+        with patch.object(ClusterReadOnlyConnectionPool, 'get_random_master_slave_connection_by_slot', autospec=True) as pool_mock:
+            def side_effect(self, *args, **kwargs):
+                return DummyConnection(port=1337)
+            pool_mock.side_effect = side_effect
+
+            # try a random node command
+            connection = pool.get_connection_by_key('foo', 'GET')
+            assert connection.port == 1337
+
+        with pytest.raises(RedisClusterException) as ex:
+            pool.get_connection_by_key(None, None)
+        assert unicode(ex.value).startswith("No way to dispatch this command to Redis Cluster."), True
+
+    def test_get_node_by_slot_random(self):
         """
         We can randomly get all nodes in readonly mode.
""" @@ -218,7 +246,7 @@ def test_get_node_by_slot(self): expected_ports = {7000, 7003} actual_ports = set() for _ in range(0, 100): - node = pool.get_node_by_slot(0) + node = pool.get_node_by_slot_random(0) actual_ports.add(node['port']) assert actual_ports == expected_ports diff --git a/tests/test_cluster_obj.py b/tests/test_cluster_obj.py index c9b08084..2e715f25 100644 --- a/tests/test_cluster_obj.py +++ b/tests/test_cluster_obj.py @@ -446,3 +446,68 @@ def test_access_correct_slave_with_readonly_mode_client(sr): readonly_client = StrictRedisCluster(host="127.0.0.1", port=7000, readonly_mode=True) assert b('foo') == readonly_client.get('foo16706') assert return_master_mock.call_count == 0 + + readonly_client = StrictRedisCluster.from_url(url="redis://127.0.0.1:7000/0", readonly_mode=True) + assert b('foo') == readonly_client.get('foo16706') + assert return_master_mock.call_count == 0 + + +def test_refresh_using_specific_nodes(r): + """ + Test making calls on specific nodes when the cluster has failed over to + another node + """ + with patch.object(StrictRedisCluster, 'parse_response') as parse_response_mock: + with patch.object(NodeManager, 'initialize', autospec=True) as init_mock: + # simulate 7006 as a failed node + def side_effect(self, *args, **kwargs): + if self.port == 7006: + parse_response_mock.failed_calls += 1 + raise ClusterDownError('CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information') + elif self.port == 7007: + parse_response_mock.successful_calls += 1 + + def side_effect_rebuild_slots_cache(self): + # start with all slots mapped to 7006 + self.nodes = {'127.0.0.1:7006': {'host': '127.0.0.1', 'server_type': 'master', 'port': 7006, 'name': '127.0.0.1:7006'}} + self.slots = {} + + for i in range(0, 16383): + self.slots[i] = [{ + 'host': '127.0.0.1', + 'server_type': 'master', + 'port': 7006, + 'name': '127.0.0.1:7006', + }] + + # After the first connection fails, a reinitialize should follow the cluster to 7007 + def map_7007(self): + self.nodes = {'127.0.0.1:7007': {'host': '127.0.0.1', 'server_type': 'master', 'port': 7007, 'name': '127.0.0.1:7007'}} + self.slots = {} + + for i in range(0, 16383): + self.slots[i] = [{ + 'host': '127.0.0.1', + 'server_type': 'master', + 'port': 7007, + 'name': '127.0.0.1:7007', + }] + init_mock.side_effect = map_7007 + + parse_response_mock.side_effect = side_effect + parse_response_mock.successful_calls = 0 + parse_response_mock.failed_calls = 0 + + init_mock.side_effect = side_effect_rebuild_slots_cache + + rc = StrictRedisCluster(host='127.0.0.1', port=7006) + assert len(rc.connection_pool.nodes.nodes) == 1 + assert '127.0.0.1:7006' in rc.connection_pool.nodes.nodes + + rc.ping() + + # Cluster should now point to 7006, and there should be one failed and one succesful call + assert len(rc.connection_pool.nodes.nodes) == 1 + assert '127.0.0.1:7007' in rc.connection_pool.nodes.nodes + assert parse_response_mock.failed_calls == 1 + assert parse_response_mock.successful_calls == 1 diff --git a/tests/test_scripting.py b/tests/test_scripting.py index cc1107eb..1dd14d50 100644 --- a/tests/test_scripting.py +++ b/tests/test_scripting.py @@ -94,7 +94,6 @@ def test_script_loading(self, r): def test_script_object(self, r): r.set('a', 2) multiply = r.register_script(multiply_script) - assert not multiply.sha # test evalsha fail -> script load + retry assert multiply(keys=['a'], args=[3]) == 6 assert multiply.sha diff --git a/tox.ini b/tox.ini index 0ad14160..119860ae 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ # install 
tox" and then run "tox" from this directory. [tox] -envlist = py27, py33, py34, py35, hi27, hi33, hi34, hi35, flake8-py34, flake8-py27 +envlist = py27, py33, py34, py35, py36, hi27, hi33, hi34, hi35, flake8-py34, flake8-py27 [testenv] deps = -r{toxinidir}/dev-requirements.txt