From e7d0b0982ccd9f2b2f24df847656f200adc0e2eb Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Fri, 18 Jun 2021 19:31:11 +0300 Subject: [PATCH 1/3] Added .handleConnectionReset at RSocketClientChannel --- .../rsocket/RSocketClientChannel.java | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java index 0e056ed32..3c1fc8cd7 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java @@ -8,11 +8,16 @@ import io.scalecube.services.transport.api.ServiceMessageCodec; import java.lang.reflect.Type; import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.netty.channel.AbortedException; public class RSocketClientChannel implements ClientChannel { + private static final Logger LOGGER = LoggerFactory.getLogger(RSocketClientChannel.class); + private final Mono rsocket; private final ServiceMessageCodec messageCodec; @@ -26,7 +31,8 @@ public Mono requestResponse(ServiceMessage message, Type respons return rsocket .flatMap(rsocket -> rsocket.requestResponse(toPayload(message))) .map(this::toMessage) - .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)); + .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)) + .doOnError(RSocketClientChannel::handleConnectionReset); } @Override @@ -34,7 +40,8 @@ public Flux requestStream(ServiceMessage message, Type responseT return rsocket .flatMapMany(rsocket -> rsocket.requestStream(toPayload(message))) .map(this::toMessage) - .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)); + .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)) + .doOnError(RSocketClientChannel::handleConnectionReset); } @Override @@ -43,7 +50,8 @@ public Flux requestChannel( return rsocket .flatMapMany(rsocket -> rsocket.requestChannel(Flux.from(publisher).map(this::toPayload))) .map(this::toMessage) - .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)); + .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)) + .doOnError(RSocketClientChannel::handleConnectionReset); } private Payload toPayload(ServiceMessage request) { @@ -57,4 +65,12 @@ private ServiceMessage toMessage(Payload payload) { payload.release(); } } + + private static void handleConnectionReset(Throwable throwable) { + if (AbortedException.isConnectionReset(throwable)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("[requestResponse] Connection has been reset"); + } + } + } } From 072fe55d857a9aaf145f60f320e76ea58c1f7069 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Fri, 18 Jun 2021 20:46:29 +0300 Subject: [PATCH 2/3] Added .mapConnectionAborted at RSocketClientChannel --- .../exceptions/ConnectionClosedException.java | 40 +++++++++++++++++++ .../exceptions/InternalServiceException.java | 8 ++++ .../exceptions/MessageCodecException.java | 12 +----- .../services/exceptions/ServiceException.java | 17 +++++--- .../rsocket/RSocketClientChannel.java | 17 ++++---- 5 files changed, 68 insertions(+), 26 deletions(-) create mode 100644 services-api/src/main/java/io/scalecube/services/exceptions/ConnectionClosedException.java diff --git a/services-api/src/main/java/io/scalecube/services/exceptions/ConnectionClosedException.java b/services-api/src/main/java/io/scalecube/services/exceptions/ConnectionClosedException.java new file mode 100644 index 000000000..36363fee2 --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/exceptions/ConnectionClosedException.java @@ -0,0 +1,40 @@ +package io.scalecube.services.exceptions; + +import java.util.regex.Pattern; + +public class ConnectionClosedException extends InternalServiceException { + + private static final Pattern GENERIC_CONNECTION_CLOSED = + Pattern.compile( + "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", + Pattern.CASE_INSENSITIVE); + + public ConnectionClosedException() { + super("Connection closed"); + } + + public ConnectionClosedException(Throwable cause) { + super(cause); + } + + public ConnectionClosedException(String message) { + super(message); + } + + /** + * Returns {@code true} if connection has been aborted on a tcp level by verifying error message + * and matching it against predefined pattern. + * + * @param th error + * @return {@code true} if connection has been aborted on a tcp level + */ + public static boolean isConnectionClosed(Throwable th) { + if (th instanceof ConnectionClosedException) { + return true; + } + + final String message = th != null ? th.getMessage() : null; + + return message != null && GENERIC_CONNECTION_CLOSED.matcher(message).matches(); + } +} diff --git a/services-api/src/main/java/io/scalecube/services/exceptions/InternalServiceException.java b/services-api/src/main/java/io/scalecube/services/exceptions/InternalServiceException.java index a441c63ee..f5b33763f 100644 --- a/services-api/src/main/java/io/scalecube/services/exceptions/InternalServiceException.java +++ b/services-api/src/main/java/io/scalecube/services/exceptions/InternalServiceException.java @@ -11,4 +11,12 @@ public InternalServiceException(int errorCode, String message) { public InternalServiceException(Throwable cause) { super(ERROR_TYPE, cause); } + + public InternalServiceException(String message) { + super(ERROR_TYPE, message); + } + + public InternalServiceException(String message, Throwable cause) { + super(ERROR_TYPE, message, cause); + } } diff --git a/services-api/src/main/java/io/scalecube/services/exceptions/MessageCodecException.java b/services-api/src/main/java/io/scalecube/services/exceptions/MessageCodecException.java index 5340ea8af..11dfd8301 100644 --- a/services-api/src/main/java/io/scalecube/services/exceptions/MessageCodecException.java +++ b/services-api/src/main/java/io/scalecube/services/exceptions/MessageCodecException.java @@ -1,18 +1,8 @@ package io.scalecube.services.exceptions; -public class MessageCodecException extends RuntimeException { +public class MessageCodecException extends InternalServiceException { public MessageCodecException(String message, Throwable cause) { super(message, cause); } - - @Override - public synchronized Throwable fillInStackTrace() { - return this; - } - - @Override - public String toString() { - return getClass().getSimpleName() + "{errorMessage=" + getMessage() + '}'; - } } diff --git a/services-api/src/main/java/io/scalecube/services/exceptions/ServiceException.java b/services-api/src/main/java/io/scalecube/services/exceptions/ServiceException.java index fe0fc964e..fab12a16d 100644 --- a/services-api/src/main/java/io/scalecube/services/exceptions/ServiceException.java +++ b/services-api/src/main/java/io/scalecube/services/exceptions/ServiceException.java @@ -1,5 +1,7 @@ package io.scalecube.services.exceptions; +import java.util.StringJoiner; + public abstract class ServiceException extends RuntimeException { private final int errorCode; @@ -14,6 +16,11 @@ public ServiceException(int errorCode, Throwable cause) { this.errorCode = errorCode; } + public ServiceException(int errorCode, String message, Throwable cause) { + super(message, cause); + this.errorCode = errorCode; + } + @Override public synchronized Throwable fillInStackTrace() { return this; @@ -25,11 +32,9 @@ public int errorCode() { @Override public String toString() { - return getClass().getSimpleName() - + "{errorCode=" - + errorCode - + ", errorMessage=" - + getMessage() - + '}'; + return new StringJoiner(", ", getClass().getSimpleName() + "[", "]") + .add("errorCode=" + errorCode) + .add("errorMessage='" + getMessage() + "'") + .toString(); } } diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java index 3c1fc8cd7..221225fed 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java @@ -4,6 +4,7 @@ import io.rsocket.RSocket; import io.rsocket.util.ByteBufPayload; import io.scalecube.services.api.ServiceMessage; +import io.scalecube.services.exceptions.ConnectionClosedException; import io.scalecube.services.transport.api.ClientChannel; import io.scalecube.services.transport.api.ServiceMessageCodec; import java.lang.reflect.Type; @@ -32,7 +33,7 @@ public Mono requestResponse(ServiceMessage message, Type respons .flatMap(rsocket -> rsocket.requestResponse(toPayload(message))) .map(this::toMessage) .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)) - .doOnError(RSocketClientChannel::handleConnectionReset); + .onErrorMap(RSocketClientChannel::mapConnectionAborted); } @Override @@ -41,7 +42,7 @@ public Flux requestStream(ServiceMessage message, Type responseT .flatMapMany(rsocket -> rsocket.requestStream(toPayload(message))) .map(this::toMessage) .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)) - .doOnError(RSocketClientChannel::handleConnectionReset); + .onErrorMap(RSocketClientChannel::mapConnectionAborted); } @Override @@ -51,7 +52,7 @@ public Flux requestChannel( .flatMapMany(rsocket -> rsocket.requestChannel(Flux.from(publisher).map(this::toPayload))) .map(this::toMessage) .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)) - .doOnError(RSocketClientChannel::handleConnectionReset); + .onErrorMap(RSocketClientChannel::mapConnectionAborted); } private Payload toPayload(ServiceMessage request) { @@ -66,11 +67,9 @@ private ServiceMessage toMessage(Payload payload) { } } - private static void handleConnectionReset(Throwable throwable) { - if (AbortedException.isConnectionReset(throwable)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("[requestResponse] Connection has been reset"); - } - } + private static Throwable mapConnectionAborted(Throwable t) { + return AbortedException.isConnectionReset(t) || ConnectionClosedException.isConnectionClosed(t) + ? new ConnectionClosedException(t) + : t; } } From 73ef688521323084c7026a8c3730e9cfac186312 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Fri, 18 Jun 2021 21:08:18 +0300 Subject: [PATCH 3/3] Added back Hooks.onErrorDropped for AbortedException and ConnectionClosedException --- .../rsocket/RSocketServiceTransport.java | 19 +++++++++++++++++++ .../rsocket/RSocketServiceTransportTest.java | 8 ++++---- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java index 8f47fa129..6b27e5ad6 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java @@ -8,6 +8,7 @@ import io.netty.util.concurrent.Future; import io.scalecube.services.auth.Authenticator; import io.scalecube.services.auth.CredentialsSupplier; +import io.scalecube.services.exceptions.ConnectionClosedException; import io.scalecube.services.methods.ServiceMethodRegistry; import io.scalecube.services.transport.api.ClientTransport; import io.scalecube.services.transport.api.DataCodec; @@ -18,13 +19,31 @@ import java.util.StringJoiner; import java.util.concurrent.ThreadFactory; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.netty.FutureMono; +import reactor.netty.channel.AbortedException; import reactor.netty.resources.LoopResources; public class RSocketServiceTransport implements ServiceTransport { + public static final Logger LOGGER = LoggerFactory.getLogger(RSocketServiceTransport.class); + + static { + Hooks.onErrorDropped( + t -> { + if (AbortedException.isConnectionReset(t) + || ConnectionClosedException.isConnectionClosed(t)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Connection aborted: {}", t.toString()); + } + } + }); + } + private int numOfWorkers = Runtime.getRuntime().availableProcessors(); private HeadersCodec headersCodec = HeadersCodec.DEFAULT_INSTANCE; diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java index 29304a41e..e1a31afc2 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java @@ -10,9 +10,9 @@ import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; +import io.scalecube.services.exceptions.ConnectionClosedException; import io.scalecube.services.sut.QuoteService; import io.scalecube.services.sut.SimpleQuoteService; -import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -94,7 +94,7 @@ public void test_remote_node_died_mono_never() throws Exception { TimeUnit.MILLISECONDS.sleep(100); assertEquals(0, latch1.getCount()); - assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass()); + assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass()); assertTrue(sub1.get().isDisposed()); } @@ -122,7 +122,7 @@ public void test_remote_node_died_many_never() throws Exception { TimeUnit.MILLISECONDS.sleep(100); assertEquals(0, latch1.getCount()); - assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass()); + assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass()); assertTrue(sub1.get().isDisposed()); } @@ -154,7 +154,7 @@ public void test_remote_node_died_many_then_never() throws Exception { TimeUnit.MILLISECONDS.sleep(100); assertEquals(0, latch1.getCount()); - assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass()); + assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass()); assertTrue(sub1.get().isDisposed()); } }