Async Mqtt3Client does not release threads created by publishes() when disconnected #633

Open
ASunc opened this issue Jun 18, 2024 · 13 comments

ASunc commented Jun 18, 2024

🐛 Bug Report

🔬 How To Reproduce

Steps to reproduce the behavior:

  1. ... create client by Mqtt3Client.builder()......buildAsync()
  2. call client.publishes(MqttGlobalPublishFilter.REMAINING, ....) // thread com.hivemq.client.mqtt-1-1 is created
  3. call client.connectWith()....
  4. call client.disconnect().get() // thread is not released

Code sample

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ExecutionException;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscription;

public class HiveTest {

	public static void main(String[] args) throws InterruptedException, ExecutionException
	{
		var test = new HiveTest();

		// Repeatedly connect and disconnect; each iteration leaves a client thread behind.
		for (int i = 0; i < 10; i++) {
			var cli = test.connect();

			Thread.sleep(2 * 1000L);

			test.disconnect(cli);
		}

		System.out.println("Sleeping... com.hivemq.client.mqtt-n-n threads are still there.");
		Thread.sleep(300 * 1000L);
	}

	private void disconnect(Mqtt3AsyncClient cli) throws InterruptedException, ExecutionException
	{
		cli.disconnect().get();
	}

	public Mqtt3AsyncClient connect() throws InterruptedException, ExecutionException
	{
		var client = Mqtt3Client.builder()
				.identifier("myid")
				.serverHost("broker")
				.serverPort(1883)
				.addDisconnectedListener(ctx -> {
					System.err.println("MQTT disconnected from broker: " + ctx.getCause());
				})
				.addConnectedListener(ctx -> {
					System.out.println("MQTT connected to broker");
				})
				.buildAsync();

		// This call creates the com.hivemq.client.mqtt-1-1 thread.
		client.publishes(MqttGlobalPublishFilter.REMAINING, pub -> {
			try {
				String payload = new String(pub.getPayloadAsBytes(), "UTF-8");
				System.out.println("MESSAGE:" + pub.getTopic().toString() + ":" + payload);
			}
			catch (UnsupportedEncodingException e) {
				System.err.println("Bad payload encoding.");
			}
		});

		client.connectWith().cleanSession(false).keepAlive(60).send().thenCompose(connAck -> {
			return client.subscribeWith()
					.addSubscription(Mqtt3Subscription.builder().topicFilter("test").build())
					.send();
		}).get();

		return client;
	}

}

Environment

Where are you running/using this client? Windows 11

Hardware or Device? Dell Laptop

What version of this client are you using? 1.3.3

JVM version? 11.0.22.7

Operating System? Windows 11

Which MQTT protocol version is being used? 3

Which MQTT broker (name and version)? mosquitto 2.0.18

Screenshots

📈 Expected behavior

When disconnecting, I would expect that this thread would be terminated. Don't confuse this
thread with the various RxComputation threads; I know the number of those is capped.

📎 Additional context

ASunc added the bug label Jun 18, 2024

ViToni commented Aug 26, 2024

I've observed the same behaviour.

When one closes the connection "manually" (USER) as in

/**
* The user explicitly called disconnect.
*/
USER,

the thread(s) are not stopped and one ends up with multiple com.hivemq.client.mqtt-*-* threads.

To me it seems the culprit is the logic within:

if (disconnectEvent instanceof MqttDisconnectEvent.ByUser) {
    final MqttDisconnectEvent.ByUser disconnectEventByUser = (MqttDisconnectEvent.ByUser) disconnectEvent;
    ctx.writeAndFlush(disconnect).addListener(f -> {
        if (f.isSuccess()) {
            ((DuplexChannel) channel).shutdownOutput().addListener(cf -> {
                if (cf.isSuccess()) {
                    state = new DisconnectingState(channel, disconnectEventByUser);
                } else {
                    disconnected(channel, disconnectEvent);
                    disconnectEventByUser.getFlow().onError(new ConnectionClosedException(cf.cause()));
                }
            });
        } else {
            disconnected(channel, disconnectEvent);
            disconnectEventByUser.getFlow().onError(new ConnectionClosedException(f.cause()));
        }
    });
} else if (clientConfig.getMqttVersion() == MqttVersion.MQTT_5_0) {

as channel.close() is not called when the disconnect source is USER.

I might be wrong though as I've seen channel.close() being called in other places as well (so maybe the USER use case is handled elsewhere) and I'm not deep into this code base.


ViToni commented Aug 26, 2024

In our case this is especially bad, since we have now two threads trying to reconnect, so each thread disconnects the other one when the connection is established successfully...

@SgtSilvio
Member

I think this PR was created trying to fix this issue: #638
Added a comment there why changing to daemon threads is very likely not the proper solution: #638 (comment)

This is a behavioral change.
A simple example: if you have a main function that publishes a message via the MQTT client asynchronously, you don't block the main thread until the message is fully published/acknowledged, and the MQTT client threads are all daemon threads, then the process will exit before successfully sending the message.
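A minimal sketch of that scenario, assuming daemon client threads (identifier, host, and topic are placeholders, not taken from this issue):

import java.nio.charset.StandardCharsets;

import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;

public class DaemonPitfall {

	public static void main(String[] args) {
		var client = Mqtt3Client.builder()
				.identifier("daemon-pitfall")
				.serverHost("broker")
				.serverPort(1883)
				.buildAsync();

		// Fire-and-forget publish; main() returns immediately afterwards.
		client.connect()
				.thenCompose(connAck -> client.publishWith()
						.topic("test")
						.payload("hello".getBytes(StandardCharsets.UTF_8))
						.send())
				.thenAccept(publish -> System.out.println("published"));

		// No get()/join() here: with non-daemon client threads the JVM stays alive
		// until the publish completes; with daemon threads the process could exit
		// before the message is sent and acknowledged.
	}
}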

Regarding the issue:
The MQTT client's threads should stop automatically when they are no longer needed.
If I remember correctly, the threads are not released while the client still has a session. This might be the cause of your issue. We should rather look into whether this can be improved.

@SgtSilvio
Member

@ViToni

as channel.close() is not called when the disconnect source is USER.

This is not what is causing this issue because the channel is actually closed, only later:
either here:

public void channelInactive(final @NotNull ChannelHandlerContext ctx) {

or here:

So you are right about:

I might be wrong though as I've seen channel.close() being called in other places as well (so maybe the USER use case is handled elsewhere)

Member

SgtSilvio commented Sep 4, 2024

In our case this is especially bad, since we have now two threads trying to reconnect, so each thread disconnects the other one when the connection is established successfully...

This sounds strange; threads do not reconnect or disconnect any connections. Do you mean two MQTT client instances (instead of threads) that reconnect, one in a not-yet-terminated process and one in a newly started process?

@SgtSilvio
Member

@ASunc Your issue is probably that we keep the threads alive as long as the client has a session (if I remember this correctly). With MQTT 3, cleanSession=false means the session never expires. This might need to be improved.
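A minimal sketch of that distinction, with placeholder connection details (not taken from this issue's code) and a hypothetical helper method:

import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;

public class SessionDemo {

	static Mqtt3AsyncClient connect(boolean cleanSession) {
		Mqtt3AsyncClient client = Mqtt3Client.builder()
				.identifier("session-demo")
				.serverHost("broker")
				.serverPort(1883)
				.buildAsync();
		client.connectWith()
				// cleanSession=false: in MQTT 3 the session never expires, and (per this
				// thread) the client keeps its threads alive for that session even after
				// disconnect(). cleanSession=true discards the session state instead.
				.cleanSession(cleanSession)
				.send()
				.join();
		return client;
	}
}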

Author

ASunc commented Sep 4, 2024

Maybe, I'm indeed using cleanSession=false. Still, the behavior is odd this way, as one would expect that everything is released when the client is disconnected. Using cleanSession=true is not a valid choice for me, because that would lose all the messages buffered by the broker during a reconnect. So, I agree that there is certainly room for improvement here.

@SgtSilvio
Member

Using cleanSession=true is not a valid choice for me

To avoid misunderstandings, I did not mean that your usage of cleanSession needs to be improved. Instead, I meant that the handling of releasing the threads when using cleanSession=false and callbacks needs to be improved.

Declaring threads as daemon threads (as proposed by @ViToni) is not a proper fix for that, but rather a workaround. It might change the behavior in a breaking way for other users of the library.
If daemon threads are fine for your case (as an intermediate fix), you can apply this workaround without needing a change in this library. Add the following code to your client builder:

import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.ThreadPerTaskExecutor;

...
    .executorConfig()
        .nettyExecutor(new ThreadPerTaskExecutor(new DefaultThreadFactory(
                "com.hivemq.client.mqtt.daemon", true, Thread.MAX_PRIORITY)))
        .applyExecutorConfig()
...
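For illustration, the workaround might slot into a full builder call like this (identifier and host reused from the example above; a sketch only, not an official recommendation):

import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.ThreadPerTaskExecutor;

Mqtt3AsyncClient client = Mqtt3Client.builder()
        .identifier("myid")
        .serverHost("broker")
        .serverPort(1883)
        .executorConfig()
            // "true" marks the threads as daemon threads so they don't keep the JVM alive on their own.
            .nettyExecutor(new ThreadPerTaskExecutor(new DefaultThreadFactory(
                    "com.hivemq.client.mqtt.daemon", true, Thread.MAX_PRIORITY)))
            .applyExecutorConfig()
        .buildAsync();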

Author

ASunc commented Sep 4, 2024

Using daemon threads won't help me. Everything runs under an application server, which is kept running unless there are software updates. If any connection drops and reconnects occur during this time, the daemon threads will be lying around just like normal threads.

@SgtSilvio
Member

If the application keeps running, then this should not be an issue, because the number of netty threads is bounded by the number of available processors (times 2). The number of netty threads is decoupled from how many MQTT clients you create. An MQTT client instance does not exclusively hold a thread; the threads are shared.

Reading the following from the issue description, I think you might think that the number of com.hivemq.client.mqtt threads is not capped:

Don't confuse this thread with various RxComputation threads, I know that number of those are capped.

Do you see more of these threads than expected (available processors times 2)?
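One quick way to check this (a standalone sketch using only core Java, not from this thread): count the live client threads by name prefix and compare with availableProcessors() * 2.

public class ThreadCountCheck {

	public static void main(String[] args) {
		// Run this from within the application that created the MQTT clients.
		long mqttThreads = Thread.getAllStackTraces().keySet().stream()
				.filter(t -> t.getName().startsWith("com.hivemq.client.mqtt"))
				.count();
		int expectedMax = Runtime.getRuntime().availableProcessors() * 2;
		System.out.println("com.hivemq.client.mqtt threads: " + mqttThreads
				+ " (expected at most " + expectedMax + ")");
	}
}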

Author

ASunc commented Sep 4, 2024

Excellent! I have 24 logical processors according to Windows Task Manager, and indeed it seems that there are no more than 48 of those threads. The count is just so high that I never tested things that far. This means that there is no major problem after all.


ViToni commented Sep 25, 2024

In our case this is especially bad, since we have now two threads trying to reconnect, so each thread disconnects the other one when the connection is established successfully...

This sounds strange; threads do not reconnect or disconnect any connections. Do you mean two MQTT client instances (instead of threads) that reconnect, one in a not-yet-terminated process and one in a newly started process?

Please bear with me if I used the wrong wording.
From our logs I saw that there are two MQTT client threads running:

com.hivemq.client.mqtt-1-1
com.hivemq.client.mqtt-1-2

which concurrently try to establish a connection.

This happens when we close the current session and try to establish a new one with a changed configuration (and we can't use cleanSession=false as well, but we are operating in embedded space... so no 24 logical processors for us).
I'll do a follow-up with the code in question. I don't mean to hijack this issue; I can open a dedicated one if that suits better.


ViToni commented Sep 25, 2024

Declaring threads as daemon threads (as proposed by @ViToni) is not a proper fix for that, but rather a workaround. It might change the behavior in a breaking way for other users of the library. If daemon threads are fine for your case (as an intermediate fix), you can apply this workaround without needing a change in this library. Add the following code to your client builder:

It was not meant as a fix but rather as a means to reduce the impact of the created threads. I'd love to see it made more foolproof, so that it would be harder for me to use it in an unexpected way.
