You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I noticed that if my client connection gets closed while waiting for a publish to finish, it will never run the callback. In publishAsync, you don't specify the callback, the library does, but even a custom callback using publish has the same issue.
The reason I ran into this is because my broker (Aedes) has some custom authz logic and whenever a publish is denied, I noticed that my await publishAsync(...) calls never complete -- they just hang. It's because the callback is never getting called. Aedes disconnects your client when a publish is denied. VerneMQ does the same thing, so I see the same issue there if the ACL denies my publish. I tested with Mosquitto and as far as I can tell Mosquitto doesn't actually disconnect your client if it denies a publish -- it just logs it but acknowledges the publish with a PUBACK like everything worked properly.
Looking in the code, it looks like the callback gets registered as part of a data structure in the this.outgoing object with volatilate set to false. Then, in the close event listener there is a call to _flushVolatile() which runs callbacks registered in that same this.outgoing object, however it only runs them if the volatile field is true, and once again, we can see in the link above that publishes for QoS 1 and 2 set volatile as false. Now, there is another method _flush() that basically does the same thing, but for everything in this.outgoing, not just those set as volatile. I played around with calling that in the close listener instead of _flushVolatile() and it did fix my issue, but who knows what else it would break to do that.
I was going to open a PR for a change regarding this, but to be honest, I don't truly understand the intention behind using _flush() vs _flushVolatile(), nor do I have a good sense of what effects it would have in other areas. But it does look to me like the code is setup in a way that suggests we want the callbacks to run with the connection closed error passed in when a connection closes mid-publish.
Minimal Reproduction
If you run this code you should see that your code seemingly hangs at the await call to publishAsync. No rejected promise, no resolve call happens, 'Message Sent' never logs, nothing. You can also change it to publish with a callback and you should see that your callback never runs. You have to use QoS 1 or 2. The issue doesn't occur with QoS 0 which doesn't register with this.outgoing, but passes it's callback directly into _sendPacket:
importaedesfrom'aedes';import{connectAsync}from'mqtt';importnetfrom'node:net';constbroker=newaedes({authorizePublish: (client,packet,callback)=>{callback(newError('Unauthorized'));},});constserver=net.createServer(broker.handle);server.listen(1883,()=>{console.log('Broker is running on port 1883');});constclient=awaitconnectAsync({host: 'localhost',port: 1883,});client.on('close',()=>{console.log('Client closed');});awaitclient.publishAsync('test-topic','Hello, world!',{qos: 1,});console.log('Message sent');
Debug logs
It technically loops these because the client keeps reconnecting, so here are the logs until the first time the client is closed. I know I can disable the reconnect behavior, but still:
mqttjs connecting to an MQTT broker... +0ms
mqttjs:client MqttClient :: version: 5.10.4 +0ms
mqttjs:client MqttClient :: environment node +0ms
mqttjs:client MqttClient :: options.protocol mqtt +0ms
mqttjs:client MqttClient :: options.protocolVersion 4 +0ms
mqttjs:client MqttClient :: options.username undefined +0ms
mqttjs:client MqttClient :: options.keepalive 60 +0ms
mqttjs:client MqttClient :: options.reconnectPeriod 1000 +1ms
mqttjs:client MqttClient :: options.rejectUnauthorized undefined +0ms
mqttjs:client MqttClient :: options.properties.topicAliasMaximum undefined +0ms
mqttjs:client MqttClient :: clientId mqttjs_894bfc55 +0ms
mqttjs:client MqttClient :: setting up stream +0ms
mqttjs:client connect :: calling method to clear reconnect +0ms
mqttjs:client _clearReconnect : clearing reconnect timer +0ms
mqttjs:client connect :: using streamBuilder provided to client to create stream +0ms
mqttjs calling streambuilder for mqtt +4ms
mqttjs:tcp port 1883 and host localhost +0ms
mqttjs:client connect :: pipe stream to writable stream +4ms
mqttjs:client connect: sending packet `connect` +1ms
mqttjs:client _writePacket :: packet: {
mqttjs:client cmd: 'connect',
mqttjs:client protocolId: 'MQTT',
mqttjs:client protocolVersion: 4,
mqttjs:client clean: true,
mqttjs:client clientId: 'mqttjs_894bfc55',
mqttjs:client keepalive: 60,
mqttjs:client username: undefined,
mqttjs:client password: undefined,
mqttjs:client properties: undefined
mqttjs:client } +0ms
mqttjs:client _writePacket :: emitting `packetsend` +0ms
mqttjs:client _writePacket :: writing to stream +0ms
mqttjs:client _writePacket :: writeToStream result true +7ms
Broker is running on port 1883
mqttjs:client writable stream :: parsing buffer +2ms
mqttjs:client parser :: on packet push to packets array. +1ms
mqttjs:client work :: getting next packet in queue +0ms
mqttjs:client work :: packet pulled from queue +0ms
mqttjs:client _handlePacket :: emitting packetreceive +0ms
mqttjs:client _handleConnack +0ms
mqttjs:client _setupKeepaliveManager :: keepalive 60 (seconds) +0ms
mqttjs:client KeepaliveManager: set keepalive to 60000ms +0ms
mqttjs:client connect :: sending queued packets +0ms
mqttjs:client deliver :: entry undefined +0ms
mqttjs:client _resubscribe +0ms
mqttjs:client publish :: message `Hello, world!` to topic `test-topic` +0ms
mqttjs:client publish :: qos 1 +0ms
mqttjs:client MqttClient:publish: packet cmd: publish +1ms
mqttjs:client _sendPacket :: (mqttjs_894bfc55) :: start +0ms
mqttjs:client storeAndSend :: store packet with cmd publish to outgoingStore +0ms
mqttjs:client _removeTopicAliasAndRecoverTopicName :: alias NaN, topic 'test-topic' +0ms
mqttjs:client noop :: undefined +0ms
mqttjs:client _writePacket :: packet: {
mqttjs:client cmd: 'publish',
mqttjs:client topic: 'test-topic',
mqttjs:client payload: 'Hello, world!',
mqttjs:client qos: 1,
mqttjs:client retain: false,
mqttjs:client messageId: 21936,
mqttjs:client dup: false
mqttjs:client } +0ms
mqttjs:client _writePacket :: emitting `packetsend` +0ms
mqttjs:client _writePacket :: writing to stream +0ms
mqttjs:client _writePacket :: writeToStream result true +0ms
mqttjs:client _writePacket :: invoking cb +0ms
mqttjs:client noop :: undefined +0ms
mqttjs:client _sendPacket :: (mqttjs_894bfc55) :: end +0ms
mqttjs:client (mqttjs_894bfc55)stream :: on close +5ms
mqttjs:client _flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function +0ms
mqttjs:client stream: emit close to MqttClient +0ms
mqttjs:client close :: connected set to `false` +0ms
mqttjs:client close :: clearing connackTimer +0ms
mqttjs:client _destroyKeepaliveManager :: destroying keepalive manager +0ms
mqttjs:client close :: calling _setupReconnect +0ms
mqttjs:client _setupReconnect :: emit `offline` state +1ms
mqttjs:client _setupReconnect :: set `reconnecting` to `true` +0ms
mqttjs:client _setupReconnect :: setting reconnectTimer for 1000 ms +0ms
Client closed
The text was updated successfully, but these errors were encountered:
MQTTjs Version
5.10.4
Broker
Aedes, VerneMQ
Environment
NodeJS
Description
I noticed that if my client connection gets closed while waiting for a publish to finish, it will never run the callback. In
publishAsync
, you don't specify the callback, the library does, but even a custom callback usingpublish
has the same issue.The reason I ran into this is because my broker (Aedes) has some custom authz logic and whenever a publish is denied, I noticed that my
await publishAsync(...)
calls never complete -- they just hang. It's because the callback is never getting called. Aedes disconnects your client when a publish is denied. VerneMQ does the same thing, so I see the same issue there if the ACL denies my publish. I tested with Mosquitto and as far as I can tell Mosquitto doesn't actually disconnect your client if it denies a publish -- it just logs it but acknowledges the publish with aPUBACK
like everything worked properly.Looking in the code, it looks like the callback gets registered as part of a data structure in the
this.outgoing
object withvolatilate
set tofalse
. Then, in theclose
event listener there is a call to_flushVolatile()
which runs callbacks registered in that samethis.outgoing
object, however it only runs them if thevolatile
field istrue
, and once again, we can see in the link above that publishes for QoS 1 and 2 setvolatile
asfalse
. Now, there is another method_flush()
that basically does the same thing, but for everything inthis.outgoing
, not just those set asvolatile
. I played around with calling that in theclose
listener instead of_flushVolatile()
and it did fix my issue, but who knows what else it would break to do that.I was going to open a PR for a change regarding this, but to be honest, I don't truly understand the intention behind using
_flush()
vs_flushVolatile()
, nor do I have a good sense of what effects it would have in other areas. But it does look to me like the code is setup in a way that suggests we want the callbacks to run with theconnection closed
error passed in when a connection closes mid-publish.Minimal Reproduction
If you run this code you should see that your code seemingly hangs at the
await
call topublishAsync
. No rejected promise, no resolve call happens, 'Message Sent' never logs, nothing. You can also change it topublish
with a callback and you should see that your callback never runs. You have to use QoS 1 or 2. The issue doesn't occur with QoS 0 which doesn't register withthis.outgoing
, but passes it's callback directly into_sendPacket
:Debug logs
It technically loops these because the client keeps reconnecting, so here are the logs until the first time the client is closed. I know I can disable the reconnect behavior, but still:
The text was updated successfully, but these errors were encountered: