|
28 | 28 | import static org.mockito.Mockito.verify;
|
29 | 29 | import static org.mockito.Mockito.when;
|
30 | 30 |
|
| 31 | +import java.time.Duration; |
31 | 32 | import java.util.concurrent.BlockingQueue;
|
32 | 33 | import java.util.concurrent.CancellationException;
|
| 34 | +import java.util.concurrent.CompletableFuture; |
33 | 35 | import java.util.concurrent.Executors;
|
34 | 36 | import java.util.concurrent.LinkedTransferQueue;
|
35 | 37 | import java.util.concurrent.TimeUnit;
|
|
39 | 41 | import org.apache.thrift.TApplicationException;
|
40 | 42 | import org.junit.jupiter.api.Test;
|
41 | 43 | import org.junit.jupiter.api.extension.RegisterExtension;
|
| 44 | +import org.junit.jupiter.params.ParameterizedTest; |
| 45 | +import org.junit.jupiter.params.provider.EnumSource; |
42 | 46 |
|
43 | 47 | import com.linecorp.armeria.client.ClientFactory;
|
44 | 48 | import com.linecorp.armeria.client.ClientRequestContext;
|
| 49 | +import com.linecorp.armeria.client.ResponseCancellationException; |
45 | 50 | import com.linecorp.armeria.client.UnprocessedRequestException;
|
46 | 51 | import com.linecorp.armeria.client.retry.Backoff;
|
47 | 52 | import com.linecorp.armeria.client.retry.RetryConfig;
|
|
53 | 58 | import com.linecorp.armeria.common.HttpRequest;
|
54 | 59 | import com.linecorp.armeria.common.RpcResponse;
|
55 | 60 | import com.linecorp.armeria.common.logging.RequestLog;
|
| 61 | +import com.linecorp.armeria.common.util.TimeoutMode; |
56 | 62 | import com.linecorp.armeria.common.util.UnmodifiableFuture;
|
57 | 63 | import com.linecorp.armeria.server.ServerBuilder;
|
58 | 64 | import com.linecorp.armeria.server.thrift.THttpService;
|
@@ -326,34 +332,136 @@ void shouldGetExceptionWhenFactoryIsClosed() throws Exception {
|
326 | 332 | "(?i).*(factory has been closed|not accepting a task).*"));
|
327 | 333 | }
|
328 | 334 |
|
329 |
| - @Test |
330 |
| - void doNotRetryWhenResponseIsCancelled() throws Exception { |
| 335 | + enum DoNotRetryWhenResponseIsCancelledTestParams { |
| 336 | + // Cancel delays for a backoff of 50 milliseconds (quickBackoffMillis). |
| 337 | + CANCEL_CTX_FIRST_REQUEST_NO_DELAY(true, true, 0), |
| 338 | + CANCEL_CTX_FIRST_REQUEST_WITH_DELAY(true, true, 500), |
| 339 | + CANCEL_CTX_AFTER_FIRST_REQUEST_NO_DELAY(false, true, 0), |
| 340 | + CANCEL_CTX_AFTER_FIRST_REQUEST_WITH_DELAY(false, false, 500), |
| 341 | + CANCEL_RES_AFTER_FIRST_REQUEST_NO_DELAY(false, true, 0), |
| 342 | + CANCEL_RES_AFTER_FIRST_REQUEST_WITH_DELAY(false, false, 500); |
| 343 | + |
| 344 | + static final int BACKOFF_MILLIS = 50; |
| 345 | + final boolean ensureCancelBeforeFirstRequest; |
| 346 | + final boolean cancelViaCtx; |
| 347 | + final long cancelDelayMillis; |
| 348 | + |
| 349 | + DoNotRetryWhenResponseIsCancelledTestParams(boolean ensureCancelBeforeFirstRequest, |
| 350 | + boolean cancelViaCtx, |
| 351 | + long cancelDelayMillis) { |
| 352 | + this.ensureCancelBeforeFirstRequest = ensureCancelBeforeFirstRequest; |
| 353 | + if (this.ensureCancelBeforeFirstRequest) { |
| 354 | + this.cancelViaCtx = true; |
| 355 | + } else { |
| 356 | + this.cancelViaCtx = cancelViaCtx; |
| 357 | + } |
| 358 | + |
| 359 | + this.cancelDelayMillis = cancelDelayMillis; |
| 360 | + } |
| 361 | + } |
| 362 | + |
| 363 | + @ParameterizedTest |
| 364 | + @EnumSource(DoNotRetryWhenResponseIsCancelledTestParams.class) |
| 365 | + void doNotRetryWhenResponseIsCancelled(DoNotRetryWhenResponseIsCancelledTestParams param) throws Exception { |
331 | 366 | serviceRetryCount.set(0);
|
| 367 | + |
| 368 | + final RetryRuleWithContent<RpcResponse> quickRetryAlways = |
| 369 | + RetryRuleWithContent.<RpcResponse>builder() |
| 370 | + .onException() |
| 371 | + .thenBackoff(Backoff.fixed( |
| 372 | + DoNotRetryWhenResponseIsCancelledTestParams.BACKOFF_MILLIS)); |
| 373 | + |
| 374 | + final int maxExpectedAttempts = |
| 375 | + (int) (param.cancelDelayMillis / DoNotRetryWhenResponseIsCancelledTestParams.BACKOFF_MILLIS) + |
| 376 | + 5; |
| 377 | + final AtomicInteger serviceRetryCountWhenCancelled = new AtomicInteger(); |
332 | 378 | try (ClientFactory factory = ClientFactory.builder().build()) {
|
333 | 379 | final AtomicReference<ClientRequestContext> context = new AtomicReference<>();
|
334 | 380 | final HelloService.Iface client =
|
335 | 381 | ThriftClients.builder(server.httpUri())
|
336 | 382 | .path("/thrift")
|
337 | 383 | .factory(factory)
|
338 |
| - .rpcDecorator(RetryingRpcClient.builder(retryAlways).newDecorator()) |
| 384 | + .rpcDecorator(RetryingRpcClient.builder(quickRetryAlways) |
| 385 | + // We want to cancel the request before |
| 386 | + // we quit because of reaching max attempts. |
| 387 | + .maxTotalAttempts(maxExpectedAttempts) |
| 388 | + .newDecorator()) |
339 | 389 | .rpcDecorator((delegate, ctx, req) -> {
|
340 |
| - context.set(ctx); |
341 |
| - final RpcResponse res = delegate.execute(ctx, req); |
342 |
| - res.cancel(true); |
| 390 | + final CompletableFuture<RpcResponse> resFuture = new CompletableFuture<>(); |
| 391 | + final RpcResponse res = RpcResponse.from(resFuture); |
| 392 | + |
| 393 | + if (param.ensureCancelBeforeFirstRequest) { |
| 394 | + Thread.sleep(param.cancelDelayMillis); |
| 395 | + |
| 396 | + assertThat(ctx.isCancelled()).isFalse(); |
| 397 | + assertThat(res.isDone()).isFalse(); |
| 398 | + assert param.cancelViaCtx; |
| 399 | + ctx.cancel(); |
| 400 | + serviceRetryCountWhenCancelled.set(serviceRetryCount.get()); |
| 401 | + |
| 402 | + resFuture.complete(delegate.execute(ctx, req)); |
| 403 | + } else { |
| 404 | + final RpcResponse nextRes = delegate.execute(ctx, req); |
| 405 | + resFuture.complete(nextRes); |
| 406 | + |
| 407 | + Thread.sleep(param.cancelDelayMillis); |
| 408 | + |
| 409 | + assertThat(ctx.isCancelled()).isFalse(); |
| 410 | + assertThat(nextRes.isDone()).isFalse(); |
| 411 | + |
| 412 | + if (param.cancelViaCtx) { |
| 413 | + ctx.cancel(); |
| 414 | + } else { |
| 415 | + nextRes.cancel(true); |
| 416 | + } |
| 417 | + |
| 418 | + serviceRetryCountWhenCancelled.set(serviceRetryCount.get()); |
| 419 | + } |
| 420 | + |
343 | 421 | return res;
|
344 | 422 | })
|
| 423 | + .rpcDecorator((delegate, ctx, req) -> { |
| 424 | + context.set(ctx); |
| 425 | + // Make sure we do not get cancelled while delaying the cancel. |
| 426 | + ctx.setResponseTimeout( |
| 427 | + TimeoutMode.EXTEND, |
| 428 | + Duration.ofMillis(param.cancelDelayMillis + 5000) |
| 429 | + ); |
| 430 | + |
| 431 | + return delegate.execute(ctx, req); |
| 432 | + }) |
345 | 433 | .build(HelloService.Iface.class);
|
346 | 434 | when(serviceHandler.hello(anyString())).thenThrow(new IllegalArgumentException());
|
347 | 435 |
|
348 |
| - assertThatThrownBy(() -> client.hello("hello")).isInstanceOf(CancellationException.class); |
| 436 | + assertThatThrownBy(() -> client.hello("hello")) |
| 437 | + .isOfAnyClassIn(CancellationException.class, ResponseCancellationException.class); |
349 | 438 |
|
350 | 439 | await().untilAsserted(() -> {
|
351 |
| - verify(serviceHandler, only()).hello("hello"); |
| 440 | + assertThat(serviceRetryCountWhenCancelled.get()).isIn(serviceRetryCount.get(), |
| 441 | + serviceRetryCount.get() - 1); |
| 442 | + verify(serviceHandler, times(serviceRetryCount.get())).hello("hello"); |
352 | 443 | });
|
| 444 | + |
| 445 | + final RequestLog log = context.get().log().whenComplete().join(); |
| 446 | + if (param.ensureCancelBeforeFirstRequest) { |
| 447 | + assertThat(serviceRetryCount.get()).isZero(); |
| 448 | + assertThat(log.requestCause()).isExactlyInstanceOf(ResponseCancellationException.class); |
| 449 | + assertThat(log.responseCause()).isExactlyInstanceOf(ResponseCancellationException.class); |
| 450 | + } else { |
| 451 | + // We still could cancel the before the first request so we do not have a guarantee for |
| 452 | + // requestCause() to be null. |
| 453 | + assertThat(log.responseCause()).isOfAnyClassIn(ResponseCancellationException.class, |
| 454 | + CancellationException.class); |
| 455 | + } |
| 456 | + |
353 | 457 | // Sleep 1 second more to check if there was another retry.
|
354 | 458 | TimeUnit.SECONDS.sleep(1);
|
355 |
| - verify(serviceHandler, only()).hello("hello"); |
356 |
| - assertThat(serviceRetryCount).hasValue(1); |
| 459 | + if (param.ensureCancelBeforeFirstRequest) { |
| 460 | + assertThat(serviceRetryCount.get()).isZero(); |
| 461 | + } |
| 462 | + assertThat(serviceRetryCountWhenCancelled.get()).isIn(serviceRetryCount.get(), |
| 463 | + serviceRetryCount.get() - 1); |
| 464 | + verify(serviceHandler, times(serviceRetryCount.get())).hello("hello"); |
357 | 465 | }
|
358 | 466 | }
|
359 | 467 | }
|
0 commit comments