From 9c467ac66ed655c55eece01328a03f85b9b30979 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Thu, 17 Jan 2019 11:57:00 +0800 Subject: [PATCH] close session when it's not responding for a while --- .../pegasus/rpc/async/ReplicaSession.java | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index f8d485f3..8c90bdf5 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * Created by weijiesun on 17-9-13. @@ -62,6 +63,8 @@ public void initChannel(SocketChannel ch) { pipeline.addLast("ClientHandler", new ReplicaSession.DefaultHandler()); } }); + + this.firstRecentTimedOutMs = new AtomicLong(0); } // You can specify a message response filter with constructor or with "setMessageResponseFilter" function. @@ -71,6 +74,7 @@ public ReplicaSession(rpc_address address, EventLoopGroup rpcGroup, int socketTi this(address, rpcGroup, socketTimeout); this.filter = filter; } + public void setMessageResponseFilter(MessageResponseFilter filter) { this.filter = filter; } @@ -220,8 +224,20 @@ private void tryNotifyWithSequenceID( entry.timeoutTask.cancel(true); entry.op.rpc_error.errno = errno; entry.callback.run(); - } - else { + + if (errno == error_types.ERR_TIMEOUT) { + long firstTs = firstRecentTimedOutMs.get(); + if (firstTs == 0) { + firstRecentTimedOutMs.set(System.currentTimeMillis()); + } else if (System.currentTimeMillis() - firstTs >= sessionResetTimeWindowMs) { + logger.warn("{}: actively close the session because it's not responding for {} seconds", + name(), sessionResetTimeWindowMs); + closeSession(); + } + } else { + firstRecentTimedOutMs.set(0); + } + } else { logger.warn("{}: {} is removed by others, current error {}, isTimeoutTask {}", name(), seqID, errno.toString(), isTimeoutTask); } @@ -305,11 +321,15 @@ private final static class VolatileFields { public ConnState state = ConnState.DISCONNECTED; public Channel nettyChannel = null; } + private volatile VolatileFields fields = new VolatileFields(); - private rpc_address address; + private final rpc_address address; private Bootstrap boot; private EventLoopGroup rpcGroup; + private AtomicLong firstRecentTimedOutMs; + private static final long sessionResetTimeWindowMs = 10 * 1000; // 10s + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ReplicaSession.class); }