This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

Commit

pipeline optimizations
kefu-reddit committed Oct 9, 2020
1 parent 73f27ed commit 5acdb82
Showing 6 changed files with 157 additions and 65 deletions.
19 changes: 6 additions & 13 deletions rediscluster/client.py
@@ -172,7 +172,7 @@ class RedisCluster(Redis):

# Not complete, but covers the major ones
# https://redis.io/commands
READ_COMMANDS = [
READ_COMMANDS = set([
"BITCOUNT",
"BITPOS",
"EXISTS",
@@ -212,7 +212,7 @@ class RedisCluster(Redis):
"ZCOUNT",
"ZRANGE",
"ZSCORE"
]
])

RESULT_CALLBACKS = dict_merge(
string_keys_to_dict([
@@ -442,7 +442,7 @@ def pubsub(self, **kwargs):
"""
return ClusterPubSub(self.connection_pool, **kwargs)

def pipeline(self, transaction=None, shard_hint=None):
def pipeline(self, transaction=None, shard_hint=None, read_from_replicas=False):
"""
Cluster impl:
Pipelines do not work in cluster mode the same way they do in normal mode.
@@ -461,6 +461,7 @@ def pipeline(self, transaction=None, shard_hint=None):
result_callbacks=self.result_callbacks,
response_callbacks=self.response_callbacks,
cluster_down_retry_attempts=self.cluster_down_retry_attempts,
read_from_replicas=read_from_replicas,
)

def transaction(self, *args, **kwargs):
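A minimal usage sketch for the new flag (the startup node address and key names are assumptions, not part of this commit; per the TODO in pipeline.py the flag is best limited to read-only pipelines):

    from rediscluster import RedisCluster

    # Hypothetical startup node for a local cluster.
    client = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": 7000}],
                          decode_responses=True)

    # Route this pipeline's reads through replicas where possible.
    pipe = client.pipeline(read_from_replicas=True)
    pipe.get("user:1")
    pipe.get("user:2")
    values = pipe.execute()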
@@ -578,7 +579,6 @@ def _execute_command(self, *args, **kwargs):

redirect_addr = None
asking = False
is_read_replica = False

try_random_node = False
slot = self._determine_slot(*args)
@@ -605,7 +605,6 @@ def _execute_command(self, *args, **kwargs):
slot,
self.read_from_replicas and (command in self.READ_COMMANDS)
)
is_read_replica = node['server_type'] == 'slave'

connection = self.connection_pool.get_connection_by_node(node)

@@ -615,12 +614,6 @@ def _execute_command(self, *args, **kwargs):
connection.send_command('ASKING')
self.parse_response(connection, "ASKING", **kwargs)
asking = False
if is_read_replica:
# Ask read replica to accept reads (see https://redis.io/commands/readonly)
# TODO: do we need to handle errors from this response?
connection.send_command('READONLY')
self.parse_response(connection, 'READONLY', **kwargs)
is_read_replica = False

connection.send_command(*args)
return self.parse_response(connection, command, **kwargs)
@@ -687,8 +680,8 @@ def _execute_command(self, *args, **kwargs):
self.refresh_table_asap = True
self.connection_pool.nodes.increment_reinitialize_counter()

node = self.connection_pool.nodes.set_node(e.host, e.port, server_type='master')
self.connection_pool.nodes.slots[e.slot_id][0] = node
node = self.connection_pool.nodes.get_node(e.host, e.port, server_type='master')
self.connection_pool.nodes.move_slot_to_node(e.slot_id, node)
except TryAgainError as e:
log.exception("TryAgainError")

5 changes: 5 additions & 0 deletions rediscluster/connection.py
@@ -343,6 +343,11 @@ def get_connection_by_node(self, node):
connection = self._available_connections.get(node["name"], []).pop()
except IndexError:
connection = self.make_connection(node)
server_type = node.get("server_type", "master")
if server_type == "slave":
connection.send_command('READONLY')
if nativestr(connection.read_response()) != 'OK':
raise ConnectionError('READONLY command failed')

self._in_use_connections.setdefault(node["name"], set()).add(connection)

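The handshake added above is the standard replica opt-in; a hedged standalone sketch of the same exchange using redis-py 3.x primitives directly (the replica address is an assumption):

    from redis.connection import Connection
    from redis._compat import nativestr

    # Hypothetical replica node address.
    conn = Connection(host="127.0.0.1", port=7001)
    conn.connect()

    # Without READONLY a cluster replica redirects reads with MOVED;
    # after READONLY it serves reads for its slots locally.
    conn.send_command("READONLY")
    if nativestr(conn.read_response()) != "OK":
        raise ConnectionError("READONLY command failed")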
23 changes: 23 additions & 0 deletions rediscluster/nodemanager.py
@@ -39,6 +39,7 @@ def __init__(self, startup_nodes=None, reinitialize_steps=None, skip_full_covera
self.connection_kwargs = connection_kwargs
self.nodes = {}
self.slots = {}
self.slave_nodes_by_master = {}
self.startup_nodes = [] if startup_nodes is None else startup_nodes
self.orig_startup_nodes = [node for node in self.startup_nodes]
self.reinitialize_counter = 0
@@ -257,6 +258,8 @@ def initialize(self):
node, node_name = self.make_node_obj(master_node[0], master_node[1], 'master')
nodes_cache[node_name] = node

self.slave_nodes_by_master[node_name] = set()

for i in range(int(slot[0]), int(slot[1]) + 1):
if i not in tmp_slots:
tmp_slots[i] = [node]
@@ -267,6 +270,7 @@
target_slave_node, slave_node_name = self.make_node_obj(slave_node[0], slave_node[1], 'slave')
nodes_cache[slave_node_name] = target_slave_node
tmp_slots[i].append(target_slave_node)
self.slave_nodes_by_master[node_name].add(slave_node_name)
else:
# Validate that 2 nodes want to use the same slot cache setup
if tmp_slots[i][0]['name'] != node['name']:
@@ -409,6 +413,25 @@ def set_node(self, host, port, server_type=None):

return node

def get_node(self, host, port, server_type=None):
node, node_name = self.make_node_obj(host, port, server_type)
if node_name not in self.nodes:
self.nodes[node_name] = node
return self.nodes[node_name]

def move_slot_to_node(self, slot, node):
"""
When a MOVED response is received, point the slot at the new master and move its replicas along with it.
"""
node_name = node['name']
self.slots[slot] = [node]
slave_nodes = self.slave_nodes_by_master.get(node_name)
if slave_nodes:
for slave_name in slave_nodes:
slave_node = self.nodes.get(slave_name)
if slave_node:
self.slots[slot].append(slave_node)

def populate_startup_nodes(self):
"""
Do something with all startup nodes and filters out any duplicates
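A hedged sketch of how the new bookkeeping fits together (addresses and the slot number are made up; initialize() normally fills these structures from the live cluster):

    from rediscluster.nodemanager import NodesManager

    nodes = NodesManager(startup_nodes=[{"host": "127.0.0.1", "port": 7000}])

    # Manually seed what initialize() would discover: one master and its replica.
    master, master_name = nodes.make_node_obj("127.0.0.1", 7000, "master")
    replica, replica_name = nodes.make_node_obj("127.0.0.1", 7003, "slave")
    nodes.nodes[master_name] = master
    nodes.nodes[replica_name] = replica
    nodes.slave_nodes_by_master[master_name] = {replica_name}

    # A MOVED reply for slot 1234 pointing at 127.0.0.1:7000 is then applied as:
    node = nodes.get_node("127.0.0.1", 7000, server_type="master")
    nodes.move_slot_to_node(1234, node)
    print(nodes.slots[1234])  # [master, replica] - the replicas follow their master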
167 changes: 117 additions & 50 deletions rediscluster/pipeline.py
@@ -2,6 +2,7 @@

# python std lib
import sys
import logging

# rediscluster imports
from .client import RedisCluster
@@ -15,6 +16,10 @@
from redis.exceptions import ConnectionError, RedisError, TimeoutError
from redis._compat import imap, unicode

from gevent import monkey; monkey.patch_all()
import gevent

log = logging.getLogger(__name__)

ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, MovedError, AskError, TryAgainError)

@@ -174,71 +179,127 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T
# If it fails the configured number of times then raise exception back to caller of this method
raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster")

def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
"""
Send a bunch of cluster commands to the redis cluster.
def _execute_node_commands(self, n):
n.write()

`allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses
automatically. If set to false it will raise RedisClusterException.
"""
# the first time sending the commands we send all of the commands that were queued up.
# if we have to run through it again, we only retry the commands that failed.
attempt = sorted(stack, key=lambda x: x.position)
n.read()

# build a list of node objects based on node names we need to
def _get_commands_by_node(self, cmds):
nodes = {}
proxy_node_by_master = {}
connection_by_node = {}

# as we move through each command that still needs to be processed,
# we figure out the slot number that command maps to, then from the slot determine the node.
for c in attempt:
for c in cmds:
# refer to our internal node -> slot table that tells us where a given
# command should route to.
slot = self._determine_slot(*c.args)
node = self.connection_pool.get_node_by_slot(slot)

# little hack to make sure the node name is populated. probably could clean this up.
self.connection_pool.nodes.set_node_name(node)
master_node = self.connection_pool.get_node_by_slot(slot)

# for the same master_node, it should always get the same proxy_node to group
# as many commands as possible per node
if master_node['name'] in proxy_node_by_master:
node = proxy_node_by_master[master_node['name']]
else:
# TODO: should determine if using replicas by if command is read only
node = self.connection_pool.get_node_by_slot(slot, self.read_from_replicas)
proxy_node_by_master[master_node['name']] = node

# little hack to make sure the node name is populated. probably could clean this up.
self.connection_pool.nodes.set_node_name(node)

# now that we know the name of the node ( it's just a string in the form of host:port )
# we can build a list of commands for each node.
node_name = node['name']
if node_name not in nodes:
nodes[node_name] = NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node))
if node_name in connection_by_node:
connection = connection_by_node[node_name]
else:
connection = self.connection_pool.get_connection_by_node(node)
connection_by_node[node_name] = connection
nodes[node_name] = NodeCommands(self.parse_response, connection)

nodes[node_name].append(c)

# send the commands in sequence.
# we write to all the open sockets for each node first, before reading anything
# this allows us to flush all the requests out across the network essentially in parallel
# so that we can read them all in parallel as they come back.
# we don't multiplex on the sockets as they become available, but that shouldn't make too much difference.
node_commands = nodes.values()
for n in node_commands:
n.write()

for n in node_commands:
n.read()

# release all of the redis connections we allocated earlier back into the connection pool.
# we used to do this step as part of a try/finally block, but it is really dangerous to
# release connections back into the pool if for some reason the socket has data still left in it
# from a previous operation. The write and read operations already have try/catch around them for
# all known types of errors including connection and socket level errors.
# So if we hit an exception, something really bad happened and putting any of
# these connections back into the pool is a very bad idea.
# the socket might have unread buffer still sitting in it, and then the
# next time we read from it we pass the buffered result back from a previous
# command and every single request after to that connection will always get
# a mismatched result. (not just theoretical, I saw this happen on production x.x).
for n in nodes.values():
self.connection_pool.release(n.connection)
return nodes, connection_by_node

def _execute_single_command(self, cmd):
try:
# send each command individually like we do in the main client.
cmd.result = super(ClusterPipeline, self).execute_command(*cmd.args, **cmd.options)
except RedisError as e:
cmd.result = e

def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
"""
Send a bunch of cluster commands to the redis cluster.
`allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses
automatically. If set to false it will raise RedisClusterException.
"""
# the first time sending the commands we send all of the commands that were queued up.
# if we have to run through it again, we only retry the commands that failed.
cmds = sorted(stack, key=lambda x: x.position)

max_redirects = 5
cur_attempt = 0

while cur_attempt < max_redirects:

# build a list of node objects based on node names we need to
nodes, connection_by_node = self._get_commands_by_node(cmds)

# send the commands in sequence.
# we write to all the open sockets for each node first, before reading anything
# this allows us to flush all the requests out across the network essentially in parallel
# so that we can read them all in parallel as they come back.
# we don't multiplex on the sockets as they become available, but that shouldn't make too much difference.

# duke-cliff: I think it would still be faster if we use gevent to run the node commands in parallel:
# the io is non-blocking, but serialization/deserialization would previously still block
node_commands = nodes.values()
events = []
for n in node_commands:
events.append(gevent.spawn(self._execute_node_commands, n))

gevent.joinall(events)

# release all of the redis connections we allocated earlier back into the connection pool.
# we used to do this step as part of a try/finally block, but it is really dangerous to
# release connections back into the pool if for some reason the socket has data still left in it
# from a previous operation. The write and read operations already have try/catch around them for
# all known types of errors including connection and socket level errors.
# So if we hit an exception, something really bad happened and putting any of
# these connections back into the pool is a very bad idea.
# the socket might have unread buffer still sitting in it, and then the
# next time we read from it we pass the buffered result back from a previous
# command and every single request after to that connection will always get
# a mismatched result. (not just theoretical, I saw this happen on production x.x).
for conn in connection_by_node.values():
self.connection_pool.release(conn)

# regroup the commands that hit MOVED errors and retry them using a pipeline (stacked commands);
# this increases pipeline performance a lot
moved_cmds = []
for c in cmds:
if isinstance(c.result, MovedError):
e = c.result
node = self.connection_pool.nodes.get_node(e.host, e.port, server_type='master')
self.connection_pool.nodes.move_slot_to_node(e.slot_id, node)

moved_cmds.append(c)

if moved_cmds:
cur_attempt += 1
cmds = sorted(moved_cmds, key=lambda x: x.position)
continue

break

# if the response isn't an exception it is a valid response from the node
# we're all done with that command, YAY!
# if we have more commands to attempt, we've run into problems.
# collect all the commands we are allowed to retry.
# (MOVED, ASK, or connection errors or timeout errors)
attempt = sorted([c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)
attempt = sorted([c for c in stack if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)
if attempt and allow_redirections:
# RETRY MAGIC HAPPENS HERE!
# send these remaining commands one at a time using `execute_command`
@@ -255,13 +316,19 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=
# If a lot of commands have failed, we'll be setting the
# flag to rebuild the slots table from scratch. So MOVED errors should
# correct themselves fairly quickly.

# with the previous redirect retries, I could barely see the slow mode happening again
log.info("pipeline in slow mode to execute failed commands: {}".format([c.result for c in attempt]))

self.connection_pool.nodes.increment_reinitialize_counter(len(attempt))

# even in the slow mode, we could use gevent to make things faster
events = []
for c in attempt:
try:
# send each command individually like we do in the main client.
c.result = super(ClusterPipeline, self).execute_command(*c.args, **c.options)
except RedisError as e:
c.result = e
events.append(gevent.spawn(self._execute_single_command, c))

gevent.joinall(events)


# turn the response back into a simple flat array that corresponds
# to the sequence of commands issued in the stack in pipeline.execute()
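The fan-out the new pipeline relies on, reduced to a standalone sketch: one greenlet per node, each doing its write-then-read round trip, all joined at the end. fake_node_roundtrip is a made-up stand-in for NodeCommands.write()/read():

    from gevent import monkey; monkey.patch_all()  # mirrors the import-time patch in pipeline.py
    import gevent

    def fake_node_roundtrip(name):
        # Stand-in for NodeCommands.write() followed by NodeCommands.read();
        # the socket I/O inside those calls yields to the gevent hub instead of blocking.
        gevent.sleep(0.01)
        return name

    greenlets = [gevent.spawn(fake_node_roundtrip, n) for n in ("node-a", "node-b", "node-c")]
    gevent.joinall(greenlets)
    print([g.value for g in greenlets])  # ['node-a', 'node-b', 'node-c']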
2 changes: 2 additions & 0 deletions requirements.txt
@@ -1 +1,3 @@
redis>=3.0.0,<4.0.0
gevent
greenlet
6 changes: 4 additions & 2 deletions setup.py
@@ -20,7 +20,7 @@

setup(
name="redis-py-cluster",
version="2.1.0",
version="2.1.1",
description="Library for communicating with Redis Clusters. Built on top of redis-py lib",
long_description=readme + '\n\n' + history,
long_description_content_type="text/markdown",
@@ -32,7 +32,9 @@
url='http://github.com/grokzen/redis-py-cluster',
license='MIT',
install_requires=[
'redis>=3.0.0,<4.0.0'
'redis>=3.0.0,<4.0.0',
'gevent',
'greenlet',
],
python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4",
extras_require={
