Skip to content

Commit

Permalink
Added .mapConnectionAborted at RSocketClientChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Jun 18, 2021
1 parent e7d0b09 commit 072fe55
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.scalecube.services.exceptions;

import java.util.regex.Pattern;

public class ConnectionClosedException extends InternalServiceException {

private static final Pattern GENERIC_CONNECTION_CLOSED =
Pattern.compile(
"^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$",
Pattern.CASE_INSENSITIVE);

public ConnectionClosedException() {
super("Connection closed");
}

public ConnectionClosedException(Throwable cause) {
super(cause);
}

public ConnectionClosedException(String message) {
super(message);
}

/**
* Returns {@code true} if connection has been aborted on a tcp level by verifying error message
* and matching it against predefined pattern.
*
* @param th error
* @return {@code true} if connection has been aborted on a tcp level
*/
public static boolean isConnectionClosed(Throwable th) {
if (th instanceof ConnectionClosedException) {
return true;
}

final String message = th != null ? th.getMessage() : null;

return message != null && GENERIC_CONNECTION_CLOSED.matcher(message).matches();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,12 @@ public InternalServiceException(int errorCode, String message) {
public InternalServiceException(Throwable cause) {
super(ERROR_TYPE, cause);
}

public InternalServiceException(String message) {
super(ERROR_TYPE, message);
}

public InternalServiceException(String message, Throwable cause) {
super(ERROR_TYPE, message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,8 @@
package io.scalecube.services.exceptions;

public class MessageCodecException extends RuntimeException {
public class MessageCodecException extends InternalServiceException {

public MessageCodecException(String message, Throwable cause) {
super(message, cause);
}

@Override
public synchronized Throwable fillInStackTrace() {
return this;
}

@Override
public String toString() {
return getClass().getSimpleName() + "{errorMessage=" + getMessage() + '}';
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.scalecube.services.exceptions;

import java.util.StringJoiner;

public abstract class ServiceException extends RuntimeException {

private final int errorCode;
Expand All @@ -14,6 +16,11 @@ public ServiceException(int errorCode, Throwable cause) {
this.errorCode = errorCode;
}

public ServiceException(int errorCode, String message, Throwable cause) {
super(message, cause);
this.errorCode = errorCode;
}

@Override
public synchronized Throwable fillInStackTrace() {
return this;
Expand All @@ -25,11 +32,9 @@ public int errorCode() {

@Override
public String toString() {
return getClass().getSimpleName()
+ "{errorCode="
+ errorCode
+ ", errorMessage="
+ getMessage()
+ '}';
return new StringJoiner(", ", getClass().getSimpleName() + "[", "]")
.add("errorCode=" + errorCode)
.add("errorMessage='" + getMessage() + "'")
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.rsocket.RSocket;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.transport.api.ClientChannel;
import io.scalecube.services.transport.api.ServiceMessageCodec;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -32,7 +33,7 @@ public Mono<ServiceMessage> requestResponse(ServiceMessage message, Type respons
.flatMap(rsocket -> rsocket.requestResponse(toPayload(message)))
.map(this::toMessage)
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType))
.doOnError(RSocketClientChannel::handleConnectionReset);
.onErrorMap(RSocketClientChannel::mapConnectionAborted);
}

@Override
Expand All @@ -41,7 +42,7 @@ public Flux<ServiceMessage> requestStream(ServiceMessage message, Type responseT
.flatMapMany(rsocket -> rsocket.requestStream(toPayload(message)))
.map(this::toMessage)
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType))
.doOnError(RSocketClientChannel::handleConnectionReset);
.onErrorMap(RSocketClientChannel::mapConnectionAborted);
}

@Override
Expand All @@ -51,7 +52,7 @@ public Flux<ServiceMessage> requestChannel(
.flatMapMany(rsocket -> rsocket.requestChannel(Flux.from(publisher).map(this::toPayload)))
.map(this::toMessage)
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType))
.doOnError(RSocketClientChannel::handleConnectionReset);
.onErrorMap(RSocketClientChannel::mapConnectionAborted);
}

private Payload toPayload(ServiceMessage request) {
Expand All @@ -66,11 +67,9 @@ private ServiceMessage toMessage(Payload payload) {
}
}

private static void handleConnectionReset(Throwable throwable) {
if (AbortedException.isConnectionReset(throwable)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[requestResponse] Connection has been reset");
}
}
private static Throwable mapConnectionAborted(Throwable t) {
return AbortedException.isConnectionReset(t) || ConnectionClosedException.isConnectionClosed(t)
? new ConnectionClosedException(t)
: t;
}
}

0 comments on commit 072fe55

Please sign in to comment.