Skip to content

Commit 82dbcf5

Browse files
committed
Fix tests again.
1 parent 4e21c26 commit 82dbcf5

File tree

2 files changed

+91
-84
lines changed

2 files changed

+91
-84
lines changed

cluster/test/async/redis/context/subscribe.rb

Lines changed: 31 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# Released under the MIT License.
44
# Copyright, 2025, by Samuel Williams.
55

6+
require "async/variable"
67
require "async/redis/cluster_client"
78
require "sus/fixtures/async"
89
require "securerandom"
@@ -29,58 +30,34 @@
2930

3031
it "can subscribe to sharded channels and receive messages" do
3132
received_message = nil
32-
condition = Async::Condition.new
33-
spublish_available = false
33+
ready = Async::Variable.new
3434

3535
# Set up the subscriber using cluster client's ssubscribe method
36-
subscriber_task = reactor.async do
37-
begin
38-
cluster.ssubscribe(shard_channel) do |context|
39-
condition.signal # Signal that we're ready
40-
spublish_available = true
41-
42-
type, name, message = context.listen
43-
44-
expect(type).to be == "smessage"
45-
expect(name).to be == shard_channel
46-
received_message = message
47-
end
48-
rescue Protocol::Redis::ServerError => error
49-
if error.message.include?("unknown command")
50-
Console.warn("SSUBSCRIBE not available on this Redis version")
51-
condition.signal
52-
else
53-
raise
54-
end
36+
subscriber_task = Async do
37+
cluster.ssubscribe(shard_channel) do |context|
38+
ready.resolve
39+
40+
type, name, message = context.listen
41+
42+
expect(type).to be == "smessage"
43+
expect(name).to be == shard_channel
44+
received_message = message
5545
end
5646
end
5747

5848
# Set up the publisher
59-
publisher_task = reactor.async do
60-
condition.wait # Wait for subscriber to be ready
49+
publisher_task = Async do
50+
ready.wait
6151

62-
if spublish_available
63-
begin
64-
# Get a client on the same node as the subscriber for SPUBLISH
65-
slot = cluster.slot_for(shard_channel)
66-
publisher_client = cluster.client_for(slot)
67-
publisher_client.call("SPUBLISH", shard_channel, shard_message)
68-
rescue => error
69-
Console.warn("SPUBLISH failed: #{error}")
70-
end
71-
end
52+
slot = cluster.slot_for(shard_channel)
53+
publisher_client = cluster.client_for(slot)
54+
publisher_client.call("SPUBLISH", shard_channel, shard_message)
7255
end
7356

7457
publisher_task.wait
75-
sleep(0.1) # Allow message delivery
76-
subscriber_task.stop
58+
subscriber_task.wait
7759

78-
# Only check message if sharded pub/sub was available
79-
if spublish_available && received_message
80-
expect(received_message).to be == shard_message
81-
else
82-
Console.warn("Skipping assertion - sharded pub/sub not available on this Redis version")
83-
end
60+
expect(received_message).to be == shard_message
8461
end
8562

8663
it "distributes sharded messages across cluster nodes" do
@@ -94,26 +71,26 @@
9471
]
9572

9673
# Find channels that map to different slots/nodes
97-
channel_slots = channels.map {|ch| [ch, cluster.slot_for(ch)]}
74+
channel_slots = channels.map {|channel| [channel, cluster.slot_for(channel)]}
9875
unique_slots = channel_slots.map(&:last).uniq
9976

10077
# We should have channels distributed across different slots
10178
expect(unique_slots.size).to be > 1
10279

10380
received_messages = []
104-
condition = Async::Condition.new
81+
ready = Async::Variable.new
10582
subscriber_count = 0
10683
target_count = channels.size
10784

10885
# Set up subscribers for each channel
10986
subscriber_tasks = channels.map do |channel|
110-
reactor.async do
87+
Async do
11188
slot = cluster.slot_for(channel)
11289
client = cluster.client_for(slot)
11390

11491
client.ssubscribe(channel) do |context|
11592
subscriber_count += 1
116-
condition.signal if subscriber_count == target_count
93+
ready.resolve if subscriber_count == target_count
11794

11895
type, name, message = context.listen
11996
received_messages << {channel: name, message: message, slot: slot}
@@ -122,27 +99,19 @@
12299
end
123100

124101
# Set up publisher
125-
publisher_task = reactor.async do
126-
condition.wait # Wait for all subscribers
102+
publisher_task = Async do
103+
ready.wait # Wait for all subscribers
127104

128105
channels.each_with_index do |channel, index|
129106
slot = cluster.slot_for(channel)
130107
client = cluster.client_for(slot)
131108

132-
begin
133-
client.call("SPUBLISH", channel, "message-#{index}")
134-
rescue => error
135-
Console.warn("SPUBLISH failed for #{channel}: #{error}")
136-
# Clean up and skip if SPUBLISH not available
137-
subscriber_tasks.each(&:stop)
138-
return
139-
end
109+
client.call("SPUBLISH", channel, "message-#{index}")
140110
end
141111
end
142112

143113
publisher_task.wait
144-
sleep(0.1) # Allow time for message delivery
145-
subscriber_tasks.each(&:stop)
114+
subscriber_tasks.each(&:wait)
146115

147116
# Verify we received messages for different channels
148117
expect(received_messages.size).to be == channels.size
@@ -197,20 +166,12 @@
197166

198167
# Publish to sharded channel
199168
shard_client = cluster.client_for(shard_slot)
200-
begin
201-
shard_client.call("SPUBLISH", shard_channel, "sharded message")
202-
rescue => error
203-
Console.warn("SPUBLISH not available: #{error}")
204-
regular_task.stop
205-
shard_task.stop
206-
return
207-
end
169+
shard_client.call("SPUBLISH", shard_channel, "sharded message")
208170
end
209171

210172
publisher_task.wait
211-
sleep(0.1) # Allow time for message delivery
212-
regular_task.stop
213-
shard_task.stop
173+
regular_task.wait
174+
shard_task.wait
214175

215176
# Should have received both messages
216177
expect(received_messages.size).to be == 2
@@ -276,17 +237,11 @@
276237
publisher_client.publish(channel, "unified regular")
277238

278239
# Publish sharded message
279-
begin
280-
publisher_client.call("SPUBLISH", shard_channel, "unified sharded")
281-
rescue => error
282-
Console.warn("SPUBLISH not available: #{error}")
283-
# Skip sharded part if not available
284-
end
240+
publisher_client.call("SPUBLISH", shard_channel, "unified sharded")
285241
end
286242

287243
publisher_task.wait
288-
sleep(0.1) # Allow message delivery
289-
subscriber_task.stop
244+
subscriber_task.wait
290245

291246
# Should receive both message types on same context
292247
expect(received_messages.size).to be == 2

lib/async/redis/cluster_client.rb

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,16 @@ def each
6363
end
6464
end
6565

66+
# Get a random value from the map.
67+
# @returns [Object] A randomly selected value, or nil if map is empty.
68+
def sample
69+
return nil if @ranges.empty?
70+
71+
range, value = @ranges.sample
72+
73+
return value
74+
end
75+
6676
# Clear all ranges from the map.
6777
def clear
6878
@ranges.clear
@@ -127,6 +137,38 @@ def client_for(slot, role = :master)
127137
end
128138
end
129139

140+
# Get any available client from the cluster.
141+
# This is useful for operations that don't require slot-specific routing,
142+
# such as global pub/sub operations, INFO commands, or other cluster-wide operations.
143+
# @parameter role [Symbol] The role of node to get (:master or :slave).
144+
# @returns [Client] A Redis client for any available node.
145+
def any_client(role = :master)
146+
unless @shards
147+
reload_cluster!
148+
end
149+
150+
# Sample a random shard to get better load distribution
151+
if nodes = @shards.sample
152+
nodes = nodes.select{|node| node.role == role}
153+
154+
if node = nodes.sample
155+
return (node.client ||= Client.new(node.endpoint, **@options))
156+
end
157+
end
158+
159+
# Fallback to slot 0 if sampling fails
160+
client_for(0, role)
161+
end
162+
163+
# Execute a Redis command on any available cluster node.
164+
# This is useful for commands that don't require slot-specific routing.
165+
# @parameter command [String] The Redis command to execute.
166+
# @parameter arguments [Array] The command arguments.
167+
# @returns [Object] The result of the Redis command.
168+
def call(command, *arguments)
169+
any_client.call(command, *arguments)
170+
end
171+
130172
protected
131173

132174
def reload_cluster!(endpoints = @endpoints)
@@ -245,16 +287,22 @@ def slots_for(keys)
245287
end
246288

247289
# Subscribe to one or more channels for pub/sub messaging in cluster environment.
248-
# The subscription will be created on the node responsible for the first channel.
290+
#
291+
# NOTE: Regular pub/sub in Redis Cluster is GLOBAL - messages propagate to all nodes.
292+
# This method is a convenience that subscribes via an arbitrary cluster node.
293+
# The choice of node does not affect which messages you receive, since regular
294+
# pub/sub messages are broadcast to all nodes in the cluster.
295+
#
296+
# For slot-aware pub/sub, use ssubscribe() instead (Redis 7.0+).
297+
#
249298
# @parameter channels [Array(String)] The channels to subscribe to.
250299
# @yields {|context| ...} If a block is given, it will be executed within the subscription context.
251300
# @parameter context [Context::Subscribe] The subscription context.
252301
# @returns [Object] The result of the block if block given.
253302
# @returns [Context::Subscribe] The subscription context if no block given.
254303
def subscribe(*channels)
255-
# For regular pub/sub, route to node based on first channel
256-
slot = channels.any? ? slot_for(channels.first) : 0
257-
client = client_for(slot)
304+
# For regular pub/sub, use any available node since messages are global
305+
client = any_client
258306

259307
client.subscribe(*channels) do |context|
260308
if block_given?
@@ -266,16 +314,20 @@ def subscribe(*channels)
266314
end
267315

268316
# Subscribe to one or more channel patterns for pub/sub messaging in cluster environment.
269-
# The subscription will be created on the node responsible for a deterministic slot.
317+
#
318+
# NOTE: Pattern subscriptions in Redis Cluster are GLOBAL - they match channels
319+
# across all nodes. This method is a convenience that subscribes via an arbitrary
320+
# cluster node. Pattern matching works across the entire cluster regardless of
321+
# which node you subscribe from.
322+
#
270323
# @parameter patterns [Array(String)] The channel patterns to subscribe to.
271324
# @yields {|context| ...} If a block is given, it will be executed within the subscription context.
272325
# @parameter context [Context::Subscribe] The subscription context.
273326
# @returns [Object] The result of the block if block given.
274327
# @returns [Context::Subscribe] The subscription context if no block given.
275328
def psubscribe(*patterns)
276-
# For pattern subscriptions, use a deterministic slot since patterns can match any channel
277-
slot = patterns.any? ? slot_for(patterns.first) : 0
278-
client = client_for(slot)
329+
# For pattern subscriptions, use any available node since patterns are global
330+
client = any_client
279331

280332
client.psubscribe(*patterns) do |context|
281333
if block_given?

0 commit comments

Comments
 (0)