From e71d360599d7c1320afc81cc77ca692a30bbf838 Mon Sep 17 00:00:00 2001 From: Silvio Giebl Date: Mon, 13 Nov 2023 22:30:08 +0100 Subject: [PATCH] Free memory for MqttPublishes.receive on timeout/interrupt --- .../internal/mqtt/MqttBlockingClient.java | 92 +++++++++++-------- 1 file changed, 53 insertions(+), 39 deletions(-) diff --git a/src/main/java/com/hivemq/client2/internal/mqtt/MqttBlockingClient.java b/src/main/java/com/hivemq/client2/internal/mqtt/MqttBlockingClient.java index 6b2b57310..dbf12dd68 100644 --- a/src/main/java/com/hivemq/client2/internal/mqtt/MqttBlockingClient.java +++ b/src/main/java/com/hivemq/client2/internal/mqtt/MqttBlockingClient.java @@ -30,6 +30,7 @@ import com.hivemq.client2.internal.mqtt.util.MqttChecks; import com.hivemq.client2.internal.util.AsyncRuntimeException; import com.hivemq.client2.internal.util.Checks; +import com.hivemq.client2.internal.util.collections.NodeList; import com.hivemq.client2.mqtt.MqttGlobalPublishFilter; import com.hivemq.client2.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client2.mqtt.mqtt5.exceptions.Mqtt5SubAckException; @@ -51,7 +52,6 @@ import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscription; -import java.util.LinkedList; import java.util.Optional; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; @@ -219,7 +219,7 @@ public void disconnect(final @NotNull Mqtt5Disconnect disconnect) { private static class Publishes implements Mqtt5BlockingClient.Publishes, FlowableSubscriber { private final @NotNull AtomicReference<@Nullable Subscription> subscription = new AtomicReference<>(); - private final @NotNull LinkedList entries = new LinkedList<>(); + private final @NotNull NodeList entries = new NodeList<>(); private @Nullable Mqtt5Publish queuedPublish; private @Nullable Throwable error; @@ -248,16 +248,15 @@ public void onNext(final @NotNull Mqtt5Publish publish) { if (error != null) { return; } - Entry entry; - while ((entry = entries.poll()) != null) { - final boolean success = entry.result.compareAndSet(null, publish); + final Entry entry = entries.getFirst(); + if (entry == null) { + queuedPublish = publish; + } else { + entries.remove(entry); + entry.result = publish; entry.latch.countDown(); - if (success) { - request(); - return; - } + request(); } - queuedPublish = publish; } } @@ -273,9 +272,9 @@ public void onError(final @NotNull Throwable t) { return; } error = t; - Entry entry; - while ((entry = entries.poll()) != null) { - entry.result.set(t); + for (Entry entry = entries.getFirst(); entry != null; entry = entry.getNext()) { + entries.remove(entry); + entry.result = t; entry.latch.countDown(); } } @@ -293,26 +292,27 @@ public void onError(final @NotNull Throwable t) { return publish; } entry = new Entry(); - entries.offer(entry); + entries.add(entry); } - InterruptedException interruptedException = null; + Object result; try { entry.latch.await(); + result = entry.result; + assert (result instanceof Mqtt5Publish) || (result instanceof Throwable); } catch (final InterruptedException e) { - interruptedException = e; + result = tryCancel(entry, e); } - final Object result = entry.result.getAndSet(Entry.CANCELLED); if (result instanceof Mqtt5Publish) { return (Mqtt5Publish) result; } if (result instanceof Throwable) { + if (result instanceof InterruptedException) { + throw (InterruptedException) result; + } throw handleError((Throwable) result); } - if (interruptedException != null) { - throw interruptedException; - } - throw new InterruptedException(); + throw new IllegalStateException("This must not happen and is a bug."); } @Override @@ -331,25 +331,29 @@ public void onError(final @NotNull Throwable t) { return Optional.of(publish); } entry = new Entry(); - entries.offer(entry); + entries.add(entry); } - InterruptedException interruptedException = null; + Object result; try { - entry.latch.await(timeout, timeUnit); + if (entry.latch.await(timeout, timeUnit)) { + result = entry.result; + assert (result instanceof Mqtt5Publish) || (result instanceof Throwable); + } else { + result = tryCancel(entry, null); + } } catch (final InterruptedException e) { - interruptedException = e; + result = tryCancel(entry, e); } - final Object result = entry.result.getAndSet(Entry.CANCELLED); if (result instanceof Mqtt5Publish) { return Optional.of((Mqtt5Publish) result); } if (result instanceof Throwable) { + if (result instanceof InterruptedException) { + throw (InterruptedException) result; + } throw handleError((Throwable) result); } - if (interruptedException != null) { - throw interruptedException; - } return Optional.empty(); } @@ -366,13 +370,25 @@ public void onError(final @NotNull Throwable t) { } private @Nullable Mqtt5Publish receiveNowUnsafe() { + final Mqtt5Publish queuedPublish = this.queuedPublish; if (queuedPublish != null) { - final Mqtt5Publish queuedPublish = this.queuedPublish; this.queuedPublish = null; request(); - return queuedPublish; } - return null; + return queuedPublish; + } + + private @Nullable Object tryCancel(final @NotNull Entry entry, final @Nullable Object resultOnCancel) { + synchronized (entries) { + final Object result = entry.result; + if (result == null) { + entries.remove(entry); + return resultOnCancel; + } else { + assert (result instanceof Mqtt5Publish) || (result instanceof Throwable); + return result; + } + } } @Override @@ -386,9 +402,9 @@ public void close() { return; } error = new CancellationException(); - Entry entry; - while ((entry = entries.poll()) != null) { - entry.result.set(error); + for (Entry entry = entries.getFirst(); entry != null; entry = entry.getNext()) { + entries.remove(entry); + entry.result = error; entry.latch.countDown(); } } @@ -401,12 +417,10 @@ public void close() { throw new RuntimeException(t); } - private static class Entry { - - static final @NotNull Object CANCELLED = new Object(); + private static class Entry extends NodeList.Node { final @NotNull CountDownLatch latch = new CountDownLatch(1); - final @NotNull AtomicReference<@Nullable Object> result = new AtomicReference<>(); + @Nullable Object result = null; } } }