Skip to content

Commit

Permalink
[ISSUE #8599] Fix send fail with receiving GO_AWAY when rolling update proxy and add channel id in logs (#8685)
Browse files Browse the repository at this point in the history
  • Loading branch information
qianye1001 committed Sep 13, 2024
1 parent 00d5047 commit 7748040
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.rocketmq.common.AbortProcessException;
Expand Down Expand Up @@ -393,7 +393,7 @@ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cm
responseFuture.release();
}
} else {
log.warn("receive response, cmd={}, but not matched any request, address={}", cmd, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn("receive response, cmd={}, but not matched any request, address={}, channelId={}", cmd, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), ctx.channel().id());
}
}

Expand Down Expand Up @@ -560,13 +560,13 @@ public void operationFail(Throwable throwable) {
return;
}
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
log.warn("send a request command to channel <{}>, channelId={}, failed.", RemotingHelper.parseChannelRemoteAddr(channel), channel.id());
});
return future;
} catch (Exception e) {
responseTable.remove(opaque);
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
log.warn("send a request command to channel <{}> channelId={} Exception", RemotingHelper.parseChannelRemoteAddr(channel), channel.id(), e);
future.completeExceptionally(new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e));
return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -416,14 +415,14 @@ public void closeChannel(final String addr, final Channel channel) {
boolean removeItemFromTable = true;
final ChannelWrapper prevCW = this.channelTables.get(addrRemote);

LOGGER.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null);
LOGGER.info("closeChannel: begin close the channel[addr={}, id={}] Found: {}", addrRemote, channel.id(), prevCW != null);

if (null == prevCW) {
LOGGER.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
LOGGER.info("closeChannel: the channel[addr={}, id={}] has been removed from the channel table before", addrRemote, channel.id());
removeItemFromTable = false;
} else if (prevCW.isWrapperOf(channel)) {
LOGGER.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
addrRemote);
LOGGER.info("closeChannel: the channel[addr={}, id={}] has been closed before, and has been created again, nothing to do.",
addrRemote, channel.id());
removeItemFromTable = false;
}

Expand All @@ -432,7 +431,7 @@ public void closeChannel(final String addr, final Channel channel) {
if (channelWrapper != null && channelWrapper.tryClose(channel)) {
this.channelTables.remove(addrRemote);
}
LOGGER.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
LOGGER.info("closeChannel: the channel[addr={}, id={}] was removed from channel table", addrRemote, channel.id());
}

RemotingHelper.closeChannel(channel);
Expand Down Expand Up @@ -471,7 +470,7 @@ public void closeChannel(final Channel channel) {
}

if (null == prevCW) {
LOGGER.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
LOGGER.info("eventCloseChannel: the channel[addr={}, id={}] has been removed from the channel table before", RemotingHelper.parseChannelRemoteAddr(channel), channel.id());
removeItemFromTable = false;
}

Expand All @@ -480,11 +479,11 @@ public void closeChannel(final Channel channel) {
if (channelWrapper != null && channelWrapper.tryClose(channel)) {
this.channelTables.remove(addrRemote);
}
LOGGER.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
LOGGER.info("closeChannel: the channel[addr={}, id={}] was removed from channel table", addrRemote, channel.id());
RemotingHelper.closeChannel(channel);
}
} catch (Exception e) {
LOGGER.error("closeChannel: close the channel exception", e);
LOGGER.error("closeChannel: close the channel[id={}] exception", channel.id(), e);
} finally {
this.lockChannelTables.unlock();
}
Expand Down Expand Up @@ -562,9 +561,9 @@ public RemotingCommand invokeSync(String addr, final RemotingCommand request, lo
boolean shouldClose = left > MIN_CLOSE_TIMEOUT_MILLIS || left > timeoutMillis / 4;
if (nettyClientConfig.isClientCloseSocketIfTimeout() && shouldClose) {
this.closeChannel(addr, channel);
LOGGER.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, channelRemoteAddr);
LOGGER.warn("invokeSync: close socket because of timeout, {}ms, channel[addr={}, id={}]", timeoutMillis, channelRemoteAddr, channel.id());
}
LOGGER.warn("invokeSync: wait response timeout exception, the channel[{}]", channelRemoteAddr);
LOGGER.warn("invokeSync: wait response timeout exception, the channel[addr={}, id={}]", channelRemoteAddr, channel.id());
throw e;
}
} else {
Expand Down Expand Up @@ -819,21 +818,23 @@ public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final
RemotingCommand response = responseFuture.getResponseCommand();
if (response.getCode() == ResponseCode.GO_AWAY) {
if (nettyClientConfig.isEnableReconnectForGoAway()) {
LOGGER.info("Receive go away from channelId={}, channel={}", channel.id(), channel);
ChannelWrapper channelWrapper = channelWrapperTables.computeIfPresent(channel, (channel0, channelWrapper0) -> {
try {
if (channelWrapper0.reconnect()) {
LOGGER.info("Receive go away from channel {}, recreate the channel", channel0);
if (channelWrapper0.reconnect(channel0)) {
LOGGER.info("Receive go away from channelId={}, channel={}, recreate the channelId={}", channel0.id(), channel0, channelWrapper0.getChannel().id());
channelWrapperTables.put(channelWrapper0.getChannel(), channelWrapper0);
}
} catch (Throwable t) {
LOGGER.error("Channel {} reconnect error", channelWrapper0, t);
}
return channelWrapper0;
});
if (channelWrapper != null) {
if (channelWrapper != null && !channelWrapper.isWrapperOf(channel)) {
if (nettyClientConfig.isEnableTransparentRetry()) {
RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader());
retryRequest.setBody(request.getBody());
retryRequest.setExtFields(request.getExtFields());
if (channelWrapper.isOK()) {
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
Expand Down Expand Up @@ -865,6 +866,8 @@ public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final
return future;
}
}
} else {
LOGGER.warn("invokeImpl receive GO_AWAY, channelWrapper is null or channel is the same in wrapper, channelId={}", channel.id());
}
}
}
Expand Down Expand Up @@ -1002,7 +1005,6 @@ class ChannelWrapper {
// only affected by sync or async request, oneway is not included.
private ChannelFuture channelToClose;
private long lastResponseTime;
private volatile long lastReconnectTimestamp = 0L;
private final String channelAddress;

public ChannelWrapper(String address, ChannelFuture channelFuture) {
Expand All @@ -1021,10 +1023,7 @@ public boolean isWritable() {
}

public boolean isWrapperOf(Channel channel) {
if (this.channelFuture.channel() != null && this.channelFuture.channel() == channel) {
return true;
}
return false;
return this.channelFuture.channel() != null && this.channelFuture.channel() == channel;
}

private Channel getChannel() {
Expand Down Expand Up @@ -1052,20 +1051,27 @@ public String getChannelAddress() {
return channelAddress;
}

public boolean reconnect() {
public boolean reconnect(Channel channel) {
if (!isWrapperOf(channel)) {
LOGGER.warn("channelWrapper has reconnect, so do nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id());
return false;
}
if (lock.writeLock().tryLock()) {
try {
if (lastReconnectTimestamp == 0L || System.currentTimeMillis() - lastReconnectTimestamp > Duration.ofSeconds(nettyClientConfig.getMaxReconnectIntervalTimeSeconds()).toMillis()) {
if (isWrapperOf(channel)) {
channelToClose = channelFuture;
String[] hostAndPort = getHostAndPort(channelAddress);
channelFuture = fetchBootstrap(channelAddress)
.connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
lastReconnectTimestamp = System.currentTimeMillis();
return true;
} else {
LOGGER.warn("channelWrapper has reconnect, so do nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id());
}
} finally {
lock.writeLock().unlock();
}
} else {
LOGGER.warn("channelWrapper reconnect try lock fail, now channelId={}", getChannel().id());
}
return false;
}
Expand Down Expand Up @@ -1152,7 +1158,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, Sock
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}", remoteAddress);
LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}, channelId={}", remoteAddress, ctx.channel().id());
super.channelActive(ctx);

if (NettyRemotingClient.this.channelEventListener != null) {
Expand All @@ -1175,7 +1181,7 @@ public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
LOGGER.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
LOGGER.info("NETTY CLIENT PIPELINE: CLOSE channel[addr={}, id={}]", remoteAddress, ctx.channel().id());
closeChannel(ctx.channel());
super.close(ctx, promise);
NettyRemotingClient.this.failFast(ctx.channel());
Expand All @@ -1187,7 +1193,7 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
LOGGER.info("NETTY CLIENT PIPELINE: channelInactive, the channel[{}]", remoteAddress);
LOGGER.info("NETTY CLIENT PIPELINE: channelInactive, the channel[addr={}, id={}]", remoteAddress, ctx.channel().id());
closeChannel(ctx.channel());
super.channelInactive(ctx);
}
Expand All @@ -1198,7 +1204,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
LOGGER.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
LOGGER.warn("NETTY CLIENT PIPELINE: IDLE exception channel[addr={}, id={}]", remoteAddress, ctx.channel().id());
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this
Expand All @@ -1213,8 +1219,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught channel[addr={}, id={}]", remoteAddress, ctx.channel().id(), cause);
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
Expand Down

0 comments on commit 7748040

Please sign in to comment.