Skip to content

Commit 7ffa7cc

Browse files
committed
Introduce SharedSubscribe for state management.
1 parent 82dbcf5 commit 7ffa7cc

File tree

2 files changed

+135
-21
lines changed

2 files changed

+135
-21
lines changed

lib/async/redis/cluster_client.rb

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
# Copyright, 2025, by Travis Bell.
66

77
require_relative "client"
8+
require_relative "context/shard_subscribe"
89
require "io/stream"
910

1011
module Async
@@ -138,8 +139,7 @@ def client_for(slot, role = :master)
138139
end
139140

140141
# Get any available client from the cluster.
141-
# This is useful for operations that don't require slot-specific routing,
142-
# such as global pub/sub operations, INFO commands, or other cluster-wide operations.
142+
# This is useful for operations that don't require slot-specific routing, such as global pub/sub operations, INFO commands, or other cluster-wide operations.
143143
# @parameter role [Symbol] The role of node to get (:master or :slave).
144144
# @returns [Client] A Redis client for any available node.
145145
def any_client(role = :master)
@@ -160,15 +160,6 @@ def any_client(role = :master)
160160
client_for(0, role)
161161
end
162162

163-
# Execute a Redis command on any available cluster node.
164-
# This is useful for commands that don't require slot-specific routing.
165-
# @parameter command [String] The Redis command to execute.
166-
# @parameter arguments [Array] The command arguments.
167-
# @returns [Object] The result of the Redis command.
168-
def call(command, *arguments)
169-
any_client.call(command, *arguments)
170-
end
171-
172163
protected
173164

174165
def reload_cluster!(endpoints = @endpoints)
@@ -339,23 +330,24 @@ def psubscribe(*patterns)
339330
end
340331

341332
# Subscribe to one or more sharded channels for pub/sub messaging in cluster environment (Redis 7.0+).
342-
# The subscription will be created on the node responsible for the channel's hash slot.
333+
# The subscription will be created on the appropriate nodes responsible for each channel's hash slot.
343334
# @parameter channels [Array(String)] The sharded channels to subscribe to.
344335
# @yields {|context| ...} If a block is given, it will be executed within the subscription context.
345-
# @parameter context [Context::Subscribe] The subscription context.
336+
# @parameter context [Context::ShardSubscribe] The shard subscription context.
346337
# @returns [Object] The result of the block if block given.
347-
# @returns [Context::Subscribe] The subscription context if no block given.
338+
# @returns [Context::ShardSubscribe] The shard subscription context if no block given.
348339
def ssubscribe(*channels)
349-
# For sharded subscriptions, route to appropriate node based on channel hash
350-
slot = channels.any? ? slot_for(channels.first) : 0
351-
client = client_for(slot)
340+
context = Context::ShardSubscribe.new(self)
341+
context.subscribe(channels) if channels.any?
352342

353-
client.ssubscribe(*channels) do |context|
354-
if block_given?
343+
if block_given?
344+
begin
355345
yield context
356-
else
357-
return context
346+
ensure
347+
context.close
358348
end
349+
else
350+
return context
359351
end
360352
end
361353
end
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require_relative "subscribe"
7+
8+
module Async
9+
module Redis
10+
module Context
11+
# Context for managing sharded subscriptions across multiple Redis cluster nodes.
12+
# This class handles the complexity of subscribing to channels that may be distributed
13+
# across different shards in a Redis cluster.
14+
class ShardSubscribe
15+
# Initialize a new shard subscription context.
16+
# @parameter cluster_client [ClusterClient] The cluster client to use.
17+
def initialize(cluster_client)
18+
@cluster_client = cluster_client
19+
@subscriptions = {}
20+
@channels = []
21+
end
22+
23+
# Close all shard subscriptions.
24+
def close
25+
@subscriptions.each_value(&:close)
26+
@subscriptions.clear
27+
end
28+
29+
# Listen for the next message from any subscribed shard.
30+
# This uses a simple round-robin approach to check each shard.
31+
# @returns [Array] The next message response, or nil if all connections closed.
32+
def listen
33+
return nil if @subscriptions.empty?
34+
35+
# Simple round-robin checking of subscriptions
36+
@subscriptions.each_value do |subscription|
37+
# Non-blocking check for messages
38+
begin
39+
if response = subscription.listen
40+
return response
41+
end
42+
rescue => error
43+
# Handle connection errors gracefully
44+
Console.warn(self, "Error reading from shard subscription: #{error}")
45+
end
46+
end
47+
48+
# If no immediate messages, do a blocking wait on the first subscription
49+
if first_subscription = @subscriptions.values.first
50+
first_subscription.listen
51+
end
52+
end
53+
54+
# Iterate over all messages from all subscribed shards.
55+
# @yields {|response| ...} Block called for each message.
56+
# @parameter response [Array] The message response.
57+
def each
58+
return to_enum unless block_given?
59+
60+
while response = self.listen
61+
yield response
62+
end
63+
end
64+
65+
# Subscribe to additional sharded channels.
66+
# @parameter channels [Array(String)] The channels to subscribe to.
67+
def subscribe(channels)
68+
slots = @cluster_client.slots_for(channels)
69+
70+
slots.each do |slot, channels_for_slot|
71+
if subscription = @subscriptions[slot]
72+
# Add to existing subscription for this shard
73+
subscription.ssubscribe(channels_for_slot)
74+
else
75+
# Create new subscription for this shard
76+
client = @cluster_client.client_for(slot)
77+
@subscriptions[slot] = client.ssubscribe(*channels_for_slot)
78+
end
79+
end
80+
81+
@channels.concat(channels)
82+
end
83+
84+
# Unsubscribe from sharded channels.
85+
# @parameter channels [Array(String)] The channels to unsubscribe from.
86+
def unsubscribe(channels)
87+
slots = @cluster_client.slots_for(channels)
88+
89+
slots.each do |slot, channels_for_slot|
90+
if subscription = @subscriptions[slot]
91+
subscription.sunsubscribe(channels_for_slot)
92+
93+
# Remove channels from our tracking
94+
@channels -= channels_for_slot
95+
96+
# Check if this shard still has channels
97+
remaining_channels_for_slot = @channels.select { |ch| @cluster_client.slot_for(ch) == slot }
98+
99+
# If no channels left for this shard, close and remove it
100+
if remaining_channels_for_slot.empty?
101+
subscription.close
102+
@subscriptions.delete(slot)
103+
end
104+
end
105+
end
106+
end
107+
108+
# Get the list of currently subscribed channels.
109+
# @returns [Array(String)] The list of subscribed channels.
110+
def channels
111+
@channels.dup
112+
end
113+
114+
# Get the number of active shard subscriptions.
115+
# @returns [Integer] The number of shard connections.
116+
def shard_count
117+
@subscriptions.size
118+
end
119+
end
120+
end
121+
end
122+
end

0 commit comments

Comments
 (0)