From a509f46dbd49bcbbc416a70287f4db279222e53a Mon Sep 17 00:00:00 2001 From: hyunw9 Date: Mon, 31 Mar 2025 22:56:12 +0900 Subject: [PATCH 1/6] feat : WaitUntilEndpointGroupInitialized --- .../HealthCheckedEndpointGroupTest.java | 101 +++++++++--------- 1 file changed, 52 insertions(+), 49 deletions(-) diff --git a/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java b/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java index f6ee0f93b2f..126dbab63ad 100644 --- a/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java @@ -537,8 +537,8 @@ void cacheReflectsAttributeChanges() throws InterruptedException { final Function checkFactory = ctx -> { final EventLoopGroup executors = CommonPools.workerGroup(); final ScheduledFuture scheduledFuture = executors.scheduleAtFixedRate( - () -> ctx.updateHealth(healthy.get(), null, headers.get(), null), - 0, 100, TimeUnit.MILLISECONDS); + () -> ctx.updateHealth(healthy.get(), null, headers.get(), null), + 0, 100, TimeUnit.MILLISECONDS); return AsyncCloseableSupport.of(f -> { scheduledFuture.cancel(true); f.complete(null); @@ -553,53 +553,56 @@ void cacheReflectsAttributeChanges() throws InterruptedException { final AtomicLong updateInvokedCounter = new AtomicLong(); final Consumer> endpointsListener = endpoints -> updateInvokedCounter.incrementAndGet(); - try (HealthCheckedEndpointGroup endpointGroup = - new HealthCheckedEndpointGroup(delegate, true, - 10000, 10000, - SessionProtocol.HTTP, 80, - DEFAULT_HEALTH_CHECK_RETRY_BACKOFF, - ClientOptions.of(), checkFactory, - HealthCheckStrategy.all(), - DEFAULT_ENDPOINT_PREDICATE)) { - endpointGroup.addListener(endpointsListener, true); - await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(1)); - // the counter should stay 1 after 1 second has passed - await().pollDelay(1, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(1)); - assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.DEGRADED_ATTR)) - .isFalse(); - assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.HEALTHY_ATTR)) - .isTrue(); - - headers.set(ResponseHeaders.of(HttpStatus.OK, "x-envoy-degraded", "")); - // the counter should be incremented to three now - await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(2)); - assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.DEGRADED_ATTR)) - .isTrue(); - assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.HEALTHY_ATTR)) - .isTrue(); - - // the counter should be incremented to two now - healthy.set(0); - await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(3)); - assertThat(endpointGroup.endpoints()).isEmpty(); - - // healthy again - healthy.set(1); - await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(4)); - assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.HEALTHY_ATTR)) - .isTrue(); - assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.DEGRADED_ATTR)) - .isTrue(); - - // turn off degraded again - headers.set(null); - await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(5)); - assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.HEALTHY_ATTR)) - .isTrue(); - assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.DEGRADED_ATTR)) - .isFalse(); - } + HealthCheckedEndpointGroup endpointGroup = new HealthCheckedEndpointGroup( + delegate, true, + 10000, 10000, + SessionProtocol.HTTP, 80, + DEFAULT_HEALTH_CHECK_RETRY_BACKOFF, + ClientOptions.of(), checkFactory, + HealthCheckStrategy.all(), + DEFAULT_ENDPOINT_PREDICATE + ); + + endpointGroup.whenReady().join(); + endpointGroup.addListener(endpointsListener, true); + + await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(1)); + // the counter should stay 1 after 1 second has passed + await().pollDelay(1, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(1)); + assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.DEGRADED_ATTR)) + .isFalse(); + assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.HEALTHY_ATTR)) + .isTrue(); + + headers.set(ResponseHeaders.of(HttpStatus.OK, "x-envoy-degraded", "")); + // the counter should be incremented to three now + await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(2)); + assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.DEGRADED_ATTR)) + .isTrue(); + assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.HEALTHY_ATTR)) + .isTrue(); + + // the counter should be incremented to two now + healthy.set(0); + await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(3)); + assertThat(endpointGroup.endpoints()).isEmpty(); + + // healthy again + healthy.set(1); + await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(4)); + assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.HEALTHY_ATTR)) + .isTrue(); + assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.DEGRADED_ATTR)) + .isTrue(); + + // turn off degraded again + headers.set(null); + await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(5)); + assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.HEALTHY_ATTR)) + .isTrue(); + assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.DEGRADED_ATTR)) + .isFalse(); } @Test From b92de357e36a6aa370e830ff57f46a49c6cc75d5 Mon Sep 17 00:00:00 2001 From: hyunw9 Date: Mon, 31 Mar 2025 23:10:54 +0900 Subject: [PATCH 2/6] feat : change EndpointGroup to Final --- .../endpoint/healthcheck/HealthCheckedEndpointGroupTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java b/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java index 126dbab63ad..e40e1423bf8 100644 --- a/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java @@ -553,7 +553,7 @@ void cacheReflectsAttributeChanges() throws InterruptedException { final AtomicLong updateInvokedCounter = new AtomicLong(); final Consumer> endpointsListener = endpoints -> updateInvokedCounter.incrementAndGet(); - HealthCheckedEndpointGroup endpointGroup = new HealthCheckedEndpointGroup( + final HealthCheckedEndpointGroup endpointGroup = new HealthCheckedEndpointGroup( delegate, true, 10000, 10000, SessionProtocol.HTTP, 80, From b7e731dcdc56cb53ced6c368f905d43e7ae07ac2 Mon Sep 17 00:00:00 2001 From: hyunw9 Date: Tue, 1 Apr 2025 21:36:23 +0900 Subject: [PATCH 3/6] fix : correct code style --- .../HealthCheckedEndpointGroupTest.java | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java b/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java index e40e1423bf8..63434237da9 100644 --- a/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java @@ -537,8 +537,8 @@ void cacheReflectsAttributeChanges() throws InterruptedException { final Function checkFactory = ctx -> { final EventLoopGroup executors = CommonPools.workerGroup(); final ScheduledFuture scheduledFuture = executors.scheduleAtFixedRate( - () -> ctx.updateHealth(healthy.get(), null, headers.get(), null), - 0, 100, TimeUnit.MILLISECONDS); + () -> ctx.updateHealth(healthy.get(), null, headers.get(), null), + 0, 100, TimeUnit.MILLISECONDS); return AsyncCloseableSupport.of(f -> { scheduledFuture.cancel(true); f.complete(null); @@ -554,13 +554,13 @@ void cacheReflectsAttributeChanges() throws InterruptedException { final Consumer> endpointsListener = endpoints -> updateInvokedCounter.incrementAndGet(); final HealthCheckedEndpointGroup endpointGroup = new HealthCheckedEndpointGroup( - delegate, true, - 10000, 10000, - SessionProtocol.HTTP, 80, - DEFAULT_HEALTH_CHECK_RETRY_BACKOFF, - ClientOptions.of(), checkFactory, - HealthCheckStrategy.all(), - DEFAULT_ENDPOINT_PREDICATE + delegate, true, + 10000, 10000, + SessionProtocol.HTTP, 80, + DEFAULT_HEALTH_CHECK_RETRY_BACKOFF, + ClientOptions.of(), checkFactory, + HealthCheckStrategy.all(), + DEFAULT_ENDPOINT_PREDICATE ); endpointGroup.whenReady().join(); @@ -569,19 +569,19 @@ void cacheReflectsAttributeChanges() throws InterruptedException { await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(1)); // the counter should stay 1 after 1 second has passed await().pollDelay(1, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(1)); + .untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(1)); assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.DEGRADED_ATTR)) - .isFalse(); + .isFalse(); assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.HEALTHY_ATTR)) - .isTrue(); + .isTrue(); headers.set(ResponseHeaders.of(HttpStatus.OK, "x-envoy-degraded", "")); // the counter should be incremented to three now await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(2)); assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.DEGRADED_ATTR)) - .isTrue(); + .isTrue(); assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.HEALTHY_ATTR)) - .isTrue(); + .isTrue(); // the counter should be incremented to two now healthy.set(0); @@ -592,17 +592,17 @@ void cacheReflectsAttributeChanges() throws InterruptedException { healthy.set(1); await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(4)); assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.HEALTHY_ATTR)) - .isTrue(); + .isTrue(); assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.DEGRADED_ATTR)) - .isTrue(); + .isTrue(); // turn off degraded again headers.set(null); await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(5)); assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.HEALTHY_ATTR)) - .isTrue(); + .isTrue(); assertThat(endpointGroup.endpoints().get(0).attrs().attr(EndpointAttributeKeys.DEGRADED_ATTR)) - .isFalse(); + .isFalse(); } @Test From 018a7b1c6abd362d79807bc7b97a4a91b096f738 Mon Sep 17 00:00:00 2001 From: hyunw9 Date: Tue, 1 Apr 2025 21:36:57 +0900 Subject: [PATCH 4/6] feat : addListener with notifyLatestValue false --- .../endpoint/healthcheck/HealthCheckedEndpointGroupTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java b/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java index 63434237da9..776629da585 100644 --- a/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java @@ -564,7 +564,7 @@ void cacheReflectsAttributeChanges() throws InterruptedException { ); endpointGroup.whenReady().join(); - endpointGroup.addListener(endpointsListener, true); + endpointGroup.addListener(endpointsListener, false); await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(1)); // the counter should stay 1 after 1 second has passed From 31cbb14dc61427c86fce92852eb12ddf0e796f78 Mon Sep 17 00:00:00 2001 From: hyunw9 Date: Tue, 1 Apr 2025 21:52:38 +0900 Subject: [PATCH 5/6] feat : accept endpointsListener manually --- .../endpoint/healthcheck/HealthCheckedEndpointGroupTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java b/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java index 776629da585..dd63525efe8 100644 --- a/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java @@ -565,6 +565,7 @@ void cacheReflectsAttributeChanges() throws InterruptedException { endpointGroup.whenReady().join(); endpointGroup.addListener(endpointsListener, false); + endpointsListener.accept(endpointGroup.endpoints()); await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(1)); // the counter should stay 1 after 1 second has passed From d64cad067bc31e507626436ca62628a021a69112 Mon Sep 17 00:00:00 2001 From: hyunw9 Date: Tue, 22 Apr 2025 20:54:59 +0900 Subject: [PATCH 6/6] feat : accept endpointsListener manually --- .../endpoint/healthcheck/HealthCheckedEndpointGroupTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java b/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java index dd63525efe8..2633e497eb8 100644 --- a/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupTest.java @@ -563,9 +563,7 @@ void cacheReflectsAttributeChanges() throws InterruptedException { DEFAULT_ENDPOINT_PREDICATE ); - endpointGroup.whenReady().join(); - endpointGroup.addListener(endpointsListener, false); - endpointsListener.accept(endpointGroup.endpoints()); + endpointGroup.addListener(endpointsListener, true); await().untilAsserted(() -> assertThat(updateInvokedCounter).hasValue(1)); // the counter should stay 1 after 1 second has passed