@@ -5,7 +5,7 @@ import type {
5
5
MessageQueueListenOptions ,
6
6
} from "@fedify/fedify" ;
7
7
import { getLogger } from "@logtape/logtape" ;
8
- import type { Redis , RedisKey } from "ioredis" ;
8
+ import type { Cluster , Redis , RedisKey } from "ioredis" ;
9
9
import { type Codec , JsonCodec } from "./codec.ts" ;
10
10
11
11
const logger = getLogger ( [ "fedify" , "redis" , "mq" ] ) ;
@@ -63,17 +63,28 @@ export interface RedisMessageQueueOptions {
63
63
* ```ts ignore
64
64
* import { createFederation } from "@fedify/fedify";
65
65
* import { RedisMessageQueue } from "@fedify/redis";
66
- * import { Redis } from "ioredis";
66
+ * import { Redis, Cluster } from "ioredis";
67
67
*
68
+ * // Using a standalone Redis instance:
68
69
* const federation = createFederation({
69
70
* // ...
70
71
* queue: new RedisMessageQueue(() => new Redis()),
71
72
* });
73
+ *
74
+ * // Using a Redis Cluster:
75
+ * const federation = createFederation({
76
+ * // ...
77
+ * queue: new RedisMessageQueue(() => new Cluster([
78
+ * { host: "127.0.0.1", port: 7000 },
79
+ * { host: "127.0.0.1", port: 7001 },
80
+ * { host: "127.0.0.1", port: 7002 },
81
+ * ])),
82
+ * });
72
83
* ```
73
84
*/
74
85
export class RedisMessageQueue implements MessageQueue , Disposable {
75
- #redis: Redis ;
76
- #subRedis: Redis ;
86
+ #redis: Redis | Cluster ;
87
+ #subRedis: Redis | Cluster ;
77
88
#workerId: string ;
78
89
#channelKey: RedisKey ;
79
90
#queueKey: RedisKey ;
@@ -87,7 +98,10 @@ export class RedisMessageQueue implements MessageQueue, Disposable {
87
98
* @param redis The Redis client factory.
88
99
* @param options The options for the message queue.
89
100
*/
90
- constructor ( redis : ( ) => Redis , options : RedisMessageQueueOptions = { } ) {
101
+ constructor (
102
+ redis : ( ) => Redis | Cluster ,
103
+ options : RedisMessageQueueOptions = { } ,
104
+ ) {
91
105
this . #redis = redis ( ) ;
92
106
this . #subRedis = redis ( ) ;
93
107
this . #workerId = options . workerId ?? crypto . randomUUID ( ) ;
@@ -196,9 +210,24 @@ export class RedisMessageQueue implements MessageQueue, Disposable {
196
210
}
197
211
} ;
198
212
const promise = this . #subRedis. subscribe ( this . #channelKey, ( ) => {
199
- this . #subRedis. on ( "message" , poll ) ;
213
+ /**
214
+ * Cast to Redis for event methods. Both Redis and Cluster extend EventEmitter
215
+ * and get the same methods via applyMixin at runtime, but their TypeScript
216
+ * interfaces are incompatible:
217
+ * - Redis declares specific overloads: on(event: "message", cb: (channel, message) => void)
218
+ * - Cluster only has generic: on(event: string | symbol, listener: Function)
219
+ *
220
+ * This makes the union type Redis | Cluster incompatible for these method calls.
221
+ * The cast is safe because both classes use applyMixin(Class, EventEmitter) which
222
+ * copies all EventEmitter prototype methods, giving them identical pub/sub functionality.
223
+ *
224
+ * @see https://github.com/redis/ioredis/blob/main/lib/Redis.ts#L863 (has specific overloads)
225
+ * @see https://github.com/redis/ioredis/blob/main/lib/cluster/index.ts#L1110 (empty interface)
226
+ */
227
+ const subRedis = this . #subRedis as Redis ;
228
+ subRedis . on ( "message" , poll ) ;
200
229
signal ?. addEventListener ( "abort" , ( ) => {
201
- this . # subRedis. off ( "message" , poll ) ;
230
+ subRedis . off ( "message" , poll ) ;
202
231
} ) ;
203
232
} ) ;
204
233
signal ?. addEventListener (
0 commit comments