Skip to content

Commit

Permalink
Merge pull request #817 from scalecube/add-connection-reset-handler
Browse files Browse the repository at this point in the history
Made smarter error handler for aborted connection case
  • Loading branch information
artem-v committed Jun 18, 2021
2 parents 28fbeb9 + 73ef688 commit 662b3d9
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.scalecube.services.exceptions;

import java.util.regex.Pattern;

public class ConnectionClosedException extends InternalServiceException {

private static final Pattern GENERIC_CONNECTION_CLOSED =
Pattern.compile(
"^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$",
Pattern.CASE_INSENSITIVE);

public ConnectionClosedException() {
super("Connection closed");
}

public ConnectionClosedException(Throwable cause) {
super(cause);
}

public ConnectionClosedException(String message) {
super(message);
}

/**
* Returns {@code true} if connection has been aborted on a tcp level by verifying error message
* and matching it against predefined pattern.
*
* @param th error
* @return {@code true} if connection has been aborted on a tcp level
*/
public static boolean isConnectionClosed(Throwable th) {
if (th instanceof ConnectionClosedException) {
return true;
}

final String message = th != null ? th.getMessage() : null;

return message != null && GENERIC_CONNECTION_CLOSED.matcher(message).matches();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,12 @@ public InternalServiceException(int errorCode, String message) {
public InternalServiceException(Throwable cause) {
super(ERROR_TYPE, cause);
}

public InternalServiceException(String message) {
super(ERROR_TYPE, message);
}

public InternalServiceException(String message, Throwable cause) {
super(ERROR_TYPE, message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,8 @@
package io.scalecube.services.exceptions;

public class MessageCodecException extends RuntimeException {
public class MessageCodecException extends InternalServiceException {

public MessageCodecException(String message, Throwable cause) {
super(message, cause);
}

@Override
public synchronized Throwable fillInStackTrace() {
return this;
}

@Override
public String toString() {
return getClass().getSimpleName() + "{errorMessage=" + getMessage() + '}';
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.scalecube.services.exceptions;

import java.util.StringJoiner;

public abstract class ServiceException extends RuntimeException {

private final int errorCode;
Expand All @@ -14,6 +16,11 @@ public ServiceException(int errorCode, Throwable cause) {
this.errorCode = errorCode;
}

public ServiceException(int errorCode, String message, Throwable cause) {
super(message, cause);
this.errorCode = errorCode;
}

@Override
public synchronized Throwable fillInStackTrace() {
return this;
Expand All @@ -25,11 +32,9 @@ public int errorCode() {

@Override
public String toString() {
return getClass().getSimpleName()
+ "{errorCode="
+ errorCode
+ ", errorMessage="
+ getMessage()
+ '}';
return new StringJoiner(", ", getClass().getSimpleName() + "[", "]")
.add("errorCode=" + errorCode)
.add("errorMessage='" + getMessage() + "'")
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@
import io.rsocket.RSocket;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.transport.api.ClientChannel;
import io.scalecube.services.transport.api.ServiceMessageCodec;
import java.lang.reflect.Type;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.channel.AbortedException;

public class RSocketClientChannel implements ClientChannel {

private static final Logger LOGGER = LoggerFactory.getLogger(RSocketClientChannel.class);

private final Mono<RSocket> rsocket;
private final ServiceMessageCodec messageCodec;

Expand All @@ -26,15 +32,17 @@ public Mono<ServiceMessage> requestResponse(ServiceMessage message, Type respons
return rsocket
.flatMap(rsocket -> rsocket.requestResponse(toPayload(message)))
.map(this::toMessage)
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType));
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType))
.onErrorMap(RSocketClientChannel::mapConnectionAborted);
}

@Override
public Flux<ServiceMessage> requestStream(ServiceMessage message, Type responseType) {
return rsocket
.flatMapMany(rsocket -> rsocket.requestStream(toPayload(message)))
.map(this::toMessage)
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType));
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType))
.onErrorMap(RSocketClientChannel::mapConnectionAborted);
}

@Override
Expand All @@ -43,7 +51,8 @@ public Flux<ServiceMessage> requestChannel(
return rsocket
.flatMapMany(rsocket -> rsocket.requestChannel(Flux.from(publisher).map(this::toPayload)))
.map(this::toMessage)
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType));
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType))
.onErrorMap(RSocketClientChannel::mapConnectionAborted);
}

private Payload toPayload(ServiceMessage request) {
Expand All @@ -57,4 +66,10 @@ private ServiceMessage toMessage(Payload payload) {
payload.release();
}
}

private static Throwable mapConnectionAborted(Throwable t) {
return AbortedException.isConnectionReset(t) || ConnectionClosedException.isConnectionClosed(t)
? new ConnectionClosedException(t)
: t;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.netty.util.concurrent.Future;
import io.scalecube.services.auth.Authenticator;
import io.scalecube.services.auth.CredentialsSupplier;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.transport.api.ClientTransport;
import io.scalecube.services.transport.api.DataCodec;
Expand All @@ -18,13 +19,31 @@
import java.util.StringJoiner;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.netty.FutureMono;
import reactor.netty.channel.AbortedException;
import reactor.netty.resources.LoopResources;

public class RSocketServiceTransport implements ServiceTransport {

public static final Logger LOGGER = LoggerFactory.getLogger(RSocketServiceTransport.class);

static {
Hooks.onErrorDropped(
t -> {
if (AbortedException.isConnectionReset(t)
|| ConnectionClosedException.isConnectionClosed(t)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Connection aborted: {}", t.toString());
}
}
});
}

private int numOfWorkers = Runtime.getRuntime().availableProcessors();

private HeadersCodec headersCodec = HeadersCodec.DEFAULT_INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.sut.QuoteService;
import io.scalecube.services.sut.SimpleQuoteService;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -94,7 +94,7 @@ public void test_remote_node_died_mono_never() throws Exception {
TimeUnit.MILLISECONDS.sleep(100);

assertEquals(0, latch1.getCount());
assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass());
assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass());
assertTrue(sub1.get().isDisposed());
}

Expand Down Expand Up @@ -122,7 +122,7 @@ public void test_remote_node_died_many_never() throws Exception {
TimeUnit.MILLISECONDS.sleep(100);

assertEquals(0, latch1.getCount());
assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass());
assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass());
assertTrue(sub1.get().isDisposed());
}

Expand Down Expand Up @@ -154,7 +154,7 @@ public void test_remote_node_died_many_then_never() throws Exception {
TimeUnit.MILLISECONDS.sleep(100);

assertEquals(0, latch1.getCount());
assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass());
assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass());
assertTrue(sub1.get().isDisposed());
}
}

0 comments on commit 662b3d9

Please sign in to comment.