Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
fix: set error to ERR_SESSION_RESET to trigger meta query while se… (#54
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Wu Tao authored and neverchanje committed Sep 24, 2019
1 parent 91f672a commit b5d2b48
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 17 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ log.txt
rolling_log/
.vscode/
google-java-format-*
pegasus-*
14 changes: 10 additions & 4 deletions scripts/travis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,23 @@ if [[ $(git status -s) ]]; then
exit 1
fi

PEGASUS_PKG="pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release"
PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus/releases/download/v1.11.6/pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release.tar.gz"

# start pegasus onebox environment
wget https://github.com/XiaoMi/pegasus/releases/download/v1.11.3/pegasus-1.11.3-b45cb06-linux-x86_64-release.zip
unzip pegasus-1.11.3-b45cb06-linux-x86_64-release.zip
cd pegasus-1.11.3-b45cb06-linux-x86_64-release
if [ ! -f $PEGASUS_PKG.tar.gz ]; then
wget $PEGASUS_PKG_URL
tar xvf $PEGASUS_PKG.tar.gz
fi
cd $PEGASUS_PKG

sed -i "s#https://github.com/xiaomi/pegasus-common/raw/master/zookeeper-3.4.6.tar.gz#https://github.com/XiaoMi/pegasus-common/releases/download/deps/zookeeper-3.4.6.tar.gz#" scripts/start_zk.sh
./run.sh start_onebox -w
cd ../

if ! mvn clean test
then
cd pegasus-1.11.3-b45cb06-linux-x86_64-release
cd $PEGASUS_PKG
./run.sh list_onebox
exit 1
fi
6 changes: 4 additions & 2 deletions src/main/java/com/xiaomi/infra/pegasus/client/PException.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
package com.xiaomi.infra.pegasus.client;

/**
* @author qinzuoyan
* <p>Pegasus exception.
* The generic type of exception thrown by all of the Pegasus APIs.
*
* <p>Common strategies of handling PException include retrying, or ignoring. We recommend you to
* log the exception for future debugging.
*/
public class PException extends Exception {
private static final long serialVersionUID = 4436491238550521203L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public ReplicationException(error_code.error_types t) {
}

public ReplicationException(error_code.error_types t, String message) {
super(t.name() + ": " + message);
super(t.name() + (message.isEmpty() ? "" : (": " + message)));
err_type = t;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;

/** Created by weijiesun on 17-9-13. */
public class ReplicaSession {
public static class RequestEntry {
public int sequenceId;
Expand Down Expand Up @@ -87,7 +86,7 @@ public int asyncSend(client_operator op, Runnable callbackFunc, long timeoutInMi
entry.callback = callbackFunc;
// NOTICE: must make sure the msg is put into the pendingResponse map BEFORE
// the timer task is scheduled.
pendingResponse.put(new Integer(entry.sequenceId), entry);
pendingResponse.put(entry.sequenceId, entry);
entry.timeoutTask = addTimer(entry.sequenceId, timeoutInMilliseconds);
entry.timeoutMs = timeoutInMilliseconds;

Expand Down Expand Up @@ -136,7 +135,7 @@ public void closeSession() {
}

public RequestEntry getAndRemoveEntry(int seqID) {
return pendingResponse.remove(new Integer(seqID));
return pendingResponse.remove(seqID);
}

public final String name() {
Expand Down Expand Up @@ -179,7 +178,7 @@ private void markSessionConnected(Channel activeChannel) {
synchronized (pendingSend) {
while (!pendingSend.isEmpty()) {
RequestEntry e = pendingSend.poll();
if (pendingResponse.get(new Integer(e.sequenceId)) != null) {
if (pendingResponse.get(e.sequenceId) != null) {
write(e, newCache);
} else {
logger.info("{}: {} is removed from pending, perhaps timeout", name(), e.sequenceId);
Expand Down Expand Up @@ -221,19 +220,17 @@ private void markSessionDisconnect() {
}
}

// Notify the RPC sender if failure occurred.
private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTimeoutTask) {
logger.debug(
"{}: {} is notified with error {}, isTimeoutTask {}",
name(),
seqID,
errno.toString(),
isTimeoutTask);
RequestEntry entry = pendingResponse.remove(new Integer(seqID));
RequestEntry entry = pendingResponse.remove(seqID);
if (entry != null) {
if (!isTimeoutTask) entry.timeoutTask.cancel(true);
entry.op.rpc_error.errno = errno;
entry.callback.run();

if (errno == error_types.ERR_TIMEOUT) {
long firstTs = firstRecentTimedOutMs.get();
if (firstTs == 0) {
Expand All @@ -246,12 +243,15 @@ private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTim
"{}: actively close the session because it's not responding for {} seconds",
name(),
sessionResetTimeWindowMs / 1000);
closeSession();
closeSession(); // maybe fail when the session is already disconnected.
errno = error_types.ERR_SESSION_RESET;
}
}
} else {
firstRecentTimedOutMs.set(0);
}
entry.op.rpc_error.errno = errno;
entry.callback.run();
} else {
logger.warn(
"{}: {} is removed by others, current error {}, isTimeoutTask {}",
Expand Down Expand Up @@ -284,6 +284,9 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
});
}

// Notify the RPC caller when times out. If the RPC finishes in time,
// this task will be cancelled.
// TODO(wutao1): call it addTimeoutTicker
private ScheduledFuture addTimer(final int seqID, long timeoutInMillseconds) {
return rpcGroup.schedule(
new Runnable() {
Expand Down Expand Up @@ -337,7 +340,7 @@ ConnState getState() {
}

interface MessageResponseFilter {
public boolean abandonIt(error_types err, TMessage header);
boolean abandonIt(error_types err, TMessage header);
}

MessageResponseFilter filter = null;
Expand Down

0 comments on commit b5d2b48

Please sign in to comment.