From 73ef688521323084c7026a8c3730e9cfac186312 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Fri, 18 Jun 2021 21:08:18 +0300 Subject: [PATCH] Added back Hooks.onErrorDropped for AbortedException and ConnectionClosedException --- .../rsocket/RSocketServiceTransport.java | 19 +++++++++++++++++++ .../rsocket/RSocketServiceTransportTest.java | 8 ++++---- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java index 8f47fa129..6b27e5ad6 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java @@ -8,6 +8,7 @@ import io.netty.util.concurrent.Future; import io.scalecube.services.auth.Authenticator; import io.scalecube.services.auth.CredentialsSupplier; +import io.scalecube.services.exceptions.ConnectionClosedException; import io.scalecube.services.methods.ServiceMethodRegistry; import io.scalecube.services.transport.api.ClientTransport; import io.scalecube.services.transport.api.DataCodec; @@ -18,13 +19,31 @@ import java.util.StringJoiner; import java.util.concurrent.ThreadFactory; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.netty.FutureMono; +import reactor.netty.channel.AbortedException; import reactor.netty.resources.LoopResources; public class RSocketServiceTransport implements ServiceTransport { + public static final Logger LOGGER = LoggerFactory.getLogger(RSocketServiceTransport.class); + + static { + Hooks.onErrorDropped( + t -> { + if (AbortedException.isConnectionReset(t) + || ConnectionClosedException.isConnectionClosed(t)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Connection aborted: {}", t.toString()); + } + } + }); + } + private int numOfWorkers = Runtime.getRuntime().availableProcessors(); private HeadersCodec headersCodec = HeadersCodec.DEFAULT_INSTANCE; diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java index 29304a41e..e1a31afc2 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java @@ -10,9 +10,9 @@ import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; +import io.scalecube.services.exceptions.ConnectionClosedException; import io.scalecube.services.sut.QuoteService; import io.scalecube.services.sut.SimpleQuoteService; -import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -94,7 +94,7 @@ public void test_remote_node_died_mono_never() throws Exception { TimeUnit.MILLISECONDS.sleep(100); assertEquals(0, latch1.getCount()); - assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass()); + assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass()); assertTrue(sub1.get().isDisposed()); } @@ -122,7 +122,7 @@ public void test_remote_node_died_many_never() throws Exception { TimeUnit.MILLISECONDS.sleep(100); assertEquals(0, latch1.getCount()); - assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass()); + assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass()); assertTrue(sub1.get().isDisposed()); } @@ -154,7 +154,7 @@ public void test_remote_node_died_many_then_never() throws Exception { TimeUnit.MILLISECONDS.sleep(100); assertEquals(0, latch1.getCount()); - assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass()); + assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass()); assertTrue(sub1.get().isDisposed()); } }