Skip to content

Commit 15cb90d

Browse files
committed
refactor: make MessageBroker a required constructor arg on BaseMessageSender
- BaseMessageSender, RqueueEndpointManagerImpl, RqueueMessageEnqueuerImpl, RqueueMessageManagerImpl, ReactiveRqueueMessageEnqueuerImpl now take MessageBroker as a constructor arg instead of reading it off the template. - enqueue/storeMessageMetadata/notifyBrokerQueueRegistered route through the injected broker unconditionally; the Redis-vs-NATS dispatch lives inside each broker implementation. - RedisMessageBroker overrides enqueueReactive/enqueueWithDelayReactive so reactive callers stay on the reactive Redis driver. - RqueueMessageTemplateImpl no longer holds a MessageBroker; the setMessageBroker wiring in RqueueListenerAutoConfig / RqueueListenerConfig / RqueueMessageListenerContainer is removed in favor of injecting the broker bean directly into the *Impl beans. - Tests updated to construct *Impl beans with an explicit broker; new RqueueMessageEnqueuerBrokerRoutingTest pins the non-reactive routing path. Assisted-By: Claude Code
1 parent 253fcc4 commit 15cb90d

24 files changed

Lines changed: 347 additions & 242 deletions

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ ext {
8585

8686
subprojects {
8787
group = "com.github.sonus21"
88-
version = "4.0.0-SK1"
88+
version = "4.0.0-SK3"
8989

9090
dependencies {
9191
// https://mvnrepository.com/artifact/org.springframework/spring-messaging

rqueue-core/src/main/java/com/github/sonus21/rqueue/annotation/RqueueListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,5 +223,5 @@
223223
*
224224
* @return queue delivery mode
225225
*/
226-
QueueType queueMode() default QueueType.QUEUE;
226+
QueueType mode() default QueueType.QUEUE;
227227
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.github.sonus21.rqueue.core;
1818

19-
import com.github.sonus21.rqueue.core.spi.MessageBroker;
2019
import com.github.sonus21.rqueue.models.MessageMoveResult;
2120
import java.util.List;
2221
import java.util.Optional;
@@ -102,13 +101,4 @@ Flux<Long> addReactiveMessageWithDelay(
102101
Optional<RqueueMessage> findFirstElementFromZset(String name);
103102

104103
Optional<TypedTuple<RqueueMessage>> findFirstElementFromZsetWithScore(String name);
105-
106-
/**
107-
* Returns the optional pluggable {@link MessageBroker} associated with this template, or
108-
* {@code null} when the template uses the default Redis-backed path. Internal use; subject
109-
* to change.
110-
*/
111-
default MessageBroker getMessageBroker() {
112-
return null;
113-
}
114104
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/BaseMessageSender.java

Lines changed: 17 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ abstract class BaseMessageSender {
5353
protected final MessageConverter messageConverter;
5454
protected final RqueueMessageTemplate messageTemplate;
5555
protected final RqueueMessageIdGenerator messageIdGenerator;
56+
protected final com.github.sonus21.rqueue.core.spi.MessageBroker messageBroker;
5657

5758
@Autowired
5859
protected RqueueConfig rqueueConfig;
@@ -62,22 +63,24 @@ abstract class BaseMessageSender {
6263

6364
BaseMessageSender(
6465
RqueueMessageTemplate messageTemplate,
66+
com.github.sonus21.rqueue.core.spi.MessageBroker messageBroker,
6567
MessageConverter messageConverter,
6668
MessageHeaders messageHeaders,
6769
RqueueMessageIdGenerator messageIdGenerator) {
6870
notNull(messageTemplate, "messageTemplate cannot be null");
71+
notNull(messageBroker, "messageBroker cannot be null");
6972
notNull(messageConverter, "messageConverter cannot be null");
7073
notNull(messageIdGenerator, "messageIdGenerator cannot be null");
7174
this.messageTemplate = messageTemplate;
75+
this.messageBroker = messageBroker;
7276
this.messageConverter = messageConverter;
7377
this.messageHeaders = messageHeaders;
7478
this.messageIdGenerator = messageIdGenerator;
7579
}
7680

7781
protected Object storeMessageMetadata(
7882
RqueueMessage rqueueMessage, Long delayInMillis, boolean reactive, boolean isUnique) {
79-
com.github.sonus21.rqueue.core.spi.MessageBroker broker = messageTemplate.getMessageBroker();
80-
boolean skipMetadata = broker != null && !broker.capabilities().usesPrimaryHandlerDispatch();
83+
boolean skipMetadata = !messageBroker.capabilities().usesPrimaryHandlerDispatch();
8184
if (skipMetadata) {
8285
return reactive ? reactor.core.publisher.Mono.just(true) : null;
8386
}
@@ -100,50 +103,30 @@ protected Object enqueue(
100103
}
101104

102105
/**
103-
* Priority-aware enqueue. When a non-Redis
104-
* {@link com.github.sonus21.rqueue.core.spi.MessageBroker} is set on the underlying
105-
* {@link RqueueMessageTemplate} (i.e. capabilities advertise {@code !usesPrimaryHandlerDispatch})
106-
* this routes the publish through
107-
* {@link com.github.sonus21.rqueue.core.spi.MessageBroker#enqueue(QueueDetail, String,
108-
* RqueueMessage)} so backends like NATS can publish to a priority-specific subject. Otherwise the
109-
* existing Redis-shaped path is used; Redis already encodes priority in the queue name so
110-
* {@code priority} is ignored.
106+
* Priority-aware enqueue. Always routes through {@link
107+
* com.github.sonus21.rqueue.core.spi.MessageBroker} — the Redis-vs-NATS dispatch lives inside
108+
* each broker implementation. Backends that key off the queue name (Redis) ignore {@code
109+
* priority}; backends that publish to a per-priority destination (NATS) use it to pick the
110+
* subject. Reactive enqueues route through {@code enqueueReactive} so backends with native
111+
* async APIs do not block a thread.
111112
*/
112113
protected Object enqueue(
113114
QueueDetail queueDetail,
114115
String priority,
115116
RqueueMessage rqueueMessage,
116117
Long delayInMilliSecs,
117118
boolean reactive) {
118-
com.github.sonus21.rqueue.core.spi.MessageBroker broker = messageTemplate.getMessageBroker();
119-
boolean useBroker =
120-
!reactive && broker != null && !broker.capabilities().usesPrimaryHandlerDispatch();
121119
if (delayInMilliSecs == null || delayInMilliSecs <= MIN_DELAY) {
122-
if (useBroker) {
123-
broker.enqueue(queueDetail, priority, rqueueMessage);
124-
return null;
125-
}
126120
if (reactive) {
127-
return messageTemplate.addReactiveMessage(queueDetail.getQueueName(), rqueueMessage);
128-
} else {
129-
messageTemplate.addMessage(queueDetail.getQueueName(), rqueueMessage);
121+
return messageBroker.enqueueReactive(queueDetail, rqueueMessage);
130122
}
123+
messageBroker.enqueue(queueDetail, priority, rqueueMessage);
131124
} else {
132-
if (useBroker) {
133-
broker.enqueueWithDelay(queueDetail, rqueueMessage, delayInMilliSecs);
134-
return null;
135-
}
136125
if (reactive) {
137-
return messageTemplate.addReactiveMessageWithDelay(
138-
queueDetail.getScheduledQueueName(),
139-
queueDetail.getScheduledQueueChannelName(),
140-
rqueueMessage);
141-
} else {
142-
messageTemplate.addMessageWithDelay(
143-
queueDetail.getScheduledQueueName(),
144-
queueDetail.getScheduledQueueChannelName(),
145-
rqueueMessage);
126+
return messageBroker.enqueueWithDelayReactive(
127+
queueDetail, rqueueMessage, delayInMilliSecs);
146128
}
129+
messageBroker.enqueueWithDelay(queueDetail, rqueueMessage, delayInMilliSecs);
147130
}
148131
return null;
149132
}
@@ -261,9 +244,6 @@ protected void registerQueueInternal(String queueName, QueueType type, String...
261244
}
262245

263246
private void notifyBrokerQueueRegistered(QueueDetail queueDetail) {
264-
com.github.sonus21.rqueue.core.spi.MessageBroker broker = messageTemplate.getMessageBroker();
265-
if (broker != null) {
266-
broker.onQueueRegistered(queueDetail);
267-
}
247+
messageBroker.onQueueRegistered(queueDetail);
268248
}
269249
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/ReactiveRqueueMessageEnqueuerImpl.java

Lines changed: 19 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -32,44 +32,32 @@
3232
import lombok.extern.slf4j.Slf4j;
3333
import org.springframework.messaging.MessageHeaders;
3434
import org.springframework.messaging.converter.MessageConverter;
35-
import reactor.core.publisher.Flux;
3635
import reactor.core.publisher.Mono;
3736

3837
@Slf4j
3938
public class ReactiveRqueueMessageEnqueuerImpl extends BaseMessageSender
4039
implements ReactiveRqueueMessageEnqueuer {
4140

42-
// Optional broker delegate. When non-null, reactive enqueue routes through
43-
// MessageBroker.enqueueReactive instead of the reactive Redis template path.
44-
private MessageBroker messageBroker;
45-
4641
public ReactiveRqueueMessageEnqueuerImpl(
4742
RqueueMessageTemplate messageTemplate,
43+
MessageBroker messageBroker,
4844
MessageConverter messageConverter,
4945
MessageHeaders messageHeaders) {
50-
this(messageTemplate, messageConverter, messageHeaders, new UuidV4RqueueMessageIdGenerator());
46+
this(
47+
messageTemplate,
48+
messageBroker,
49+
messageConverter,
50+
messageHeaders,
51+
new UuidV4RqueueMessageIdGenerator());
5152
}
5253

5354
public ReactiveRqueueMessageEnqueuerImpl(
5455
RqueueMessageTemplate messageTemplate,
56+
MessageBroker messageBroker,
5557
MessageConverter messageConverter,
5658
MessageHeaders messageHeaders,
5759
RqueueMessageIdGenerator messageIdGenerator) {
58-
super(messageTemplate, messageConverter, messageHeaders, messageIdGenerator);
59-
}
60-
61-
/**
62-
* Set an optional {@link MessageBroker} delegate. When non-null, reactive enqueue calls route
63-
* through {@link MessageBroker#enqueueReactive(QueueDetail, RqueueMessage)} instead of the legacy
64-
* reactive Redis template path. Existing Redis users that do not configure a broker keep the
65-
* original behavior.
66-
*/
67-
public void setMessageBroker(MessageBroker messageBroker) {
68-
this.messageBroker = messageBroker;
69-
}
70-
71-
public MessageBroker getMessageBroker() {
72-
return messageBroker;
60+
super(messageTemplate, messageBroker, messageConverter, messageHeaders, messageIdGenerator);
7361
}
7462

7563
@SuppressWarnings("unchecked")
@@ -96,32 +84,18 @@ private <T> Mono<T> pushReactiveMessage(
9684
Mono<Boolean> storeResult =
9785
(Mono<Boolean>) storeMessageMetadata(rqueueMessage, delayInMilliSecs, true, isUnique);
9886
return storeResult.flatMap(success -> {
99-
if (Boolean.TRUE.equals(success)) {
100-
if (messageBroker != null) {
101-
Mono<Void> brokerMono;
102-
if (delayInMilliSecs == null
103-
|| delayInMilliSecs <= com.github.sonus21.rqueue.utils.Constants.MIN_DELAY) {
104-
brokerMono = messageBroker.enqueueReactive(queueDetail, rqueueMessage);
105-
} else {
106-
brokerMono = messageBroker.enqueueWithDelayReactive(
107-
queueDetail, rqueueMessage, delayInMilliSecs);
108-
}
109-
return brokerMono.then(Mono.defer(() -> monoConverter.apply(rqueueMessage)));
110-
}
111-
Object result = enqueue(queueDetail, rqueueMessage, delayInMilliSecs, true);
112-
Mono<Long> enqueueMono;
113-
if (result instanceof Flux) {
114-
enqueueMono = ((Flux<Long>) result).next();
115-
} else if (result instanceof Mono) {
116-
enqueueMono = (Mono<Long>) result;
117-
} else {
118-
return Mono.error(
119-
new IllegalStateException("Unexpected enqueue result type: " + result.getClass()));
120-
}
121-
return enqueueMono.flatMap(ignore -> monoConverter.apply(rqueueMessage));
122-
} else {
87+
if (!Boolean.TRUE.equals(success)) {
12388
return Mono.error(new DuplicateMessageException(rqueueMessage.getId()));
12489
}
90+
Mono<Void> brokerMono;
91+
if (delayInMilliSecs == null
92+
|| delayInMilliSecs <= com.github.sonus21.rqueue.utils.Constants.MIN_DELAY) {
93+
brokerMono = messageBroker.enqueueReactive(queueDetail, rqueueMessage);
94+
} else {
95+
brokerMono =
96+
messageBroker.enqueueWithDelayReactive(queueDetail, rqueueMessage, delayInMilliSecs);
97+
}
98+
return brokerMono.then(Mono.defer(() -> monoConverter.apply(rqueueMessage)));
12599
});
126100
} catch (Exception e) {
127101
log.error(

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueEndpointManagerImpl.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,24 @@ public class RqueueEndpointManagerImpl extends BaseMessageSender implements Rque
5151

5252
public RqueueEndpointManagerImpl(
5353
RqueueMessageTemplate messageTemplate,
54+
com.github.sonus21.rqueue.core.spi.MessageBroker messageBroker,
5455
MessageConverter messageConverter,
5556
MessageHeaders messageHeaders) {
56-
this(messageTemplate, messageConverter, messageHeaders, new UuidV4RqueueMessageIdGenerator());
57+
this(
58+
messageTemplate,
59+
messageBroker,
60+
messageConverter,
61+
messageHeaders,
62+
new UuidV4RqueueMessageIdGenerator());
5763
}
5864

5965
public RqueueEndpointManagerImpl(
6066
RqueueMessageTemplate messageTemplate,
67+
com.github.sonus21.rqueue.core.spi.MessageBroker messageBroker,
6168
MessageConverter messageConverter,
6269
MessageHeaders messageHeaders,
6370
RqueueMessageIdGenerator messageIdGenerator) {
64-
super(messageTemplate, messageConverter, messageHeaders, messageIdGenerator);
71+
super(messageTemplate, messageBroker, messageConverter, messageHeaders, messageIdGenerator);
6572
}
6673

6774
@Override

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageEnqueuerImpl.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
2828
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
2929
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
30+
import com.github.sonus21.rqueue.core.spi.MessageBroker;
3031
import com.github.sonus21.rqueue.utils.PriorityUtils;
3132
import java.util.Objects;
3233
import lombok.extern.slf4j.Slf4j;
@@ -38,17 +39,24 @@ public class RqueueMessageEnqueuerImpl extends BaseMessageSender implements Rque
3839

3940
public RqueueMessageEnqueuerImpl(
4041
RqueueMessageTemplate messageTemplate,
42+
MessageBroker messageBroker,
4143
MessageConverter messageConverter,
4244
MessageHeaders messageHeaders) {
43-
this(messageTemplate, messageConverter, messageHeaders, new UuidV4RqueueMessageIdGenerator());
45+
this(
46+
messageTemplate,
47+
messageBroker,
48+
messageConverter,
49+
messageHeaders,
50+
new UuidV4RqueueMessageIdGenerator());
4451
}
4552

4653
public RqueueMessageEnqueuerImpl(
4754
RqueueMessageTemplate messageTemplate,
55+
MessageBroker messageBroker,
4856
MessageConverter messageConverter,
4957
MessageHeaders messageHeaders,
5058
RqueueMessageIdGenerator messageIdGenerator) {
51-
super(messageTemplate, messageConverter, messageHeaders, messageIdGenerator);
59+
super(messageTemplate, messageBroker, messageConverter, messageHeaders, messageIdGenerator);
5260
}
5361

5462
private void validateBasic(String queue, Object message) {
@@ -114,18 +122,18 @@ public boolean enqueueWithPriority(
114122
* Routes priority-aware enqueues:
115123
*
116124
* <ul>
117-
* <li>Redis backend (default): uses the suffixed queue name
118-
* ({@code PriorityUtils.getQueueNameForPriority}). Priority is encoded in the queue name.
119-
* <li>Broker backend with non-primary-handler-dispatch (e.g. NATS): uses the original queue
120-
* name and passes the priority through to
125+
* <li>Redis-style backends (capabilities advertise {@code usesPrimaryHandlerDispatch}): uses
126+
* the suffixed queue name ({@code PriorityUtils.getQueueNameForPriority}). Priority is
127+
* encoded in the queue name; the broker ignores the {@code priority} param.
128+
* <li>Backends with per-priority routing (e.g. NATS): uses the base queue name and passes the
129+
* priority through to
121130
* {@link com.github.sonus21.rqueue.core.spi.MessageBroker#enqueue(QueueDetail, String,
122-
* RqueueMessage)} so the broker can route to a per-priority destination (subject/stream).
131+
* RqueueMessage)} so the broker picks the per-priority destination (subject/stream).
123132
* </ul>
124133
*/
125134
private String pushMessageForPriority(
126135
String queueName, String priority, String messageId, Object message, Long delayMs) {
127-
com.github.sonus21.rqueue.core.spi.MessageBroker broker = messageTemplate.getMessageBroker();
128-
if (broker != null && !broker.capabilities().usesPrimaryHandlerDispatch()) {
136+
if (!messageBroker.capabilities().usesPrimaryHandlerDispatch()) {
129137
return pushMessage(queueName, priority, messageId, message, null, delayMs, false);
130138
}
131139
return pushMessage(

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageManagerImpl.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,23 +43,26 @@
4343

4444
@Slf4j
4545
public class RqueueMessageManagerImpl extends BaseMessageSender implements RqueueMessageManager {
46-
47-
@Autowired
48-
private RqueueLockManager rqueueLockManager;
49-
5046
public RqueueMessageManagerImpl(
5147
RqueueMessageTemplate messageTemplate,
48+
com.github.sonus21.rqueue.core.spi.MessageBroker messageBroker,
5249
MessageConverter messageConverter,
5350
MessageHeaders messageHeaders) {
54-
this(messageTemplate, messageConverter, messageHeaders, new UuidV4RqueueMessageIdGenerator());
51+
this(
52+
messageTemplate,
53+
messageBroker,
54+
messageConverter,
55+
messageHeaders,
56+
new UuidV4RqueueMessageIdGenerator());
5557
}
5658

5759
public RqueueMessageManagerImpl(
5860
RqueueMessageTemplate messageTemplate,
61+
com.github.sonus21.rqueue.core.spi.MessageBroker messageBroker,
5962
MessageConverter messageConverter,
6063
MessageHeaders messageHeaders,
6164
RqueueMessageIdGenerator messageIdGenerator) {
62-
super(messageTemplate, messageConverter, messageHeaders, messageIdGenerator);
65+
super(messageTemplate, messageBroker, messageConverter, messageHeaders, messageIdGenerator);
6366
}
6467

6568
@Override

0 commit comments

Comments
 (0)