Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat: using "interceptor" to enhance the api(auto-retry) #127

Draft
wants to merge 49 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
d6b0d19
init
foreverneverer Aug 25, 2020
9dae4ce
init the api
foreverneverer Aug 25, 2020
a3637aa
add compress interceptor
foreverneverer Aug 26, 2020
cd24eed
fix open
foreverneverer Aug 27, 2020
56e5e27
add auto-retry interceptor
foreverneverer Aug 27, 2020
482e335
move and after
foreverneverer Aug 31, 2020
2edb6a0
move and after
foreverneverer Aug 31, 2020
e7ecefb
move and after
foreverneverer Aug 31, 2020
49ac5fd
move and after
foreverneverer Aug 31, 2020
ef7aa21
merge lastest code
foreverneverer Aug 31, 2020
44eb64a
fix
foreverneverer Sep 1, 2020
e7c98c8
fix
foreverneverer Sep 1, 2020
151dd6a
fix
foreverneverer Sep 1, 2020
e3626d6
merge
foreverneverer Sep 2, 2020
181ed6f
merge
foreverneverer Sep 2, 2020
d8f1daa
merge
foreverneverer Sep 2, 2020
e4d5ba5
merge
foreverneverer Sep 2, 2020
660ac79
merge master
foreverneverer Sep 2, 2020
30e1567
fix
foreverneverer Sep 2, 2020
501a155
fix
foreverneverer Sep 2, 2020
5b4dcc0
fix
foreverneverer Sep 2, 2020
08fd240
fix
foreverneverer Sep 2, 2020
834888e
fix
foreverneverer Sep 2, 2020
3ced11e
add test
foreverneverer Sep 2, 2020
148b584
add test
foreverneverer Sep 2, 2020
4c0998e
add interface
foreverneverer Sep 2, 2020
9850a37
add interface
foreverneverer Sep 2, 2020
3aea6ca
add interface
foreverneverer Sep 2, 2020
ad8140b
add interface
foreverneverer Sep 2, 2020
779a80f
add interface
foreverneverer Sep 2, 2020
86106c0
merge
foreverneverer Sep 2, 2020
fa91583
merge
foreverneverer Sep 2, 2020
024a164
merge
foreverneverer Sep 2, 2020
f6e6ad5
fix options
foreverneverer Sep 3, 2020
c990362
fix options
foreverneverer Sep 3, 2020
602c47b
fix options
foreverneverer Sep 3, 2020
b3fdc68
fix
foreverneverer Sep 3, 2020
5213342
fix
foreverneverer Sep 3, 2020
bf4266b
add comment
foreverneverer Sep 3, 2020
cde4fbe
add comment
foreverneverer Sep 3, 2020
9cb7312
add comment
foreverneverer Sep 3, 2020
b31c7a0
add comment
foreverneverer Sep 3, 2020
6bea336
test
foreverneverer Sep 4, 2020
74b7a16
fix comment
foreverneverer Sep 4, 2020
d8872d4
fix comment
foreverneverer Sep 4, 2020
d076eba
fix comment
foreverneverer Sep 4, 2020
44206fd
fix comment
foreverneverer Sep 4, 2020
d64d84c
merge
foreverneverer Sep 4, 2020
b1637b9
merge
foreverneverer Sep 4, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
<version>3.9</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
Expand Down
86 changes: 85 additions & 1 deletion src/main/java/com/xiaomi/infra/pegasus/client/TableOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,81 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

/** TableOptions is the internal options for opening a Pegasus table. */
import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound;

/** TableOptions is for opening a Pegasus table with some feature */
public class TableOptions {

/**
* this class control the retry options based `Exponential Backoff` after rpc call failed, used by
* {@link com.xiaomi.infra.pegasus.rpc.interceptor.AutoRetryInterceptor}
*/
public static class RetryOptions {
private long tryTimeoutMs;
private long delayTimeMs;
private int MaxTryCount;

public RetryOptions(long tryTimeoutMs, long delayTimeMs) {
assert (tryTimeoutMs > 0 && delayTimeMs >= 0);
this.tryTimeoutMs = tryTimeoutMs;
this.delayTimeMs = delayTimeMs;
this.MaxTryCount = 3;
}

/**
* set the try timeout
*
* @param tryTimeoutMs try timeout, it should be greater than zero. if set RetryOptions for
* {@link TableOptions}, which means the rpc timeout is updated to tryTimeout, detail see
* {@link
* com.xiaomi.infra.pegasus.rpc.interceptor.AutoRetryInterceptor#updateRequestTimeout(ClientRequestRound)}
* @return this
*/
public RetryOptions withTryTimeoutMs(long tryTimeoutMs) {
this.tryTimeoutMs = tryTimeoutMs;
return this;
}

/**
* set base delay time before the new rpc call, the actual delay time base `Exponential
* Backoff`, detail see {@link
* com.xiaomi.infra.pegasus.rpc.interceptor.AutoRetryInterceptor#getExpDelayTimeMs(int, long)}
*
* @param delayTimeMs base delay time before new rpc call
* @return this
*/
public RetryOptions withDelayTimeMs(long delayTimeMs) {
this.delayTimeMs = delayTimeMs;
return this;
}

/**
* set max try call count
*
* @param maxTryCount max try call count, default is 3
* @return this
*/
public RetryOptions withMaxTryCount(int maxTryCount) {
this.MaxTryCount = maxTryCount;
return this;
}

public long tryTimeoutMs() {
return tryTimeoutMs;
}

public long delayTimeMs() {
return delayTimeMs;
}

public int maxTryCount() {
return MaxTryCount;
}
}

private int backupRequestDelayMs;
private boolean enableCompression;
private RetryOptions retryOptions;

public TableOptions() {
this.backupRequestDelayMs = 0;
Expand All @@ -23,6 +94,11 @@ public TableOptions withCompression(boolean enableCompression) {
return this;
}

public TableOptions withRetry(RetryOptions retryOptions) {
this.retryOptions = retryOptions;
return this;
}

public int backupRequestDelayMs() {
return this.backupRequestDelayMs;
}
Expand All @@ -34,4 +110,12 @@ public boolean enableBackupRequest() {
public boolean enableCompression() {
return enableCompression;
}

public boolean enableAutoRetry() {
return retryOptions != null;
}

public RetryOptions retryOptions() {
return retryOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ public final class ClientRequestRound {

client_operator operator;
Table.ClientOPCallback callback;
long timeoutMs;
public long timeoutMs;
public long nextDelayTime;
public long remainingTime;
public int tryId;

boolean enableCounter;
long createNanoTime;
long expireNanoTime;
boolean isCompleted;
int tryId;
ScheduledFuture<?> backupRequestTask;

/**
Expand All @@ -38,6 +40,7 @@ public ClientRequestRound(
this.operator = op;
this.callback = cb;
this.timeoutMs = timeoutInMilliseconds;
this.remainingTime = timeoutMs;

this.enableCounter = enableCounter;
this.createNanoTime = System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,12 @@ public void run() {
}
}

void call(final ClientRequestRound round) {
public boolean forTest(ClientRequestRound round) {
call(round);
return true;
}

public void call(final ClientRequestRound round) {
// tableConfig & handle is initialized in constructor, so both shouldn't be null
final TableConfiguration tableConfig = tableConfig_.get();
final ReplicaConfiguration handle =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package com.xiaomi.infra.pegasus.rpc.interceptor;

import com.xiaomi.infra.pegasus.base.error_code.error_types;
import com.xiaomi.infra.pegasus.client.TableOptions.RetryOptions;
import com.xiaomi.infra.pegasus.operator.client_operator;
import com.xiaomi.infra.pegasus.rpc.Table.ClientOPCallback;
import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound;
import com.xiaomi.infra.pegasus.rpc.async.TableHandler;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;

/**
* the interceptor support the rpc retry based `Exponential Backoff`, which will guarantee return in
* request timeout user passed.
*
* <p>every rpc call has timeout{@link ClientRequestRound#timeoutMs}, if not set {@link
* RetryOptions} when open table using {@link TableOptions}, the rpc timeout is equal with request
* timeout, see the {@link TableHandler#asyncOperate(client_operator, ClientOPCallback, int)},
* otherwise, the timeout is updated by the class and is equal with {@link
* RetryOptions#tryTimeoutMs}, or is equal with {@link ClientRequestRound#remainingTime} at last
* time, detail see {@link AutoRetryInterceptor#updateRequestTimeout(ClientRequestRound)}
*/
public class AutoRetryInterceptor implements TableInterceptor {
private static final Logger logger =
org.slf4j.LoggerFactory.getLogger(AutoRetryInterceptor.class);
private long tryTimeoutMs;
private long delayTimeMs;
private int maxRetryCount;

public AutoRetryInterceptor(RetryOptions retryOptions) {
this.tryTimeoutMs = retryOptions.tryTimeoutMs();
this.delayTimeMs = retryOptions.delayTimeMs();
this.maxRetryCount = retryOptions.maxTryCount();
}

@Override
public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) {
updateRequestTimeout(clientRequestRound);
}

/**
* This method used for updating request timeout value before call
*
* <p>the rpc timeout is equal with tryTimeout in general, but if the remainingTime <=
* nextCallTime or clientRequestRound.tryId = maxTryCount, means the remainingTime only can be
* enough to call one time, so set the rpc timeout = remainingTime, it's especially note that if
* the case happen at first call, the rpc will be only called once and not trigger retry logic.
*
* <p>case 1: the last remaining time is just enough for once, the last rpc timeout=tryTimeout
* |--------------------------request timeout------------------------------------------|
* |---tryTimeout(1)---|-delayTime-|---tryTimeout(2)---|-delayTime-|---tryTimeout(3)---|
*
* <p>case 2: the last remaining time is not enough for once, the last rpc timeout=remainingTime
* |-------------------------request timeout----------------------------------------------|
* |----tryTimeout(1)----|-delayTime-|----tryTimeout(2)----|-delayTime-|-remainingTime(3)-|
*
* <p>case 3: the try count is equal with maxTryTime=2, the last rpc timeout=remainingTime
* |-----------------------request timeout------------------------------------------------|
* |----tryTimeout(1)----|-delayTime-|------------------remainingTime(2)------------------|
*
* <p>case 4: the tryTimeout + delayTime > request timeout(also remainingTime) at first call, the
* rpc timeout=request timeout and rpc won't be retried
*
* <p>|-----------------------request timeout----------------------------------|
* </>|---------------------tryTimeout(1)----------------|---------delayTime-----------|
*
* @param clientRequestRound request body
*/
private void updateRequestTimeout(ClientRequestRound clientRequestRound) {
clientRequestRound.nextDelayTime =
getExpDelayTimeMs(clientRequestRound.tryId, clientRequestRound.remainingTime);
long nextCallTime = tryTimeoutMs + clientRequestRound.nextDelayTime;

clientRequestRound.timeoutMs =
(clientRequestRound.remainingTime <= nextCallTime)
|| (clientRequestRound.tryId == maxRetryCount)
? clientRequestRound.remainingTime
: tryTimeoutMs;

clientRequestRound.remainingTime -=
(clientRequestRound.timeoutMs + clientRequestRound.nextDelayTime);
}

@Override
public void after(
ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) {
retryCall(clientRequestRound, errno, tableHandler);
}

/**
* sleep the delayTime and retry call
*
* @param clientRequestRound clientRequestRound
* @param errno rpc call response error code
* @param tableHandler tableHandler
*/
private void retryCall(
ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) {
if (errno != error_types.ERR_TIMEOUT) {
return;
}

if (++clientRequestRound.tryId > maxRetryCount || clientRequestRound.remainingTime <= 0) {
return;
}

try {
Thread.sleep(clientRequestRound.nextDelayTime);
} catch (InterruptedException e) {
logger.warn(
"sleep {}ms is interrupted when ready for retrying call, which will start next call rpc immediately",
clientRequestRound.nextDelayTime);
}

tableHandler.call(clientRequestRound);
}

/**
* compute the next delay time based `Exponential Backoff` before new rpc call
*
* @param tryCount the count of rpc call
* @param remainingTime the remaining time of request time
* @return the delay time base `Exponential Backoff` before new rpc call, if the remainingTime <
* delayTimeMs * 2 ^ (maxTryCount - 1), return remainingTime
*/
private long getExpDelayTimeMs(int tryCount, long remainingTime) {
return RandomUtils.nextLong(0, Math.min(remainingTime, delayTimeMs * 2 ^ (tryCount - 1)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
import java.util.concurrent.TimeUnit;

public class BackupRequestInterceptor implements TableInterceptor {
private final long backupRequestDelayMs;

public BackupRequestInterceptor(long backupRequestDelayMs) {
private final int backupRequestDelayMs;

public BackupRequestInterceptor(int backupRequestDelayMs) {
this.backupRequestDelayMs = backupRequestDelayMs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,20 @@ public class InterceptorManger {

private List<TableInterceptor> interceptors = new ArrayList<>();

/**
* The interceptor manager
*
* <p>Note: {@link AutoRetryInterceptor} must be added and executed before {@link
* BackupRequestInterceptor}, for the {@link AutoRetryInterceptor} will modify the {@link
* ClientRequestRound#timeoutMs} which is used by {@link BackupRequestInterceptor}
*
* @param options control the interceptor switch, detail see {@link TableOptions}
*/
public InterceptorManger(TableOptions options) {
if (options.enableAutoRetry()) {
interceptors.add(new AutoRetryInterceptor(options.retryOptions()));
}

if (options.enableBackupRequest()) {
interceptors.add(new BackupRequestInterceptor(options.backupRequestDelayMs()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package com.xiaomi.infra.pegasus.rpc.async;

import com.xiaomi.infra.pegasus.base.error_code.error_types;
import com.xiaomi.infra.pegasus.client.ClientOptions;
import com.xiaomi.infra.pegasus.client.PException;
import com.xiaomi.infra.pegasus.client.PegasusClientFactory;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
import com.xiaomi.infra.pegasus.client.TableOptions;
import com.xiaomi.infra.pegasus.client.TableOptions.RetryOptions;
import com.xiaomi.infra.pegasus.rpc.InternalTableOptions;
import com.xiaomi.infra.pegasus.rpc.KeyHasher;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class InterceptorTest {
@Test
Expand Down Expand Up @@ -36,4 +44,35 @@ public void testCompressionInterceptor() throws PException {
Assertions.assertEquals(
new String(compressTable.get(hashKey, sortKey, 10000)), new String(compressionValue));
}

@Test
// TODO(jiashuo1) add test for retry
public void testAutoRetryInterceptor() throws ReplicationException {
TableHandler table = createRetryTable(300, 100);
table.forTest(new ClientRequestRound(null, null, false, System.nanoTime() + 1000000, 1000));

ClientRequestRound clientRequestRound =
new ClientRequestRound(null, null, false, System.nanoTime() + 1000000, 1000);
Mockito.when(table.forTest(clientRequestRound))
.then(
new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
clientRequestRound.getOperator().rpc_error.errno = error_types.ERR_TIMEOUT;
table.onRpcReply(clientRequestRound, 1, null);
return null;
}
});
}

private TableHandler createRetryTable(long retryTime, long delayTime)
throws ReplicationException {
return Mockito.spy(
new TableHandler(
new ClusterManager(ClientOptions.create()),
"temp",
new InternalTableOptions(
KeyHasher.DEFAULT,
new TableOptions().withRetry(new RetryOptions(retryTime, delayTime)))));
}
}