Skip to content

Commit 150f4e7

Browse files
nikagradkropachev
authored andcommitted
fix: cap onRequestError retries at 1 in DefaultRetryPolicy
DefaultRetryPolicy.onRequestError() now retries on the next host only once (nbRetry == 0), then rethrows. Previously, it returned tryNextHost unconditionally, allowing retries across all nodes in the query plan. In large clusters, a single timed-out request could cascade through every node. With a 6-second client timeout and 12 nodes, this means a single thread could be blocked for up to 72 seconds, leading to thread pool exhaustion (RejectedExecutionException). The same fix is applied to DowngradingConsistencyRetryPolicy. This makes onRequestError consistent with onUnavailable, onReadTimeout, and onWriteTimeout, all of which cap retries at 1. Motivated-by: CUSTOMER-331
1 parent 734a430 commit 150f4e7

4 files changed

Lines changed: 220 additions & 116 deletions

File tree

driver-core/src/main/java/com/datastax/driver/core/policies/DefaultRetryPolicy.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
* <li>On a write timeout, retries once on the same host if we timeout while writing the
3535
* distributed log used by batch statements.
3636
* <li>On an unavailable exception, retries once on the next host.
37-
* <li>On a request error, such as a client timeout, the query is retried on the next host. Do not
38-
* retry on read or write failures.
37+
* <li>On a request error, such as a client timeout, retries once on the next host. Do not retry
38+
* on read or write failures.
3939
* </ul>
4040
*
4141
* <p>This retry policy is conservative in that it will never retry with a different consistency
@@ -136,16 +136,27 @@ public RetryDecision onUnavailable(
136136
return (nbRetry == 0) ? RetryDecision.tryNextHost(null) : RetryDecision.rethrow();
137137
}
138138

139-
/** {@inheritDoc} */
139+
/**
140+
* {@inheritDoc}
141+
*
142+
* <p>This implementation triggers a maximum of one retry on the next host in the query plan. The
143+
* rationale is that the first coordinator might have been network-isolated or overloaded, and
144+
* moving to the next host might resolve the issue. If the retry also fails, the exception is
145+
* rethrown.
146+
*
147+
* <p>Read and write failures are never retried, as they generally indicate a data problem that is
148+
* unlikely to be resolved by a retry.
149+
*
150+
* @return {@code RetryDecision.tryNextHost(cl)} if no retry attempt has yet been tried and the
151+
* error is not a read/write failure, {@code RetryDecision.rethrow()} otherwise.
152+
*/
140153
@Override
141154
public RetryDecision onRequestError(
142155
Statement statement, ConsistencyLevel cl, DriverException e, int nbRetry) {
143-
// do not retry these by default as they generally indicate a data problem or
144-
// other issue that is unlikely to be resolved by a retry.
145156
if (e instanceof WriteFailureException || e instanceof ReadFailureException) {
146157
return RetryDecision.rethrow();
147158
}
148-
return RetryDecision.tryNextHost(cl);
159+
return (nbRetry == 0) ? RetryDecision.tryNextHost(cl) : RetryDecision.rethrow();
149160
}
150161

151162
@Override

driver-core/src/main/java/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicy.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,21 +208,24 @@ public RetryDecision onUnavailable(
208208
/**
209209
* {@inheritDoc}
210210
*
211-
* <p>For historical reasons, this implementation triggers a retry on the next host in the query
212-
* plan with the same consistency level, regardless of the statement's idempotence. Note that this
213-
* breaks the general rule stated in {@link RetryPolicy#onRequestError(Statement,
214-
* ConsistencyLevel, DriverException, int)}: "a retry should only be attempted if the request is
215-
* known to be idempotent".`
211+
* <p>This implementation triggers a maximum of one retry on the next host in the query plan. The
212+
* rationale is that the first coordinator might have been network-isolated or overloaded, and
213+
* moving to the next host might resolve the issue. If the retry also fails, the exception is
214+
* rethrown.
215+
*
216+
* <p>Read and write failures are never retried, as they generally indicate a data problem that is
217+
* unlikely to be resolved by a retry.
218+
*
219+
* @return {@code RetryDecision.tryNextHost(cl)} if no retry attempt has yet been tried and the
220+
* error is not a read/write failure, {@code RetryDecision.rethrow()} otherwise.
216221
*/
217222
@Override
218223
public RetryDecision onRequestError(
219224
Statement statement, ConsistencyLevel cl, DriverException e, int nbRetry) {
220-
// do not retry these by default as they generally indicate a data problem or
221-
// other issue that is unlikely to be resolved by a retry.
222225
if (e instanceof WriteFailureException || e instanceof ReadFailureException) {
223226
return RetryDecision.rethrow();
224227
}
225-
return RetryDecision.tryNextHost(cl);
228+
return (nbRetry == 0) ? RetryDecision.tryNextHost(cl) : RetryDecision.rethrow();
226229
}
227230

228231
@Override

driver-core/src/test/java/com/datastax/driver/core/policies/DefaultRetryPolicyIntegrationTest.java

Lines changed: 94 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,38 @@ public void should_rethrow_unavailable_in_no_host_available_exception() {
179179
}
180180

181181
@Test(groups = "short")
182-
public void should_try_next_host_on_client_timeouts() {
182+
public void should_try_next_host_on_first_client_timeout() {
183+
cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(1);
184+
try {
185+
scassandras
186+
.node(1)
187+
.primingClient()
188+
.prime(
189+
PrimingRequest.queryBuilder()
190+
.withQuery("mock query")
191+
.withThen(then().withFixedDelay(1000L).withRows(row("result", "result1")))
192+
.build());
193+
simulateNormalResponse(2);
194+
195+
query();
196+
197+
assertOnRequestErrorWasCalled(1, OperationTimedOutException.class);
198+
assertThat(errors.getRetries().getCount()).isEqualTo(1);
199+
assertThat(errors.getClientTimeouts().getCount()).isEqualTo(1);
200+
assertThat(errors.getRetriesOnClientTimeout().getCount()).isEqualTo(1);
201+
assertQueried(1, 1);
202+
assertQueried(2, 1);
203+
assertQueried(3, 0);
204+
} finally {
205+
cluster
206+
.getConfiguration()
207+
.getSocketOptions()
208+
.setReadTimeoutMillis(SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS);
209+
}
210+
}
211+
212+
@Test(groups = "short")
213+
public void should_rethrow_on_second_client_timeout() {
183214
cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(1);
184215
try {
185216
scassandras
@@ -208,32 +239,17 @@ public void should_try_next_host_on_client_timeouts() {
208239
.build());
209240
try {
210241
query();
211-
fail("expected a NoHostAvailableException");
212-
} catch (NoHostAvailableException e) {
213-
assertThat(e.getErrors().keySet())
214-
.hasSize(3)
215-
.containsOnly(host1.getEndPoint(), host2.getEndPoint(), host3.getEndPoint());
216-
assertThat(e.getErrors().values()).hasOnlyElementsOfType(OperationTimedOutException.class);
217-
assertThat(
218-
((OperationTimedOutException) e.getErrors().get(host1.getEndPoint())).getMessage())
219-
.contains(
220-
String.format("[%s] Timed out waiting for server response", host1.getEndPoint()));
221-
assertThat(
222-
((OperationTimedOutException) e.getErrors().get(host2.getEndPoint())).getMessage())
223-
.contains(
224-
String.format("[%s] Timed out waiting for server response", host2.getEndPoint()));
225-
assertThat(
226-
((OperationTimedOutException) e.getErrors().get(host3.getEndPoint())).getMessage())
227-
.contains(
228-
String.format("[%s] Timed out waiting for server response", host3.getEndPoint()));
242+
fail("expected an OperationTimedOutException");
243+
} catch (OperationTimedOutException e) {
244+
assertThat(e.getMessage()).contains("Timed out waiting for server response");
229245
}
230-
assertOnRequestErrorWasCalled(3, OperationTimedOutException.class);
231-
assertThat(errors.getRetries().getCount()).isEqualTo(3);
232-
assertThat(errors.getClientTimeouts().getCount()).isEqualTo(3);
233-
assertThat(errors.getRetriesOnClientTimeout().getCount()).isEqualTo(3);
246+
assertOnRequestErrorWasCalled(2, OperationTimedOutException.class);
247+
assertThat(errors.getRetries().getCount()).isEqualTo(1);
248+
assertThat(errors.getClientTimeouts().getCount()).isEqualTo(2);
249+
assertThat(errors.getRetriesOnClientTimeout().getCount()).isEqualTo(1);
234250
assertQueried(1, 1);
235251
assertQueried(2, 1);
236-
assertQueried(3, 1);
252+
assertQueried(3, 0);
237253
} finally {
238254
cluster
239255
.getConfiguration()
@@ -243,27 +259,41 @@ public void should_try_next_host_on_client_timeouts() {
243259
}
244260

245261
@Test(groups = "short", dataProvider = "serverSideErrors")
246-
public void should_try_next_host_on_server_side_error(
262+
public void should_try_next_host_on_first_server_side_error(
263+
Result error, Class<? extends DriverException> exception) {
264+
simulateError(1, error);
265+
simulateNormalResponse(2);
266+
267+
query();
268+
269+
assertOnRequestErrorWasCalled(1, exception);
270+
assertThat(errors.getOthers().getCount()).isEqualTo(1);
271+
assertThat(errors.getRetries().getCount()).isEqualTo(1);
272+
assertThat(errors.getRetriesOnOtherErrors().getCount()).isEqualTo(1);
273+
assertQueried(1, 1);
274+
assertQueried(2, 1);
275+
assertQueried(3, 0);
276+
}
277+
278+
@Test(groups = "short", dataProvider = "serverSideErrors")
279+
public void should_rethrow_on_second_server_side_error(
247280
Result error, Class<? extends DriverException> exception) {
248281
simulateError(1, error);
249282
simulateError(2, error);
250283
simulateError(3, error);
251284
try {
252285
query();
253-
Fail.fail("expected a NoHostAvailableException");
254-
} catch (NoHostAvailableException e) {
255-
assertThat(e.getErrors().keySet())
256-
.hasSize(3)
257-
.containsOnly(host1.getEndPoint(), host2.getEndPoint(), host3.getEndPoint());
258-
assertThat(e.getErrors().values()).hasOnlyElementsOfType(exception);
286+
Fail.fail("expected a " + exception.getSimpleName());
287+
} catch (DriverException e) {
288+
assertThat(e).isInstanceOf(exception);
259289
}
260-
assertOnRequestErrorWasCalled(3, exception);
261-
assertThat(errors.getOthers().getCount()).isEqualTo(3);
262-
assertThat(errors.getRetries().getCount()).isEqualTo(3);
263-
assertThat(errors.getRetriesOnOtherErrors().getCount()).isEqualTo(3);
290+
assertOnRequestErrorWasCalled(2, exception);
291+
assertThat(errors.getOthers().getCount()).isEqualTo(2);
292+
assertThat(errors.getRetries().getCount()).isEqualTo(1);
293+
assertThat(errors.getRetriesOnOtherErrors().getCount()).isEqualTo(1);
264294
assertQueried(1, 1);
265295
assertQueried(2, 1);
266-
assertQueried(3, 1);
296+
assertQueried(3, 0);
267297
}
268298

269299
@Test(groups = "short")
@@ -307,27 +337,43 @@ public void should_rethrow_on_write_failure() {
307337
}
308338

309339
@Test(groups = "short", dataProvider = "connectionErrors")
310-
public void should_try_next_host_on_connection_error(ClosedConnectionConfig.CloseType closeType) {
340+
public void should_try_next_host_on_first_connection_error(
341+
ClosedConnectionConfig.CloseType closeType) {
342+
simulateError(1, closed_connection, new ClosedConnectionConfig(closeType));
343+
simulateNormalResponse(2);
344+
345+
query();
346+
347+
assertOnRequestErrorWasCalled(1, TransportException.class);
348+
assertThat(errors.getRetries().getCount()).isEqualTo(1);
349+
assertThat(errors.getConnectionErrors().getCount()).isEqualTo(1);
350+
assertThat(errors.getIgnoresOnConnectionError().getCount()).isEqualTo(0);
351+
assertThat(errors.getRetriesOnConnectionError().getCount()).isEqualTo(1);
352+
assertQueried(1, 1);
353+
assertQueried(2, 1);
354+
assertQueried(3, 0);
355+
}
356+
357+
@Test(groups = "short", dataProvider = "connectionErrors")
358+
public void should_rethrow_on_second_connection_error(
359+
ClosedConnectionConfig.CloseType closeType) {
311360
simulateError(1, closed_connection, new ClosedConnectionConfig(closeType));
312361
simulateError(2, closed_connection, new ClosedConnectionConfig(closeType));
313362
simulateError(3, closed_connection, new ClosedConnectionConfig(closeType));
314363
try {
315364
query();
316-
Fail.fail("expected a NoHostAvailableException");
317-
} catch (NoHostAvailableException e) {
318-
assertThat(e.getErrors().keySet())
319-
.hasSize(3)
320-
.containsOnly(host1.getEndPoint(), host2.getEndPoint(), host3.getEndPoint());
321-
assertThat(e.getErrors().values()).hasOnlyElementsOfType(TransportException.class);
365+
Fail.fail("expected a TransportException");
366+
} catch (TransportException e) {
367+
// expected — rethrown after one retry
322368
}
323-
assertOnRequestErrorWasCalled(3, TransportException.class);
324-
assertThat(errors.getRetries().getCount()).isEqualTo(3);
325-
assertThat(errors.getConnectionErrors().getCount()).isEqualTo(3);
369+
assertOnRequestErrorWasCalled(2, TransportException.class);
370+
assertThat(errors.getRetries().getCount()).isEqualTo(1);
371+
assertThat(errors.getConnectionErrors().getCount()).isEqualTo(2);
326372
assertThat(errors.getIgnoresOnConnectionError().getCount()).isEqualTo(0);
327-
assertThat(errors.getRetriesOnConnectionError().getCount()).isEqualTo(3);
373+
assertThat(errors.getRetriesOnConnectionError().getCount()).isEqualTo(1);
328374
assertQueried(1, 1);
329375
assertQueried(2, 1);
330-
assertQueried(3, 1);
376+
assertQueried(3, 0);
331377
}
332378

333379
@Test(groups = "short")

0 commit comments

Comments
 (0)