Reddit patched: Pipeline optimizations #406
base: master
Conversation
6358cf3 to 5acdb82
Some comments from an initial pass. There is some stuff to discuss around gevent at least, some changes are required to be reverted, and some we can talk about/around.
On a side note though, after digging into the pipeline code a bit again, I remember how complex that piece of code is compared to how much simpler I would want it to be. I am hoping that in 3.0.0 I can rewrite the node and connection handling well enough to make the pipeline code way simpler than it is today. My dream would be to handle only the commands in the pipeline and just route them to the correct node, and let the normal client and connection classes handle all the errors and other scenarios that can occur. We will see how that goes down the line though.
rediscluster/client.py
Outdated
@@ -172,7 +172,7 @@ class RedisCluster(Redis):
     # Not complete, but covers the major ones
     # https://redis.io/commands
-    READ_COMMANDS = [
+    READ_COMMANDS = set([
A set is bad here, as a user could technically manipulate that collection to add in things that might not be supported or not yet present in the main code in a release.
True, a frozenset would be better here.
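For illustration, a minimal sketch of what the frozenset version could look like (the command names shown are only a small subset, and is_read_command is a hypothetical helper, not the client's actual API):

# Sketch only: an immutable lookup table, so the collection cannot be
# mutated at runtime to include commands the released code does not support.
READ_COMMANDS = frozenset([
    "GET", "MGET", "EXISTS", "TTL", "STRLEN",
    "HGET", "HGETALL", "SMEMBERS", "ZRANGE", "LRANGE",
])

def is_read_command(command_name):
    # Membership checks are O(1), same as a set, but frozenset has no
    # add()/remove(), so accidental modification raises AttributeError.
    return command_name.upper() in READ_COMMANDS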
rediscluster/nodemanager.py
Outdated
@@ -257,6 +258,8 @@ def initialize(self):
             node, node_name = self.make_node_obj(master_node[0], master_node[1], 'master')
             nodes_cache[node_name] = node
+
+            self.slave_nodes_by_master[node_name] = set()
A set? I try to avoid them if possible.
I was thinking of looking up the slaves by slave name, but it turns out I don't need that in the end. So I think a list is perfectly fine here. I will change it.
rediscluster/nodemanager.py
Outdated
@@ -267,6 +270,7 @@ def initialize(self):
             target_slave_node, slave_node_name = self.make_node_obj(slave_node[0], slave_node[1], 'slave')
             nodes_cache[slave_node_name] = target_slave_node
             tmp_slots[i].append(target_slave_node)
+            self.slave_nodes_by_master[node_name].add(slave_node_name)
Am I guessing right that we need this as a convenience tracking data structure to determine which slaves belong to each master, since we do not directly have that right now?
Exactly, it's for the MOVED error handling. We should copy the master and its corresponding replicas into the slot mapping.
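To make the bookkeeping concrete, here is a rough sketch of how such a mapping could be built while parsing the CLUSTER SLOTS reply (a simplified illustration only; cluster_slots and the exact reply indexing are assumptions, not the PR's verbatim code):

# Sketch: remember which replicas belong to each master so that MOVED
# handling can later move a master together with its replicas.
slave_nodes_by_master = {}

for slot_info in cluster_slots:  # assumed: parsed CLUSTER SLOTS reply
    # Each entry looks like [start, end, master, replica, replica, ...]
    master_ip, master_port = slot_info[2][0], slot_info[2][1]
    master_name = "{0}:{1}".format(master_ip, master_port)
    slave_nodes_by_master.setdefault(master_name, [])  # list, per the comment above
    for replica in slot_info[3:]:
        replica_name = "{0}:{1}".format(replica[0], replica[1])
        slave_nodes_by_master[master_name].append(replica_name)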
@@ -15,6 +16,10 @@
 from redis.exceptions import ConnectionError, RedisError, TimeoutError
 from redis._compat import imap, unicode
+
+from gevent import monkey; monkey.patch_all()
I am not a super fan of this addition though. I think we had a gevent implementation and a threaded implementation way back, but we replaced them with the current version in the main execution code path, because using any kind of threading or parallelization library/framework to achieve speed improvements added major overhead, and we never saw any actual gains unless you ran pipelines against a ton of master nodes; not a large number of commands, just a big number of nodes.
Basically, you will have to defend this new implementation and explain why we should add it over the existing one.
It will be more efficient if we need to talk to multiple shards/nodes in the pipeline. And for now, it's only used for pipeline commands, which should have no impact/overhead on the rest of the logic.
rediscluster/pipeline.py
Outdated
# if we have to run through it again, we only retry the commands that failed.
cmds = sorted(stack, key=lambda x: x.position)

max_redirects = 5
The number of redirections should be configurable as a class variable and not hard-coded here. Also, the main execution loop has 16 TTL rounds before failing out, while this has 5, which is possibly a discrepancy. Yes, if you get that many MOVED/ASK errors then something is probably more broken than this code can handle :)
Yes, it's an initial PR in which I just list out all the things. This would be read from the config for sure.
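As a sketch of the "configurable as a class variable" suggestion (the attribute and parameter names here are illustrative, not the final API):

class ClusterPipeline(object):
    # Default number of MOVED/ASK redirects a pipeline execution will follow
    # before giving up; a class variable instead of a hard-coded literal.
    DEFAULT_MAX_REDIRECTS = 5

    def __init__(self, max_redirects=None):
        # Callers can tune the limit per instance without editing library code.
        self.max_redirects = max_redirects or self.DEFAULT_MAX_REDIRECTS

    def send_cluster_commands(self, stack):
        cur_attempt = 0
        while cur_attempt < self.max_redirects:
            cur_attempt += 1
            # ... route the remaining commands, retrying only the ones that
            # raised MOVED/ASK, then break out once everything succeeded ...
            break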
rediscluster/pipeline.py
Outdated
# build a list of node objects based on node names we need to
nodes, connection_by_node = self._get_commands_by_node(cmds)

# send the commands in sequence.
Old docs that are not relevant to your new solution?
rediscluster/pipeline.py
Outdated
# so that we can read them all in parallel as they come back.
# we dont' multiplex on the sockets as they come available, but that shouldn't make too much difference.

# duke-cliff: I think it would still be faster if we use gevent to make the command in parallel
This should be a comment here in the PR, not inside the code with speculation that it might be faster.
Will remove it.
rediscluster/pipeline.py
Outdated
node_commands = nodes.values()
events = []
for n in node_commands:
    events.append(gevent.spawn(self._execute_node_commands, n))
Wrapping gevent around this is not really necessary? You are still doing the same write() + read() commands inside the function anyway. Please show by testing and instrumentation that this is indeed faster, and by how much, to actually justify adding two new dependencies for this small use case.
Using non-blocking IO only means the thread will not be blocked by the IO, but you still need CPU time to serialize/deserialize data to/from the socket.
So in this case, if you call redis.packed_commands(....) for multiple NodeCommands, these would be queued in the same thread/process. And with gevent, this would become faster if it runs on multiple cores.
I said this part is nice to have; if you don't want it in the public version, we can talk about whether to give the user an option or remove it.
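For reference, the pattern being debated is roughly the following (a sketch assuming an _execute_node_commands-style callable that writes and reads one node's packed commands; not a claim about measured speedups):

import gevent
from gevent import monkey

monkey.patch_all()  # as in the PR diff; makes blocking socket IO cooperative

def execute_in_parallel(node_commands, execute_node_commands):
    # One greenlet per node: while one greenlet waits on its socket, the
    # others can write/read theirs instead of queueing sequentially.
    events = [gevent.spawn(execute_node_commands, n) for n in node_commands]
    # raise_error=True re-raises the first exception from any greenlet.
    gevent.joinall(events, raise_error=True)
    return [e.value for e in events]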
nodes = {}
proxy_node_by_master = {}
I don't really get what this helps with from the doc lines you have written about this variable?
The proxy_node will be the node that executes the NodeCommands.
My previous PR would just find a random node via get_node_by_slot(slot, True). But that caused a lot of commands in the same shard to pick different replicas, so they could not be packed efficiently.
So the new logic is:
- determine the shard of the slot; the shard is the master_node_name
- find a proxy node for the shard, which is stored in proxy_node_by_master
- group all commands in the same shard onto that proxy_node
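A rough sketch of that grouping step, with the helper names (keyslot, slots) assumed from the node manager rather than copied from the PR:

def group_commands_by_proxy_node(cmds, nodes_manager):
    # Sketch: choose one 'proxy' node per shard (keyed by the master's node
    # name) and send every command for that shard through the same node, so
    # the commands can be packed into as few batches as possible.
    proxy_node_by_master = {}
    grouped = {}  # proxy node name -> list of commands

    for cmd in cmds:
        slot = nodes_manager.keyslot(cmd.args[1])            # assumed helper
        master_name = nodes_manager.slots[slot][0]['name']   # entry 0 is the master

        if master_name not in proxy_node_by_master:
            # Pick the node once per shard; it could be the master or one of
            # its replicas if the command allows reads from replicas.
            proxy_node_by_master[master_name] = nodes_manager.slots[slot][0]

        proxy_node = proxy_node_by_master[master_name]
        grouped.setdefault(proxy_node['name'], []).append(cmd)

    return grouped, proxy_node_by_master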
rediscluster/pipeline.py
Outdated
cur_attempt = 0

while cur_attempt < max_redirects:
Minor lint issue: no blank line after a while statement.
…hould use replicas based on the command
@Grokzen I just updated this PR with these changes:
@duke-cliff I will take these changes out for a test spin and come back to you. It looks good now :)
Hi, I'm curious about the status of this PR. I think we could definitely benefit from some of these changes, so we'd love to see it (or some subset of it) merged.
I have added the following optimizations. So far we have tested cluster mode and the result is very stable and much faster than before. Also, the Redis load has dropped significantly.
Let me know if you are comfortable with the gevent change; it's good to have but might not be too important to other users, and it might not work properly on some platforms.