Skip to content

Commit

Permalink
Free memory for MqttPublishes.receive on timeout/interrupt
Browse files Browse the repository at this point in the history
  • Loading branch information
SgtSilvio committed Nov 14, 2023
1 parent c070dec commit 9cbf830
Showing 1 changed file with 53 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.hivemq.client.internal.mqtt.util.MqttChecks;
import com.hivemq.client.internal.util.AsyncRuntimeException;
import com.hivemq.client.internal.util.Checks;
import com.hivemq.client.internal.util.collections.NodeList;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException;
Expand All @@ -51,7 +52,6 @@
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscription;

import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -219,7 +219,7 @@ public void disconnect(final @NotNull Mqtt5Disconnect disconnect) {
private static class MqttPublishes implements Mqtt5Publishes, FlowableSubscriber<Mqtt5Publish> {

private final @NotNull AtomicReference<@Nullable Subscription> subscription = new AtomicReference<>();
private final @NotNull LinkedList<Entry> entries = new LinkedList<>();
private final @NotNull NodeList<Entry> entries = new NodeList<>();
private @Nullable Mqtt5Publish queuedPublish;
private @Nullable Throwable error;

Expand Down Expand Up @@ -248,16 +248,15 @@ public void onNext(final @NotNull Mqtt5Publish publish) {
if (error != null) {
return;
}
Entry entry;
while ((entry = entries.poll()) != null) {
final boolean success = entry.result.compareAndSet(null, publish);
final Entry entry = entries.getFirst();
if (entry == null) {
queuedPublish = publish;
} else {
entries.remove(entry);
entry.result = publish;
entry.latch.countDown();
if (success) {
request();
return;
}
request();
}
queuedPublish = publish;
}
}

Expand All @@ -273,9 +272,9 @@ public void onError(final @NotNull Throwable t) {
return;
}
error = t;
Entry entry;
while ((entry = entries.poll()) != null) {
entry.result.set(t);
for (Entry entry = entries.getFirst(); entry != null; entry = entry.getNext()) {
entries.remove(entry);
entry.result = t;
entry.latch.countDown();
}
}
Expand All @@ -293,26 +292,27 @@ public void onError(final @NotNull Throwable t) {
return publish;
}
entry = new Entry();
entries.offer(entry);
entries.add(entry);
}

InterruptedException interruptedException = null;
Object result;
try {
entry.latch.await();
result = entry.result;
assert (result instanceof Mqtt5Publish) || (result instanceof Throwable);
} catch (final InterruptedException e) {
interruptedException = e;
result = tryCancel(entry, e);
}
final Object result = entry.result.getAndSet(Entry.CANCELLED);
if (result instanceof Mqtt5Publish) {
return (Mqtt5Publish) result;
}
if (result instanceof Throwable) {
if (result instanceof InterruptedException) {
throw (InterruptedException) result;
}
throw handleError((Throwable) result);
}
if (interruptedException != null) {
throw interruptedException;
}
throw new InterruptedException();
throw new IllegalStateException("This must not happen and is a bug.");
}

@Override
Expand All @@ -334,25 +334,29 @@ public void onError(final @NotNull Throwable t) {
return Optional.of(publish);
}
entry = new Entry();
entries.offer(entry);
entries.add(entry);
}

InterruptedException interruptedException = null;
Object result;
try {
entry.latch.await(timeout, timeUnit);
if (entry.latch.await(timeout, timeUnit)) {
result = entry.result;
assert (result instanceof Mqtt5Publish) || (result instanceof Throwable);
} else {
result = tryCancel(entry, null);
}
} catch (final InterruptedException e) {
interruptedException = e;
result = tryCancel(entry, e);
}
final Object result = entry.result.getAndSet(Entry.CANCELLED);
if (result instanceof Mqtt5Publish) {
return Optional.of((Mqtt5Publish) result);
}
if (result instanceof Throwable) {
if (result instanceof InterruptedException) {
throw (InterruptedException) result;
}
throw handleError((Throwable) result);
}
if (interruptedException != null) {
throw interruptedException;
}
return Optional.empty();
}

Expand All @@ -369,13 +373,25 @@ public void onError(final @NotNull Throwable t) {
}

private @Nullable Mqtt5Publish receiveNowUnsafe() {
final Mqtt5Publish queuedPublish = this.queuedPublish;
if (queuedPublish != null) {
final Mqtt5Publish queuedPublish = this.queuedPublish;
this.queuedPublish = null;
request();
return queuedPublish;
}
return null;
return queuedPublish;
}

private @Nullable Object tryCancel(final @NotNull Entry entry, final @Nullable Object resultOnCancel) {
synchronized (entries) {
final Object result = entry.result;
if (result == null) {
entries.remove(entry);
return resultOnCancel;
} else {
assert (result instanceof Mqtt5Publish) || (result instanceof Throwable);
return result;
}
}
}

@Override
Expand All @@ -389,9 +405,9 @@ public void close() {
return;
}
error = new CancellationException();
Entry entry;
while ((entry = entries.poll()) != null) {
entry.result.set(error);
for (Entry entry = entries.getFirst(); entry != null; entry = entry.getNext()) {
entries.remove(entry);
entry.result = error;
entry.latch.countDown();
}
}
Expand All @@ -404,12 +420,10 @@ public void close() {
throw new RuntimeException(t);
}

private static class Entry {

static final @NotNull Object CANCELLED = new Object();
private static class Entry extends NodeList.Node<Entry> {

final @NotNull CountDownLatch latch = new CountDownLatch(1);
final @NotNull AtomicReference<@Nullable Object> result = new AtomicReference<>();
@Nullable Object result = null;
}
}
}

0 comments on commit 9cbf830

Please sign in to comment.