Skip to content

Commit 1491595

Browse files
committed
Properly use MGET and MSET against a cluster by detecting uses of hash slots (which determines that all keys go to the same cluster node)
1 parent cec3cda commit 1491595

File tree

1 file changed

+42
-13
lines changed

1 file changed

+42
-13
lines changed

aredis/commands/strings.py

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
dict_merge,
77
bool_ok,
88
string_keys_to_dict)
9+
from collections import OrderedDict, defaultdict
910

1011

1112
class BitField(object):
@@ -329,34 +330,62 @@ async def mget(self, keys, *args):
329330
Returns a list of values ordered identically to ``keys``
330331
331332
Cluster impl:
332-
Itterate all keys and send GET for each key.
333-
This will go alot slower than a normal mget call in StrictRedis.
334-
335-
Operation is no longer atomic.
336-
"""
337-
res = list()
338-
for arg in list_or_args(keys, args):
339-
res.append(await self.get(arg))
340-
return res
333+
Find groups of keys with the same hash tag
334+
https://redis.io/topics/cluster-tutorial#redis-cluster-data-sharding
335+
and execute an MGET for each group
336+
execute individual GETs for all other keys passed in
337+
338+
Operation will be atomic only if keys belonging to a single
339+
hash tag are passed in.
340+
"""
341+
res_mapping = OrderedDict()
342+
hash_tag_slots = defaultdict(list)
343+
for key in list_or_args(keys, args):
344+
if key.count('{') == key.count('}') == 1 and \
345+
-1 < key.index('{') < key.index('}') - 1:
346+
hash_tag = key.split('{')[1].split('}')[0]
347+
hash_tag_slots[hash_tag].append(key)
348+
res_mapping[key] = None # establish correct ordering
349+
else:
350+
# a loose key without a hash tag, can't use MGET
351+
res_mapping[key] = await self.get(key)
352+
for mget_keys in hash_tag_slots.values():
353+
mget_res = await self.execute_command('MGET', *mget_keys)
354+
for key, res in zip(mget_keys, mget_res):
355+
res_mapping[key] = res
356+
return list(res_mapping.values())
341357

342358
async def mset(self, *args, **kwargs):
343359
"""
344360
Sets key/values based on a mapping. Mapping can be supplied as a single
345361
dictionary argument or as kwargs.
346362
347363
Cluster impl:
348-
Itterate over all items and do SET on each (k,v) pair
364+
Find groups of keys with the same hash tag
365+
https://redis.io/topics/cluster-tutorial#redis-cluster-data-sharding
366+
and execute an MSET for each group
367+
execute individual SETs for all other key/value pairs
349368
350-
Operation is no longer atomic.
369+
Operation will be atomic only if keys belonging to a single
370+
hash tag are passed in.
351371
"""
352372
if args:
353373
if len(args) != 1 or not isinstance(args[0], dict):
354374
raise RedisError('MSET requires **kwargs or a single dict arg')
355375
kwargs.update(args[0])
356376

377+
hash_tag_slots = defaultdict(list)
357378
for pair in iteritems(kwargs):
358-
await self.set(pair[0], pair[1])
359-
379+
key, v = pair
380+
if key.count('{') == key.count('}') == 1 and \
381+
-1 < key.index('{') < key.index('}') - 1:
382+
hash_tag = key.split('{')[1].split('}')[0]
383+
hash_tag_slots[hash_tag].extend(pair)
384+
else:
385+
# a loose key without a hash tag, can't use MSET
386+
await self.set(key, v)
387+
for mset_items in hash_tag_slots.values():
388+
await self.execute_command('MSET', *mset_items)
360389
return True
361390

362391
async def msetnx(self, *args, **kwargs):

0 commit comments

Comments
 (0)