diff --git a/services-api/src/main/java/io/scalecube/services/exceptions/ConnectionClosedException.java b/services-api/src/main/java/io/scalecube/services/exceptions/ConnectionClosedException.java new file mode 100644 index 000000000..36363fee2 --- /dev/null +++ b/services-api/src/main/java/io/scalecube/services/exceptions/ConnectionClosedException.java @@ -0,0 +1,40 @@ +package io.scalecube.services.exceptions; + +import java.util.regex.Pattern; + +public class ConnectionClosedException extends InternalServiceException { + + private static final Pattern GENERIC_CONNECTION_CLOSED = + Pattern.compile( + "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", + Pattern.CASE_INSENSITIVE); + + public ConnectionClosedException() { + super("Connection closed"); + } + + public ConnectionClosedException(Throwable cause) { + super(cause); + } + + public ConnectionClosedException(String message) { + super(message); + } + + /** + * Returns {@code true} if connection has been aborted on a tcp level by verifying error message + * and matching it against predefined pattern. + * + * @param th error + * @return {@code true} if connection has been aborted on a tcp level + */ + public static boolean isConnectionClosed(Throwable th) { + if (th instanceof ConnectionClosedException) { + return true; + } + + final String message = th != null ? th.getMessage() : null; + + return message != null && GENERIC_CONNECTION_CLOSED.matcher(message).matches(); + } +} diff --git a/services-api/src/main/java/io/scalecube/services/exceptions/InternalServiceException.java b/services-api/src/main/java/io/scalecube/services/exceptions/InternalServiceException.java index a441c63ee..f5b33763f 100644 --- a/services-api/src/main/java/io/scalecube/services/exceptions/InternalServiceException.java +++ b/services-api/src/main/java/io/scalecube/services/exceptions/InternalServiceException.java @@ -11,4 +11,12 @@ public InternalServiceException(int errorCode, String message) { public InternalServiceException(Throwable cause) { super(ERROR_TYPE, cause); } + + public InternalServiceException(String message) { + super(ERROR_TYPE, message); + } + + public InternalServiceException(String message, Throwable cause) { + super(ERROR_TYPE, message, cause); + } } diff --git a/services-api/src/main/java/io/scalecube/services/exceptions/MessageCodecException.java b/services-api/src/main/java/io/scalecube/services/exceptions/MessageCodecException.java index 5340ea8af..11dfd8301 100644 --- a/services-api/src/main/java/io/scalecube/services/exceptions/MessageCodecException.java +++ b/services-api/src/main/java/io/scalecube/services/exceptions/MessageCodecException.java @@ -1,18 +1,8 @@ package io.scalecube.services.exceptions; -public class MessageCodecException extends RuntimeException { +public class MessageCodecException extends InternalServiceException { public MessageCodecException(String message, Throwable cause) { super(message, cause); } - - @Override - public synchronized Throwable fillInStackTrace() { - return this; - } - - @Override - public String toString() { - return getClass().getSimpleName() + "{errorMessage=" + getMessage() + '}'; - } } diff --git a/services-api/src/main/java/io/scalecube/services/exceptions/ServiceException.java b/services-api/src/main/java/io/scalecube/services/exceptions/ServiceException.java index fe0fc964e..fab12a16d 100644 --- a/services-api/src/main/java/io/scalecube/services/exceptions/ServiceException.java +++ b/services-api/src/main/java/io/scalecube/services/exceptions/ServiceException.java @@ -1,5 +1,7 @@ package io.scalecube.services.exceptions; +import java.util.StringJoiner; + public abstract class ServiceException extends RuntimeException { private final int errorCode; @@ -14,6 +16,11 @@ public ServiceException(int errorCode, Throwable cause) { this.errorCode = errorCode; } + public ServiceException(int errorCode, String message, Throwable cause) { + super(message, cause); + this.errorCode = errorCode; + } + @Override public synchronized Throwable fillInStackTrace() { return this; @@ -25,11 +32,9 @@ public int errorCode() { @Override public String toString() { - return getClass().getSimpleName() - + "{errorCode=" - + errorCode - + ", errorMessage=" - + getMessage() - + '}'; + return new StringJoiner(", ", getClass().getSimpleName() + "[", "]") + .add("errorCode=" + errorCode) + .add("errorMessage='" + getMessage() + "'") + .toString(); } } diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java index 0e056ed32..221225fed 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketClientChannel.java @@ -4,15 +4,21 @@ import io.rsocket.RSocket; import io.rsocket.util.ByteBufPayload; import io.scalecube.services.api.ServiceMessage; +import io.scalecube.services.exceptions.ConnectionClosedException; import io.scalecube.services.transport.api.ClientChannel; import io.scalecube.services.transport.api.ServiceMessageCodec; import java.lang.reflect.Type; import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.netty.channel.AbortedException; public class RSocketClientChannel implements ClientChannel { + private static final Logger LOGGER = LoggerFactory.getLogger(RSocketClientChannel.class); + private final Mono rsocket; private final ServiceMessageCodec messageCodec; @@ -26,7 +32,8 @@ public Mono requestResponse(ServiceMessage message, Type respons return rsocket .flatMap(rsocket -> rsocket.requestResponse(toPayload(message))) .map(this::toMessage) - .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)); + .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)) + .onErrorMap(RSocketClientChannel::mapConnectionAborted); } @Override @@ -34,7 +41,8 @@ public Flux requestStream(ServiceMessage message, Type responseT return rsocket .flatMapMany(rsocket -> rsocket.requestStream(toPayload(message))) .map(this::toMessage) - .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)); + .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)) + .onErrorMap(RSocketClientChannel::mapConnectionAborted); } @Override @@ -43,7 +51,8 @@ public Flux requestChannel( return rsocket .flatMapMany(rsocket -> rsocket.requestChannel(Flux.from(publisher).map(this::toPayload))) .map(this::toMessage) - .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)); + .map(msg -> ServiceMessageCodec.decodeData(msg, responseType)) + .onErrorMap(RSocketClientChannel::mapConnectionAborted); } private Payload toPayload(ServiceMessage request) { @@ -57,4 +66,10 @@ private ServiceMessage toMessage(Payload payload) { payload.release(); } } + + private static Throwable mapConnectionAborted(Throwable t) { + return AbortedException.isConnectionReset(t) || ConnectionClosedException.isConnectionClosed(t) + ? new ConnectionClosedException(t) + : t; + } } diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java index 8f47fa129..6b27e5ad6 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java @@ -8,6 +8,7 @@ import io.netty.util.concurrent.Future; import io.scalecube.services.auth.Authenticator; import io.scalecube.services.auth.CredentialsSupplier; +import io.scalecube.services.exceptions.ConnectionClosedException; import io.scalecube.services.methods.ServiceMethodRegistry; import io.scalecube.services.transport.api.ClientTransport; import io.scalecube.services.transport.api.DataCodec; @@ -18,13 +19,31 @@ import java.util.StringJoiner; import java.util.concurrent.ThreadFactory; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.netty.FutureMono; +import reactor.netty.channel.AbortedException; import reactor.netty.resources.LoopResources; public class RSocketServiceTransport implements ServiceTransport { + public static final Logger LOGGER = LoggerFactory.getLogger(RSocketServiceTransport.class); + + static { + Hooks.onErrorDropped( + t -> { + if (AbortedException.isConnectionReset(t) + || ConnectionClosedException.isConnectionClosed(t)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Connection aborted: {}", t.toString()); + } + } + }); + } + private int numOfWorkers = Runtime.getRuntime().availableProcessors(); private HeadersCodec headersCodec = HeadersCodec.DEFAULT_INSTANCE; diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java index 29304a41e..e1a31afc2 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java @@ -10,9 +10,9 @@ import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; +import io.scalecube.services.exceptions.ConnectionClosedException; import io.scalecube.services.sut.QuoteService; import io.scalecube.services.sut.SimpleQuoteService; -import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -94,7 +94,7 @@ public void test_remote_node_died_mono_never() throws Exception { TimeUnit.MILLISECONDS.sleep(100); assertEquals(0, latch1.getCount()); - assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass()); + assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass()); assertTrue(sub1.get().isDisposed()); } @@ -122,7 +122,7 @@ public void test_remote_node_died_many_never() throws Exception { TimeUnit.MILLISECONDS.sleep(100); assertEquals(0, latch1.getCount()); - assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass()); + assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass()); assertTrue(sub1.get().isDisposed()); } @@ -154,7 +154,7 @@ public void test_remote_node_died_many_then_never() throws Exception { TimeUnit.MILLISECONDS.sleep(100); assertEquals(0, latch1.getCount()); - assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass()); + assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass()); assertTrue(sub1.get().isDisposed()); } }