Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
fix: prevent session to be actively closed when it gets response from…
Browse files Browse the repository at this point in the history
… server (#86)
  • Loading branch information
Wu Tao authored and foreverneverer committed Jan 6, 2020
1 parent 43636af commit a0aa68c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,14 @@ void markSessionDisconnect() {
try {
while (!pendingSend.isEmpty()) {
RequestEntry e = pendingSend.poll();
tryNotifyWithSequenceID(e.sequenceId, error_types.ERR_SESSION_RESET, false);
tryNotifyFailureWithSeqID(e.sequenceId, error_types.ERR_SESSION_RESET, false);
}
List<RequestEntry> l = new LinkedList<RequestEntry>();
for (Map.Entry<Integer, RequestEntry> entry : pendingResponse.entrySet()) {
l.add(entry.getValue());
}
for (RequestEntry e : l) {
tryNotifyWithSequenceID(e.sequenceId, error_types.ERR_SESSION_RESET, false);
tryNotifyFailureWithSeqID(e.sequenceId, error_types.ERR_SESSION_RESET, false);
}
} catch (Exception e) {
logger.error(
Expand All @@ -267,7 +267,7 @@ void markSessionDisconnect() {
}

// Notify the RPC sender if failure occurred.
void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTimeoutTask)
void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTask)
throws Exception {
logger.debug(
"{}: {} is notified with error {}, isTimeoutTask {}",
Expand Down Expand Up @@ -327,7 +327,7 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
name(),
entry.sequenceId,
channelFuture.cause());
tryNotifyWithSequenceID(entry.sequenceId, error_types.ERR_TIMEOUT, false);
tryNotifyFailureWithSeqID(entry.sequenceId, error_types.ERR_TIMEOUT, false);
}
}
});
Expand All @@ -342,7 +342,7 @@ private ScheduledFuture<?> addTimer(final int seqID, long timeoutInMillseconds)
@Override
public void run() {
try {
tryNotifyWithSequenceID(seqID, error_types.ERR_TIMEOUT, true);
tryNotifyFailureWithSeqID(seqID, error_types.ERR_TIMEOUT, true);
} catch (Exception e) {
logger.warn("try notify with sequenceID {} exception!", seqID, e);
}
Expand All @@ -368,6 +368,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
@Override
public void channelRead0(ChannelHandlerContext ctx, final RequestEntry msg) {
logger.debug("{}: handle response with seqid({})", name(), msg.sequenceId);
firstRecentTimedOutMs.set(0); // This session is currently healthy.
if (msg.callback != null) {
msg.callback.run();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public void testTryNotifyWithSequenceID() throws Exception {
// no pending RequestEntry, ensure no NPE thrown
Assert.assertTrue(rs.pendingResponse.isEmpty());
try {
rs.tryNotifyWithSequenceID(100, error_code.error_types.ERR_TIMEOUT, false);
rs.tryNotifyFailureWithSeqID(100, error_code.error_types.ERR_TIMEOUT, false);
} catch (Exception e) {
Assert.assertNull(e);
}
Expand All @@ -234,20 +234,20 @@ public void testTryNotifyWithSequenceID() throws Exception {
entry.timeoutTask = null; // simulate the timeoutTask has been null
entry.op = new rrdb_put_operator(new gpid(1, 1), null, null, 0);
rs.pendingResponse.put(100, entry);
rs.tryNotifyWithSequenceID(100, error_code.error_types.ERR_TIMEOUT, false);
rs.tryNotifyFailureWithSeqID(100, error_code.error_types.ERR_TIMEOUT, false);
Assert.assertTrue(passed.get());

// simulate the entry has been removed, ensure no NPE thrown
rs.getAndRemoveEntry(entry.sequenceId);
rs.tryNotifyWithSequenceID(entry.sequenceId, entry.op.rpc_error.errno, true);
rs.tryNotifyFailureWithSeqID(entry.sequenceId, entry.op.rpc_error.errno, true);

// ensure mark session state to disconnect when TryNotifyWithSequenceID incur any exception
ReplicaSession mockRs = Mockito.spy(rs);
mockRs.pendingSend.offer(entry);
mockRs.fields.state = ConnState.CONNECTED;
Mockito.doThrow(new Exception())
.when(mockRs)
.tryNotifyWithSequenceID(entry.sequenceId, entry.op.rpc_error.errno, false);
.tryNotifyFailureWithSeqID(entry.sequenceId, entry.op.rpc_error.errno, false);
mockRs.markSessionDisconnect();
Assert.assertEquals(mockRs.getState(), ConnState.DISCONNECTED);
}
Expand Down

0 comments on commit a0aa68c

Please sign in to comment.