Skip to content

Commit

Permalink
Log tracking of ByteBuf require-release operations (moquette-io#869)
Browse files Browse the repository at this point in the history
Decorates require and release with logging and tag to track the chainof retaining of a buffer
  • Loading branch information
andsel authored Nov 17, 2024
1 parent 0d22c0d commit 82a54aa
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 53 deletions.
2 changes: 1 addition & 1 deletion broker/src/main/java/io/moquette/broker/InMemoryQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public boolean isEmpty() {
@Override
public void closeAndPurge() {
for (SessionRegistry.EnqueuedMessage msg : queue) {
msg.release();
Utils.release(msg, "in memory queue cleanup");
}
if (queueRepository != null) {
// clean up the queue from the repository
Expand Down
13 changes: 8 additions & 5 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {

// retain else msg is cleaned by the NewNettyMQTTHandler and is not available
// in execution by SessionEventLoop
msg.retain();
Utils.retain(msg, PostOffice.BT_PUB_IN);
switch (qos) {
case AT_MOST_ONCE:
return postOffice.routeCommand(clientId, "PUB QoS0", () -> {
Expand All @@ -692,10 +692,11 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
}
postOffice.receivedPublishQos0(this, username, clientId, msg, expiry);
return null;
}).ifFailed(msg::release);
}).ifFailed(() -> Utils.release(msg, PostOffice.BT_PUB_IN + " - failed"));
case AT_LEAST_ONCE:
if (!receivedQuota.hasFreeSlots()) {
LOG.warn("Client {} exceeded the quota {} processing QoS1, disconnecting it", clientId, receivedQuota);
Utils.release(msg, PostOffice.BT_PUB_IN + " - QoS1 exceeded quota");
brokerDisconnect(MqttReasonCodes.Disconnect.RECEIVE_MAXIMUM_EXCEEDED);
disconnectSession();
dropConnection();
Expand All @@ -712,10 +713,11 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
receivedQuota.releaseSlot();
});
return null;
}).ifFailed(msg::release);
}).ifFailed(() -> Utils.release(msg, PostOffice.BT_PUB_IN + " - failed"));
case EXACTLY_ONCE: {
if (!receivedQuota.hasFreeSlots()) {
LOG.warn("Client {} exceeded the quota {} processing QoS2, disconnecting it", clientId, receivedQuota);
Utils.release(msg, PostOffice.BT_PUB_IN + " - phase 1 QoS2 exceeded quota");
brokerDisconnect(MqttReasonCodes.Disconnect.RECEIVE_MAXIMUM_EXCEEDED);
disconnectSession();
dropConnection();
Expand All @@ -731,7 +733,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
return null;
});
if (!firstStepResult.isSuccess()) {
msg.release();
Utils.release(msg, PostOffice.BT_PUB_IN + " - failed");
LOG.trace("Failed to enqueue PUB QoS2 to session loop for {}", clientId);
return firstStepResult;
}
Expand Down Expand Up @@ -816,9 +818,10 @@ void sendIfWritableElseDrop(MqttMessage msg) {
LOG.debug("Sending message {} on the wire to {}", msg.fixedHeader().messageType(), getClientId());
// Sending to external, retain a duplicate. Just retain is not
// enough, since the receiver must have full control.
// Retain because the OutboundHandler does a release of the buffer.
Object retainedDup = msg;
if (msg instanceof ByteBufHolder) {
retainedDup = ((ByteBufHolder) msg).retainedDuplicate();
retainedDup = Utils.retainDuplicate((ByteBufHolder) msg, "mqtt connection send PUB");
}

ChannelFuture channelFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected void decode(ChannelHandlerContext chc, BinaryWebSocketFrame frame, Lis
ByteBuf bb = frame.content();
// System.out.println("WebSocketFrameToByteBufDecoder decode - " +
// ByteBufUtil.hexDump(bb));
bb.retain();
Utils.retain(bb, "Websocket decoder");
out.add(bb);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void operationComplete(ChannelFuture future) {
}
});
} finally {
ReferenceCountUtil.release(msg);
Utils.release(msg, "payload in - channel handler, read");
}
}

Expand Down
38 changes: 23 additions & 15 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class PostOffice {

private static final String WILL_PUBLISHER = "will_publisher";
private static final String INTERNAL_PUBLISHER = "internal_publisher";
public static final String BT_ROUTE_TARGET = "Route to target session";
public static final String BT_PUB_IN = "PUB in";

/**
* Maps the failed packetID per clientId (id client source, id_packet) -> [id client target]
Expand Down Expand Up @@ -621,14 +623,14 @@ CompletableFuture<Void> receivedPublishQos0(MQTTConnection connection, String us
final Topic topic = new Topic(msg.variableHeader().topicName());
if (!authorizator.canWrite(topic, username, clientID)) {
LOG.error("client is not authorized to publish on topic: {}", topic);
ReferenceCountUtil.release(msg);
Utils.release(msg,PostOffice.BT_PUB_IN + " - ok, auth failed");
return CompletableFuture.completedFuture(null);
}

if (isPayloadFormatToValidate(msg)) {
if (!validatePayloadAsUTF8(msg)) {
LOG.warn("Received not valid UTF-8 payload when payload format indicator was enabled (QoS0)");
ReferenceCountUtil.release(msg);
Utils.release(msg,PostOffice.BT_PUB_IN + " - ok, invalid format");
connection.brokerDisconnect(MqttReasonCodes.Disconnect.PAYLOAD_FORMAT_INVALID);
connection.disconnectSession();
connection.dropConnection();
Expand All @@ -639,7 +641,7 @@ CompletableFuture<Void> receivedPublishQos0(MQTTConnection connection, String us
final RoutingResults publishResult = publish2Subscribers(clientID, messageExpiry, msg);
if (publishResult.isAllFailed()) {
LOG.info("No one publish was successfully enqueued to session loops");
ReferenceCountUtil.release(msg);
Utils.release(msg,PostOffice.BT_PUB_IN + " - ok, can't forward to next session loop");
return CompletableFuture.completedFuture(null);
}

Expand All @@ -650,7 +652,7 @@ CompletableFuture<Void> receivedPublishQos0(MQTTConnection connection, String us
}

interceptor.notifyTopicPublished(msg, clientID, username);
ReferenceCountUtil.release(msg);
Utils.release(msg,PostOffice.BT_PUB_IN + " - ok");
});
}

Expand All @@ -662,13 +664,13 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, String username, i
if (!topic.isValid()) {
LOG.warn("Invalid topic format, force close the connection");
connection.dropConnection();
ReferenceCountUtil.release(msg);
Utils.release(msg,PostOffice.BT_PUB_IN + " - ok, qos1 invalid topic");
return RoutingResults.preroutingError();
}
final String clientId = connection.getClientId();
if (!authorizator.canWrite(topic, username, clientId)) {
LOG.error("MQTT client: {} is not authorized to publish on topic: {}", clientId, topic);
ReferenceCountUtil.release(msg);
Utils.release(msg,PostOffice.BT_PUB_IN + " - ok, qos1 auth failed");
return RoutingResults.preroutingError();
}

Expand All @@ -677,15 +679,15 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, String username, i
LOG.warn("Received not valid UTF-8 payload when payload format indicator was enabled (QoS1)");
connection.sendPubAck(messageID, MqttReasonCodes.PubAck.PAYLOAD_FORMAT_INVALID);

ReferenceCountUtil.release(msg);
Utils.release(msg,PostOffice.BT_PUB_IN + " - ok, qos1 invalid format");
return RoutingResults.preroutingError();
}
}

if (isContentTypeToValidate(msg)) {
if (!validateContentTypeAsUTF8(msg)) {
LOG.warn("Received not valid UTF-8 content type (QoS1)");
ReferenceCountUtil.release(msg);
Utils.release(msg,PostOffice.BT_PUB_IN + " - ok, qos1 invalid content type");
connection.brokerDisconnect(MqttReasonCodes.Disconnect.PROTOCOL_ERROR);
connection.disconnectSession();
connection.dropConnection();
Expand Down Expand Up @@ -713,7 +715,7 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, String username, i
// some session event loop enqueue raised a problem
failedPublishes.insertAll(messageID, clientId, routes.failedRoutings);
}
ReferenceCountUtil.release(msg);
Utils.release(msg,PostOffice.BT_PUB_IN + " - ok, qos1");

// cleanup success resends from the failed publishes cache
failedPublishes.removeAll(messageID, clientId, routes.successedRoutings);
Expand Down Expand Up @@ -871,12 +873,18 @@ private RoutingResults publish2Subscribers(String publisherClientId,
LOG.trace("No matching subscriptions for topic: {}", topic);
return new RoutingResults(Collections.emptyList(), Collections.emptyList(), CompletableFuture.completedFuture(null));
}
// sanity check
if (subscriptionCount > sessionLoops.getEventLoopCount()) {
LOG.error("Cardinality of subscription batches ({}) is bigger then the available session loops {}",
subscriptionCount, sessionLoops.getEventLoopCount());
return new RoutingResults(Collections.emptyList(), Collections.emptyList(), CompletableFuture.completedFuture(null));
}

msg.retain(subscriptionCount);
Utils.retain(msg, subscriptionCount, BT_ROUTE_TARGET);

List<RouteResult> publishResults = collector.routeBatchedPublishes((batch) -> {
publishToSession(topic, batch, publishingQos, retainPublish, messageExpiry, msg);
msg.release();
Utils.release(msg, BT_ROUTE_TARGET);
});

final CompletableFuture[] publishFutures = publishResults.stream()
Expand All @@ -890,7 +898,7 @@ private RoutingResults publish2Subscribers(String publisherClientId,
Collection<String> subscibersIds = collector.subscriberIdsByEventLoop(rr.clientId);
if (rr.status == RouteResult.Status.FAIL) {
failedRoutings.addAll(subscibersIds);
msg.release();
Utils.release(msg, BT_ROUTE_TARGET + "- failed routing");
} else {
successedRoutings.addAll(subscibersIds);
}
Expand Down Expand Up @@ -970,7 +978,7 @@ RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage
final String clientId = connection.getClientId();
if (!authorizator.canWrite(topic, username, clientId)) {
LOG.error("MQTT client is not authorized to publish on topic: {}", topic);
ReferenceCountUtil.release(msg);
Utils.release(msg,PostOffice.BT_PUB_IN + " - ok, phase 2 qos2 auth failed");
// WARN this is a special case failed is empty, but this result is to be considered as error.
return RoutingResults.preroutingError();
}
Expand All @@ -981,7 +989,7 @@ RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage
LOG.warn("Received not valid UTF-8 payload when payload format indicator was enabled (QoS2)");
connection.sendPubRec(messageID, MqttReasonCodes.PubRec.PAYLOAD_FORMAT_INVALID);

ReferenceCountUtil.release(msg);
Utils.release(msg,PostOffice.BT_PUB_IN + " - ok, phase 2 qos2 invalid format");
return RoutingResults.preroutingError();
}
}
Expand All @@ -1002,7 +1010,7 @@ RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage
// some session event loop enqueue raised a problem
failedPublishes.insertAll(messageID, clientId, publishRoutings.failedRoutings);
}
ReferenceCountUtil.release(msg);
Utils.release(msg,PostOffice.BT_PUB_IN + " - ok, phase 2 qos2");

// cleanup success resends from the failed publishes cache
failedPublishes.removeAll(messageID, clientId, publishRoutings.successedRoutings);
Expand Down
2 changes: 1 addition & 1 deletion broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ public RoutingResults internalPublish(MqttPublishMessage msg, final String clien
}
LOG.trace("Internal publishing message CId: {}, messageId: {}", clientId, messageID);
final RoutingResults routingResults = dispatcher.internalPublish(msg);
msg.payload().release();
Utils.release(msg, "Routing - internal PUB");
return routingResults;
}

Expand Down
22 changes: 11 additions & 11 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void processPubRec(int pubRecPacketId) {
LOG.warn("Received a PUBREC with not matching packetId");
return;
}
removed.release();
Utils.release(removed, "target session - phase 1 Qos2 pull from inflight");
if (removed instanceof SessionRegistry.PubRelMarker) {
LOG.info("Received a PUBREC for packetId that was already moved in second step of Qos2");
return;
Expand Down Expand Up @@ -229,7 +229,7 @@ public void processPubComp(int messageID) {
LOG.warn("Received a PUBCOMP with not matching packetId in the inflight cache");
return;
}
removed.release();
Utils.release(removed, "target session - phase 2 Qos2 pull from inflight");
mqttConnection.sendQuota().releaseSlot();
drainQueueToConnection();

Expand Down Expand Up @@ -302,7 +302,7 @@ private void sendPublishQos2(PublishedMessage publishRequest) {
private void sendPublishInFlightWindowOrQueueing(MQTTConnection localMqttConnectionRef,
PublishedMessage publishRequest) {
// retain the payload because it's going to be added to map or to the queue.
publishRequest.retain();
Utils.retain(publishRequest, "target session - forward to inflight or queue");

if (canSkipQueue(localMqttConnectionRef)) {
mqttConnection.sendQuota().consumeSlot();
Expand All @@ -313,7 +313,7 @@ private void sendPublishInFlightWindowOrQueueing(MQTTConnection localMqttConnect
EnqueuedMessage old = inflightWindow.put(packetId, publishRequest);
// If there already was something, release it.
if (old != null) {
old.release();
Utils.release(old, "target session - replace existing slot");
mqttConnection.sendQuota().releaseSlot();
}
if (resendInflightOnTimeout) {
Expand Down Expand Up @@ -356,7 +356,7 @@ void pubAckReceived(int ackPacketId) {
ackPacketId, inflightWindow.keySet());
return;
}
removed.release();
Utils.release(removed, "target session - inflight remove");

mqttConnection.sendQuota().releaseSlot();
LOG.debug("Received PUBACK {} for session {}", ackPacketId, getClientID());
Expand Down Expand Up @@ -476,7 +476,7 @@ private void drainQueueToConnection() {
// Putting it in a map, but the retain is cancelled out by the below release.
EnqueuedMessage old = inflightWindow.put(sendPacketId, msg);
if (old != null) {
old.release();
Utils.release(old, "target session - drain queue push to inflight");
mqttConnection.sendQuota().releaseSlot();
}
if (resendInflightOnTimeout) {
Expand Down Expand Up @@ -515,19 +515,19 @@ public void reconnectSession() {

public void receivedPublishQos2(int messageID, MqttPublishMessage msg) {
// Retain before putting msg in map.
ReferenceCountUtil.retain(msg);
Utils.retain(msg, "phase 2 qos2");

MqttPublishMessage old = qos2Receiving.put(messageID, msg);
// In case of evil client with duplicate msgid.
ReferenceCountUtil.release(old);
Utils.release(old, "phase 2 qos2 - packet id duplicated");

// mqttConnection.sendPublishReceived(messageID);
}

public void receivedPubRelQos2(int messageID) {
// Done with the message, remove from queue and release payload.
final MqttPublishMessage removedMsg = qos2Receiving.remove(messageID);
ReferenceCountUtil.release(removedMsg);
Utils.release(removedMsg, "phase 2 qos2");
}

Optional<InetSocketAddress> remoteAddress() {
Expand All @@ -543,10 +543,10 @@ public void cleanUp() {
sessionQueue.closeAndPurge();
inflightTimeouts.clear();
for (EnqueuedMessage msg : inflightWindow.values()) {
msg.release();
Utils.release(msg, "session cleanup - inflight window");
}
for (MqttPublishMessage msg : qos2Receiving.values()) {
msg.release();
Utils.release(msg, "session cleanup - phase 2 cache");
}
}

Expand Down
14 changes: 7 additions & 7 deletions broker/src/main/java/io/moquette/broker/SessionEventLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,27 @@ final class SessionEventLoop extends Thread {

private static final Logger LOG = LoggerFactory.getLogger(SessionEventLoop.class);

private final BlockingQueue<FutureTask<String>> sessionQueue;
private final BlockingQueue<FutureTask<String>> taskQueue;
private final boolean flushOnExit;

public SessionEventLoop(BlockingQueue<FutureTask<String>> sessionQueue) {
this(sessionQueue, true);
public SessionEventLoop(BlockingQueue<FutureTask<String>> taskQueue) {
this(taskQueue, true);
}

/**
* @param flushOnExit consume the commands queue before exit.
* */
public SessionEventLoop(BlockingQueue<FutureTask<String>> sessionQueue, boolean flushOnExit) {
this.sessionQueue = sessionQueue;
public SessionEventLoop(BlockingQueue<FutureTask<String>> taskQueue, boolean flushOnExit) {
this.taskQueue = taskQueue;
this.flushOnExit = flushOnExit;
}

@Override
public void run() {
while (!Thread.interrupted() || (Thread.interrupted() && !sessionQueue.isEmpty() && flushOnExit)) {
while (!Thread.interrupted() || (Thread.interrupted() && !taskQueue.isEmpty() && flushOnExit)) {
try {
// blocking call
final FutureTask<String> task = this.sessionQueue.take();
final FutureTask<String> task = this.taskQueue.take();
executeTask(task);
} catch (InterruptedException e) {
LOG.info("SessionEventLoop {} interrupted", Thread.currentThread().getName());
Expand Down
Loading

0 comments on commit 82a54aa

Please sign in to comment.