Skip to content

Commit

Permalink
chore: better error reporting for ConnectionWatchDog
Browse files Browse the repository at this point in the history
  • Loading branch information
okg-cxf committed Aug 8, 2024
1 parent 19e6463 commit 2058db3
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 33 deletions.
71 changes: 39 additions & 32 deletions src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@

import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionBuilder;
import io.lettuce.core.ConnectionEvents;
import io.lettuce.core.RedisException;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.connection.ReconnectAttemptEvent;
import io.lettuce.core.event.connection.ReconnectFailedEvent;
Expand Down Expand Up @@ -215,20 +216,11 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
channel = null;

if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {
if (!isEventLoopGroupActive()) {
logger.debug("isEventLoopGroupActive() == false");
return;
}

if (!isListenOnChannelInactive()) {
logger.debug("Skip reconnect scheduling, listener disabled");
return;
}

if (!useAutoBatchFlushEndpoint) {
this.scheduleReconnect();
} else {
doReconnectOnAutoBatchFlushEndpointQuiescence = this::scheduleReconnect;
}
doReconnectOnAutoBatchFlushEndpointQuiescence = this::scheduleReconnect;
// otherwise, will be called later by BatchFlushEndpoint#onEndpointQuiescence
} else {
logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);
Expand All @@ -237,7 +229,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}

boolean willReconnect() {
boolean willReconnectOnAutoBatchFlushEndpointQuiescence() {
return doReconnectOnAutoBatchFlushEndpointQuiescence != null;
}

Expand All @@ -261,14 +253,16 @@ public void scheduleReconnect() {
logger.debug("{} scheduleReconnect()", logPrefix());

if (!isEventLoopGroupActive()) {
logger.debug("isEventLoopGroupActive() == false");
notifyEndpointFailedToConnectIfNeeded();
final String errMsg = "isEventLoopGroupActive() == false";
logger.debug(errMsg);
notifyEndpointFailedToConnectIfNeeded(errMsg);
return;
}

if (!isListenOnChannelInactive()) {
logger.debug("Skip reconnect scheduling, listener disabled");
notifyEndpointFailedToConnectIfNeeded();
final String errMsg = "Skip reconnect scheduling, listener disabled";
logger.debug(errMsg);
notifyEndpointFailedToConnectIfNeeded(errMsg);
return;
}

Expand All @@ -285,8 +279,9 @@ public void scheduleReconnect() {
reconnectScheduleTimeout = null;

if (!isEventLoopGroupActive()) {
logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");
notifyEndpointFailedToConnectIfNeeded();
final String errMsg = "Cannot execute scheduled reconnect timer, reconnect workers are terminated";
logger.warn(errMsg);
notifyEndpointFailedToConnectIfNeeded(errMsg);
return;
}

Expand All @@ -302,17 +297,25 @@ public void scheduleReconnect() {
}
} else {
logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());
notifyEndpointFailedToConnectIfNeeded();
notifyEndpointFailedToConnectIfNeeded("Skipping scheduleReconnect() because I have an active channel");
}
}

private void notifyEndpointFailedToConnectIfNeeded(String msg) {
if (useAutoBatchFlushEndpoint) {
((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(new RedisException(msg));
}
}

private void notifyEndpointFailedToConnectIfNeeded() {
notifyEndpointFailedToConnectIfNeeded(new CancellationException());
private void notifyEndpointFailedToConnectIfNeeded(Throwable t) {
if (useAutoBatchFlushEndpoint) {
((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(t);
}
}

private void notifyEndpointFailedToConnectIfNeeded(Exception e) {
private void notifyEndpointFailedToConnectIfNeeded(Supplier<Throwable> throwableSupplier) {
if (useAutoBatchFlushEndpoint) {
((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(e);
((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(throwableSupplier.get());
}
}

Expand All @@ -335,26 +338,29 @@ public void run(int attempt) throws Exception {
* @param delay retry delay.
* @throws Exception when reconnection fails.
*/
private void run(int attempt, Duration delay) throws Exception {
private void run(int attempt, Duration delay) {

reconnectSchedulerSync.set(false);
reconnectScheduleTimeout = null;

if (!isEventLoopGroupActive()) {
logger.debug("isEventLoopGroupActive() == false");
notifyEndpointFailedToConnectIfNeeded();
final String errMsg = "isEventLoopGroupActive() == false";
logger.debug(errMsg);
notifyEndpointFailedToConnectIfNeeded(errMsg);
return;
}

if (!isListenOnChannelInactive()) {
logger.debug("Skip reconnect scheduling, listener disabled");
notifyEndpointFailedToConnectIfNeeded();
final String errMsg = "Skip reconnect scheduling, listener disabled";
logger.debug(errMsg);
notifyEndpointFailedToConnectIfNeeded(errMsg);
return;
}

if (isReconnectSuspended()) {
logger.debug("Skip reconnect scheduling, reconnect is suspended");
notifyEndpointFailedToConnectIfNeeded();
final String msg = "Skip reconnect scheduling, reconnect is suspended";
logger.debug(msg);
notifyEndpointFailedToConnectIfNeeded(msg);
return;
}

Expand Down Expand Up @@ -411,7 +417,8 @@ private void run(int attempt, Duration delay) throws Exception {
if (!isReconnectSuspended()) {
scheduleReconnect();
} else {
notifyEndpointFailedToConnectIfNeeded();
notifyEndpointFailedToConnectIfNeeded(
() -> new RedisException("got error and then reconnect is suspended", t));
}
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,8 @@ public void notifyChannelInactiveAfterWatchdogDecision(Channel channel,
return;
}

boolean willReconnect = connectionWatchdog != null && connectionWatchdog.willReconnect();
boolean willReconnect = connectionWatchdog != null
&& connectionWatchdog.willReconnectOnAutoBatchFlushEndpointQuiescence();
RedisException exception = null;
// Unlike DefaultEndpoint, here we don't check reliability since connectionWatchdog.willReconnect() already does it.
if (isClosed()) {
Expand Down

0 comments on commit 2058db3

Please sign in to comment.