Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add exponential retry if TxnConflictException occurs #217

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main/java/io/dgraph/AsyncTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public CompletableFuture<Response> doRequest(Request request, long duration, Tim
mergeContext(response.getTxn());
return response;
});
})
},false)
.handle(
(Response response, Throwable throwable) -> {
if (throwable != null) {
Expand All @@ -327,7 +327,7 @@ public CompletableFuture<Response> doRequest(Request request, long duration, Tim
*
* @return CompletableFuture with Void result
*/
public CompletableFuture<Void> commit() {
public CompletableFuture<Void> commit( boolean retry) {
if (readOnly) {
throw new TxnReadOnlyException();
}
Expand All @@ -348,7 +348,7 @@ public CompletableFuture<Void> commit() {
DgraphStub localStub = client.getStubWithJwt(stub);
localStub.commitOrAbort(context, bridge);
return bridge.getDelegate().thenApply(txnContext -> null);
});
}, retry);
}

/**
Expand Down Expand Up @@ -379,7 +379,7 @@ public CompletableFuture<Void> discard() {
DgraphStub localStub = client.getStubWithJwt(stub);
localStub.commitOrAbort(context, bridge);
return bridge.getDelegate().thenApply((o) -> null);
});
},false);
}

private void mergeContext(final TxnContext src) {
Expand Down
139 changes: 118 additions & 21 deletions src/main/java/io/dgraph/DgraphAsyncClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ protected DgraphGrpc.DgraphStub getStubWithJwt(DgraphGrpc.DgraphStub stub) {
* @return a completable future which can be used to get the result
*/
protected <T> CompletableFuture<T> runWithRetries(
String operation, Callable<CompletableFuture<T>> callable) {
String operation, Callable<CompletableFuture<T>> callable,boolean retry) {
final Callable<CompletableFuture<T>> ctxCallable = Context.current().wrap(callable);

return CompletableFuture.supplyAsync(
Expand All @@ -221,29 +221,18 @@ protected <T> CompletableFuture<T> runWithRetries(
throw new RuntimeException(e);
} catch (ExecutionException e) {
if (ExceptionUtil.isJwtExpired(e.getCause())) {
try {
// retry the login
retryLogin().get();
// retry the supplied logic
return ctxCallable.call().get();
} catch (InterruptedException ie) {
LOG.error("The retried " + operation + " got interrupted:", ie);
throw new RuntimeException(ie);
} catch (ExecutionException ie) {
LOG.error("The retried " + operation + " encounters an execution exception:", ie);
throw new RuntimeException(ie);
} catch (Exception ie) {
LOG.error("The retried " + operation + " encounters a completion exception:", ie);
throw new CompletionException(ie);
}
return retryLoginAndExecute(ctxCallable, operation);
} else if (e.getCause() instanceof StatusRuntimeException) {
StatusRuntimeException ex1 = (StatusRuntimeException) e.getCause();
Status.Code code = ex1.getStatus().getCode();
String desc = ex1.getStatus().getDescription();

String desc= ex1.getStatus().getDescription();
if (code.equals(Status.Code.ABORTED)
|| code.equals(Status.Code.FAILED_PRECONDITION)) {
throw new CompletionException(new TxnConflictException(desc));
if (retry){
return executeWithExponentialBackoff(
ctxCallable, 5, 500, operation,desc);
}
throw new CompletionException(new TxnConflictException(desc));
}
}
// Handle the case when the outer exception is not caused by JWT expiration
Expand All @@ -256,6 +245,114 @@ protected <T> CompletableFuture<T> runWithRetries(
this.executor);
}

/**
 * Retries the login and then re-executes the given callable once, blocking the calling thread
 * until both have completed.
 *
 * @param <T> the result type of the future produced by the callable
 * @param ctxCallable the callable to execute again once the login retry has finished
 * @param operation the name of the operation; used only in log messages
 * @return the value the callable's future completes with
 * @throws RuntimeException if the calling thread is interrupted while waiting (the interrupt
 *     flag is restored first), or if the login future or the callable's future completes
 *     exceptionally
 * @throws CompletionException if any other exception is raised while retrying
 */
private <T> T retryLoginAndExecute(Callable<CompletableFuture<T>> ctxCallable, String operation) {
  try {
    // Retry the login first so the callable runs with refreshed credentials.
    retryLogin().get();
    // Retry the supplied logic.
    return ctxCallable.call().get();
  } catch (InterruptedException ie) {
    // Restore the interrupt flag so code further up the stack can still observe the interruption
    // after it has been wrapped in an unchecked exception.
    Thread.currentThread().interrupt();
    LOG.error("The retried " + operation + " got interrupted:", ie);
    throw new RuntimeException(ie);
  } catch (ExecutionException ee) {
    LOG.error("The retried " + operation + " encounters an execution exception:", ee);
    throw new RuntimeException(ee);
  } catch (Exception e) {
    LOG.error("The retried " + operation + " encounters a completion exception:", e);
    throw new CompletionException(e);
  }
}

/**
 * Executes the given callable, retrying with exponential backoff while it keeps failing with a
 * transaction conflict (gRPC status {@code ABORTED} or {@code FAILED_PRECONDITION}).
 *
 * <p>The callable is invoked at most {@code maxRetries} times. After each conflict that is not
 * the last allowed attempt, the calling thread sleeps for the current delay and the delay is
 * doubled. If the failure is an expired JWT, the login is retried and the callable is executed
 * one more time via {@code retryLoginAndExecute}. Any other failure aborts immediately.
 *
 * @param <T> the result type of the future produced by the callable
 * @param ctxCallable the callable to execute
 * @param maxRetries the maximum number of times the callable is attempted
 * @param delay the delay before the first retry, in milliseconds; doubled after every retry
 * @param operation the name of the operation; used only in log messages
 * @param desc the description of the conflict that triggered the retries; replaced by the
 *     description of the most recent gRPC status and used in the final exception message
 * @return the value the callable's future completes with
 * @throws CompletionException wrapping a {@code TxnConflictException} if the conflict persists
 *     on the last allowed attempt, or if {@code maxRetries} is not positive
 * @throws RuntimeException if the thread is interrupted (the interrupt flag is restored first)
 *     or the callable fails with anything other than a conflict or an expired JWT
 */
private <T> T executeWithExponentialBackoff(
    Callable<CompletableFuture<T>> ctxCallable,
    int maxRetries,
    int delay,
    String operation,
    String desc) {
  int attempt = 0;
  while (attempt < maxRetries) {
    try {
      return ctxCallable.call().get();
    } catch (InterruptedException ie) {
      // Restore the interrupt flag before wrapping so callers can still detect the interruption.
      Thread.currentThread().interrupt();
      LOG.error("The retried " + operation + " got interrupted:", ie);
      throw new RuntimeException(ie);
    } catch (ExecutionException ee) {
      Throwable cause = ee.getCause();
      if (ExceptionUtil.isJwtExpired(cause)) {
        return retryLoginAndExecute(ctxCallable, operation);
      }
      if (!(cause instanceof StatusRuntimeException)) {
        LOG.error("Operation " + operation + " encountered an execution exception: " + ee);
        throw new RuntimeException(ee);
      }

      StatusRuntimeException ex = (StatusRuntimeException) cause;
      Status.Code code = ex.getStatus().getCode();
      desc = ex.getStatus().getDescription();

      boolean conflict =
          code.equals(Status.Code.ABORTED) || code.equals(Status.Code.FAILED_PRECONDITION);
      if (!conflict) {
        LOG.error("Operation " + operation + " failed with unexpected error: " + ex.toString());
        throw new RuntimeException(ex);
      }

      if (attempt >= maxRetries - 1) {
        // The last allowed attempt also conflicted: give up and surface the conflict.
        LOG.error(
            "Operation "
                + operation
                + " failed after attempts: "
                + maxRetries
                + " description: "
                + desc);
        throw new CompletionException(new TxnConflictException(desc));
      }

      // Parenthesised so the 1-based attempt number is added arithmetically rather than
      // concatenated digit-by-digit into the message.
      LOG.warn(
          "Operation "
              + operation
              + " failed with "
              + code
              + ". Retrying in "
              + delay
              + "ms attempt "
              + (attempt + 1)
              + "/"
              + maxRetries);
      try {
        Thread.sleep(delay);
      } catch (InterruptedException ie2) {
        Thread.currentThread().interrupt();
        LOG.error("The backoff sleep got interrupted: " + ie2.getMessage());
        throw new RuntimeException(ie2);
      }
      delay *= 2; // Exponential backoff
      attempt++;
    } catch (Exception e) {
      LOG.error("Operation " + operation + " encountered an execution exception: " + e);
      throw new RuntimeException(e);
    }
  }
  // Only reached when the loop body never runs, i.e. maxRetries is not positive.
  throw new CompletionException(new TxnConflictException("Max retries reached " + desc));
}

/**
* Alter can be used to perform the following operations, by setting the right fields in the
* protocol buffer Operation object.
Expand All @@ -279,7 +376,7 @@ public CompletableFuture<Payload> alter(DgraphProto.Operation op) {
DgraphGrpc.DgraphStub localStub = getStubWithJwt(stub);
localStub.alter(op, observerBridge);
return observerBridge.getDelegate();
});
},false);
}

/**
Expand All @@ -300,7 +397,7 @@ public CompletableFuture<Version> checkVersion() {
DgraphGrpc.DgraphStub localStub = getStubWithJwt(stub);
localStub.checkVersion(checkRequest, observerBridge);
return observerBridge.getDelegate();
});
},false);
}

private DgraphGrpc.DgraphStub anyClient() {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/dgraph/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,10 @@ public Response doRequest(Request request, long duration, TimeUnit units) {
* thrown if transactions that modify the same data are being run concurrently. It's up to the
* user to decide if they wish to retry. In this case, the user should create a new transaction.
*/
public void commit() {
public void commit(boolean retry) {
ExceptionUtil.withExceptionUnwrapped(
() -> {
asyncTransaction.commit().join();
asyncTransaction.commit(retry).join();
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/dgraph/AcctUpsertTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private void tryUpsert(Account account) {
String nq = String.format("<%s> <when> \"%d\"^^<xs:int> .", uid, System.nanoTime());
Mutation mu = Mutation.newBuilder().setSetNquads(ByteString.copyFromUtf8(nq)).build();
txn.mutate(mu);
txn.commit();
txn.commit(false);
} finally {
txn.discard();
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/dgraph/AclTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testAuthorization() throws Exception {

// create the dev group and add the user to it
createGroupAndACLs(DEV_GROUP, true);
System.out.println("Sleep for 6 seconds for acl caches to be refreshed");
System.out.println("Sleep for 4 seconds for acl caches to be refreshed");
Thread.sleep(6 * 1000);

// now the operations should succeed again through the dev group
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/dgraph/BankTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private void createAccounts() {
Mutation mu =
Mutation.newBuilder().setSetJson(ByteString.copyFromUtf8(gson.toJson(accounts))).build();
Response response = txn.mutate(mu);
txn.commit();
txn.commit(false);
response.getUidsMap().forEach((key, uid) -> uids.add(uid));
}

Expand Down Expand Up @@ -109,7 +109,7 @@ private void runTxn() {
Mutation mu =
Mutation.newBuilder().setSetJson(ByteString.copyFromUtf8(gson.toJson(accounts))).build();
txn.mutate(mu);
txn.commit();
txn.commit(false);
} finally {
txn.discard();
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/dgraph/DgraphAsyncClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void testDelete() throws Exception {
assertEquals(jsonData.getAsJsonArray("find_bob").size(), 0);
}))
.get();
txn.commit();
txn.commit(false);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/dgraph/DgraphClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void testDelete() {
response = txn.query(query);
jsonData = parser.parse(response.getJson().toStringUtf8()).getAsJsonObject();
assertEquals(jsonData.getAsJsonArray("find_bob").size(), 0);
txn.commit();
txn.commit(false);
}
}

Expand Down Expand Up @@ -158,7 +158,7 @@ public void testCommitAfterCommitNow() {
.setCommitNow(true)
.build();
txn.mutate(mu);
txn.commit();
txn.commit(false);
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/test/java/io/dgraph/ExceptionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ public void testConflictException() {
Transaction txn2 = dgraphClient.newTransaction();
txn1.mutate(mu);
txn2.mutate(mu);
txn1.commit();
txn2.commit();
txn1.commit(false);
txn2.commit(false);
}

@Test(expectedExceptions = TxnFinishedException.class)
public void testFinishedException() {
Transaction txn = dgraphClient.newTransaction();
txn.mutate(mu);
txn.commit();
txn.commit();
txn.commit(false);
txn.commit(false);
}

@Test(expectedExceptions = TxnReadOnlyException.class)
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/dgraph/MutatesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testInsert3Quads() {
uidsMap.put(datum, resp.getUidsOrThrow(datum));
}

txn.commit();
txn.commit(false);
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/dgraph/OpencensusJaegerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private static void runTransactions() {
.build();
Transaction txn = dgraphClient.newTransaction();
txn.mutate(mu);
txn.commit();
txn.commit(false);

String query = "{\n q(func: eq(name, \"Alice\")) {\n name\n uid\n}\n}";
dgraphClient.newTransaction().query(query);
Expand Down
Loading