diff --git a/.gitignore b/.gitignore index 6c7cde0f..a70baad8 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ log.txt rolling_log/ .vscode/ google-java-format-* +pegasus-* diff --git a/scripts/travis.sh b/scripts/travis.sh index 1c3798b3..0f31d0b4 100755 --- a/scripts/travis.sh +++ b/scripts/travis.sh @@ -26,10 +26,15 @@ if [[ $(git status -s) ]]; then exit 1 fi +PEGASUS_PKG="pegasus-1.11.3-b45cb06-linux-x86_64-release" +PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus/releases/download/v1.11.3/pegasus-1.11.3-b45cb06-linux-x86_64-release.zip" + # start pegasus onebox environment -wget https://github.com/XiaoMi/pegasus/releases/download/v1.11.3/pegasus-1.11.3-b45cb06-linux-x86_64-release.zip -unzip pegasus-1.11.3-b45cb06-linux-x86_64-release.zip -cd pegasus-1.11.3-b45cb06-linux-x86_64-release +if [ ! -f $PEGASUS_PKG.zip ]; then + wget $PEGASUS_PKG_URL + unzip $PEGASUS_PKG.zip +fi +cd $PEGASUS_PKG ./run.sh start_onebox -w cd ../ diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PException.java b/src/main/java/com/xiaomi/infra/pegasus/client/PException.java index b675dab8..feec9a93 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PException.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PException.java @@ -4,8 +4,10 @@ package com.xiaomi.infra.pegasus.client; /** - * @author qinzuoyan - *
Pegasus exception. + * The generic type of exception thrown by all of the Pegasus APIs. + * + *
Common strategies of handling PException include retrying, or ignoring. We recommend you to + * log the exception for future debugging. */ public class PException extends Exception { private static final long serialVersionUID = 4436491238550521203L; diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/ReplicationException.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/ReplicationException.java index fc5f8ce0..b2000816 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/ReplicationException.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/ReplicationException.java @@ -20,7 +20,7 @@ public ReplicationException(error_code.error_types t) { } public ReplicationException(error_code.error_types t, String message) { - super(t.name() + ": " + message); + super(t.name() + (message.isEmpty() ? "" : (": " + message))); err_type = t; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 6ce9757a..8e037c9f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; -/** Created by weijiesun on 17-9-13. */ public class ReplicaSession { public static class RequestEntry { public int sequenceId; @@ -87,7 +86,7 @@ public int asyncSend(client_operator op, Runnable callbackFunc, long timeoutInMi entry.callback = callbackFunc; // NOTICE: must make sure the msg is put into the pendingResponse map BEFORE // the timer task is scheduled. - pendingResponse.put(new Integer(entry.sequenceId), entry); + pendingResponse.put(entry.sequenceId, entry); entry.timeoutTask = addTimer(entry.sequenceId, timeoutInMilliseconds); entry.timeoutMs = timeoutInMilliseconds; @@ -136,7 +135,7 @@ public void closeSession() { } public RequestEntry getAndRemoveEntry(int seqID) { - return pendingResponse.remove(new Integer(seqID)); + return pendingResponse.remove(seqID); } public final String name() { @@ -179,7 +178,7 @@ private void markSessionConnected(Channel activeChannel) { synchronized (pendingSend) { while (!pendingSend.isEmpty()) { RequestEntry e = pendingSend.poll(); - if (pendingResponse.get(new Integer(e.sequenceId)) != null) { + if (pendingResponse.get(e.sequenceId) != null) { write(e, newCache); } else { logger.info("{}: {} is removed from pending, perhaps timeout", name(), e.sequenceId); @@ -221,6 +220,7 @@ private void markSessionDisconnect() { } } + // Notify the RPC sender if failure occurred. private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTimeoutTask) { logger.debug( "{}: {} is notified with error {}, isTimeoutTask {}", @@ -228,12 +228,9 @@ private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTim seqID, errno.toString(), isTimeoutTask); - RequestEntry entry = pendingResponse.remove(new Integer(seqID)); + RequestEntry entry = pendingResponse.remove(seqID); if (entry != null) { if (!isTimeoutTask) entry.timeoutTask.cancel(true); - entry.op.rpc_error.errno = errno; - entry.callback.run(); - if (errno == error_types.ERR_TIMEOUT) { long firstTs = firstRecentTimedOutMs.get(); if (firstTs == 0) { @@ -246,9 +243,13 @@ private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTim "{}: actively close the session because it's not responding for {} seconds", name(), sessionResetTimeWindowMs / 1000); - closeSession(); + closeSession(); // maybe fail when the session is already disconnected. + errno = error_types.ERR_SESSION_RESET; } } + + entry.op.rpc_error.errno = errno; + entry.callback.run(); } else { firstRecentTimedOutMs.set(0); } @@ -284,6 +285,9 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception { }); } + // Notify the RPC caller when times out. If the RPC finishes in time, + // this task will be cancelled. + // TODO(wutao1): call it addTimeoutTicker private ScheduledFuture addTimer(final int seqID, long timeoutInMillseconds) { return rpcGroup.schedule( new Runnable() { @@ -337,7 +341,7 @@ ConnState getState() { } interface MessageResponseFilter { - public boolean abandonIt(error_types err, TMessage header); + boolean abandonIt(error_types err, TMessage header); } MessageResponseFilter filter = null;