diff --git a/CHANGELOG.md b/CHANGELOG.md index 241f8ca9c2..81c0fe0b1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.81.1] - 2025-11-04 +- Fix xds client active wait time metric and add more logs. + ## [29.81.0] - 2025-11-03 - Indis based d2 warmup for restli client @@ -5930,7 +5933,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.81.0...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.81.1...master +[29.81.1]: https://github.com/linkedin/rest.li/compare/v29.81.0...v29.81.1 [29.81.0]: https://github.com/linkedin/rest.li/compare/v29.80.3...v29.81.0 [29.80.3]: https://github.com/linkedin/rest.li/compare/v29.80.2...v29.80.3 [29.80.2]: https://github.com/linkedin/rest.li/compare/v29.80.1...v29.80.2 diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 627ab83ea3..e4faa58326 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -57,6 +57,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -68,6 +69,8 @@ import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.*; +import static com.linkedin.d2.xds.XdsClientImpl.SubscriberFetchState.*; + /** * Implementation of {@link XdsClient} interface. 
@@ -254,7 +257,7 @@ public void watchXdsResource(String resourceName, ResourceWatcher watcher) ResourceSubscriber subscriber = resourceSubscriberMap.get(resourceName); if (subscriber == null) { - subscriber = new ResourceSubscriber(originalType, resourceName, _xdsClientJmx); + subscriber = new ResourceSubscriber(originalType, resourceName, _xdsClientJmx, _initialResourceVersionsEnabled); resourceSubscriberMap.put(resourceName, subscriber); ResourceType adjustedType; String adjustedResourceName; @@ -292,7 +295,7 @@ public void watchAllXdsResources(WildcardResourceWatcher watcher) WildcardResourceSubscriber subscriber = getWildcardResourceSubscriber(originalType); if (subscriber == null) { - subscriber = new WildcardResourceSubscriber(originalType); + subscriber = new WildcardResourceSubscriber(originalType, _initialResourceVersionsEnabled); getWildcardResourceSubscribers().put(originalType, subscriber); ResourceType adjustedType = shouldSubscribeUriGlobCollection(originalType) ? ResourceType.D2_URI : originalType; @@ -675,7 +678,7 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) || uriSubscriber.getData() == null // The URI was corrupted and there was no previous version of this URI ) { - uriSubscriber.onData(new D2URIUpdate(uri), _serverMetricsProvider, _initialResourceVersionsEnabled); + uriSubscriber.onData(new D2URIUpdate(uri), _serverMetricsProvider); } } @@ -791,12 +794,12 @@ private void handleResourceUpdate(Map updates, ResourceSubscriber subscriber = subscribers.get(entry.getKey()); if (subscriber != null) { - subscriber.onData(entry.getValue(), _serverMetricsProvider, _initialResourceVersionsEnabled); + subscriber.onData(entry.getValue(), _serverMetricsProvider); } if (wildcardSubscriber != null) { - wildcardSubscriber.onData(entry.getKey(), entry.getValue(), _serverMetricsProvider, _initialResourceVersionsEnabled); + wildcardSubscriber.onData(entry.getKey(), entry.getValue(), _serverMetricsProvider); } } } @@ -926,7 +929,7 @@ Map 
getWildcardResourceSubscribers() return _wildcardSubscribers; } - private enum SubscriberFetchState + enum SubscriberFetchState { INIT_PENDING, // the very first fetch since the subscriber is created PENDING_AFTER_RECONNECT, // the first fetch after each reconnect @@ -942,22 +945,24 @@ static class ResourceSubscriber @Nullable private ResourceUpdate _data; private final Clock _clock; - private long _subscribedAt; - private SubscriberFetchState _fetchState = SubscriberFetchState.INIT_PENDING; + private AtomicLong _subscribedAt = new AtomicLong(0L); + private AtomicReference _fetchState = new AtomicReference<>(SubscriberFetchState.INIT_PENDING); + private final boolean _isIrvEnabled; - ResourceSubscriber(ResourceType type, String resource, XdsClientJmx xdsClientJmx) + ResourceSubscriber(ResourceType type, String resource, XdsClientJmx xdsClientJmx, boolean isIrvEnabled) { - this(type, resource, xdsClientJmx, SystemClock.instance()); + this(type, resource, xdsClientJmx, isIrvEnabled, SystemClock.instance()); } @VisibleForTesting - ResourceSubscriber(ResourceType type, String resource, XdsClientJmx xdsClientJmx, Clock clock) + ResourceSubscriber(ResourceType type, String resource, XdsClientJmx xdsClientJmx, boolean isIrvEnabled, Clock clock) { _type = type; _resource = resource; _xdsClientJmx = xdsClientJmx; _clock = clock; - _subscribedAt = _clock.currentTimeMillis(); + _isIrvEnabled = isIrvEnabled; + _subscribedAt.set(_clock.currentTimeMillis()); } @VisibleForTesting @@ -973,6 +978,12 @@ public void setData(@Nullable ResourceUpdate data) _data = data; } + @VisibleForTesting + SubscriberFetchState getFetchState() + { + return _fetchState.get(); + } + void addWatcher(ResourceWatcher watcher) { _watchers.add(watcher); @@ -984,21 +995,30 @@ void addWatcher(ResourceWatcher watcher) } @VisibleForTesting - void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider, boolean isIrvEnabled) + void onData(ResourceUpdate data, XdsServerMetricsProvider 
metricsProvider) { + SubscriberFetchState prev = _fetchState.getAndSet(FETCHED); + if (!FETCHED.equals(prev)) + { + _log.info("Received initial data for {} {}. Set state to FETCHED.", _type, _resource); + } + if (Objects.equals(_data, data)) { - _log.debug("Received resource update data equal to the current data. Will not perform the update."); + if (!FETCHED.equals(prev) && data != null && data.isValid()) + { + // Even though the data is the same, the subscriber is waiting for init data after either startup + // or a reconnection, so we need to track latency. + trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev); + } + _log.debug("Received resource update data equal to the current data. Will not perform any update."); return; } - SubscriberFetchState curFetchState = _fetchState; - _fetchState = SubscriberFetchState.FETCHED; - // null value guard to avoid overwriting the property with null if (data != null && data.isValid()) { - trackServerLatency(data, _data, metricsProvider, _subscribedAt, isIrvEnabled, curFetchState); // data updated, track xds server latency + trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev); _data = data; } else @@ -1067,7 +1087,11 @@ void onReconnect() @VisibleForTesting void onRemoval() { - _fetchState = SubscriberFetchState.FETCHED; + SubscriberFetchState prev = _fetchState.getAndSet(FETCHED); + if (!FETCHED.equals(prev)) + { + _log.info("Received data removed for {} {}. Set state to FETCHED.", _type, _resource); + } if (_data == null) { @@ -1082,23 +1106,34 @@ void onRemoval() private void reset() { - _subscribedAt = _clock.currentTimeMillis(); - _fetchState = SubscriberFetchState.PENDING_AFTER_RECONNECT; + // only change state to PENDING_AFTER_RECONNECT if the previous state was FETCHED. 
+ // if the previous state was still init pending, no change to the state and subscribed time + if (_fetchState.compareAndSet(FETCHED, SubscriberFetchState.PENDING_AFTER_RECONNECT)) + { + _subscribedAt.set(_clock.currentTimeMillis()); + _log.info("Reset fetch state to PENDING_AFTER_RECONNECT for {} {} after reconnect.", _type, _resource); + } } @VisibleForTesting void setSubscribedAt(long subscribedAt) { - _subscribedAt = subscribedAt; + _subscribedAt.set(subscribedAt); } long getActiveInitialWaitTimeMillis(long end) { - if (SubscriberFetchState.FETCHED.equals(_fetchState)) + SubscriberFetchState cur = _fetchState.get(); + // if either 1) already fetched data, or 2) IRV is enabled and no data is received after reconnect, which means + // the server has no new data to send, then no active wait time. + if (FETCHED.equals(cur) || (_isIrvEnabled && PENDING_AFTER_RECONNECT.equals(cur))) { return 0; } - return end - _subscribedAt; + long past = end - _subscribedAt.get(); + // the wait time metric is collected every minute, so it's rate limited already + _log.info("Waiting for initial data of {} {}, past ms: {}.", _type, _resource, past); + return past; } } @@ -1108,19 +1143,21 @@ static class WildcardResourceSubscriber private final Set _watchers = new HashSet<>(); private final Map _data = new HashMap<>(); private final Clock _clock; - private long _subscribedAt; - private SubscriberFetchState _fetchState = SubscriberFetchState.INIT_PENDING; + private AtomicLong _subscribedAt = new AtomicLong(0L); + private AtomicReference _fetchState = new AtomicReference<>(SubscriberFetchState.INIT_PENDING); + private final boolean _isIrvEnabled; - WildcardResourceSubscriber(ResourceType type) + WildcardResourceSubscriber(ResourceType type, boolean isIrvEnabled) { - this(type, SystemClock.instance()); + this(type, isIrvEnabled, SystemClock.instance()); } - WildcardResourceSubscriber(ResourceType type, Clock clock) + WildcardResourceSubscriber(ResourceType type, boolean isIrvEnabled, 
Clock clock) { _type = type; _clock = clock; - _subscribedAt = _clock.currentTimeMillis(); + _isIrvEnabled = isIrvEnabled; + _subscribedAt.set(_clock.currentTimeMillis()); } @VisibleForTesting @@ -1135,6 +1172,12 @@ public void setData(String resourceName, ResourceUpdate data) _data.put(resourceName, data); } + @VisibleForTesting + SubscriberFetchState getFetchState() + { + return _fetchState.get(); + } + void addWatcher(WildcardResourceWatcher watcher) { _watchers.add(watcher); @@ -1147,17 +1190,23 @@ void addWatcher(WildcardResourceWatcher watcher) } @VisibleForTesting - void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider metricsProvider, boolean isIrvEnabled) + void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider metricsProvider) { if (Objects.equals(_data.get(resourceName), data)) { + if (!FETCHED.equals(_fetchState.get()) && data != null && data.isValid()) + { + // Even though the data is the same, the subscriber is waiting for init data after either startup + // or a reconnection, so we need to track latency. + trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get()); + } _log.debug("Received resource update data equal to the current data. 
Will not perform the update."); return; } // null value guard to avoid overwriting the property with null if (data != null && data.isValid()) { - trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt, isIrvEnabled, _fetchState); + trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get()); _data.put(resourceName, data); } else @@ -1226,7 +1275,11 @@ void onRemoval(String resourceName) @VisibleForTesting void onAllResourcesProcessed() { - _fetchState = SubscriberFetchState.FETCHED; + SubscriberFetchState prev = _fetchState.getAndSet(FETCHED); + if (!FETCHED.equals(prev)) + { + _log.info("Received initial data for all resources with wildcard subscriber of type {}", _type); + } for (WildcardResourceWatcher watcher : _watchers) { @@ -1236,23 +1289,34 @@ void onAllResourcesProcessed() private void reset() { - _subscribedAt = _clock.currentTimeMillis(); - _fetchState = SubscriberFetchState.PENDING_AFTER_RECONNECT; + // only change state to PENDING_AFTER_RECONNECT if the previous state was FETCHED. + // if the previous state was still init pending, no change to the state and subscribed time + if (_fetchState.compareAndSet(FETCHED, SubscriberFetchState.PENDING_AFTER_RECONNECT)) + { + _subscribedAt.set(_clock.currentTimeMillis()); + _log.info("Reset fetch state to PENDING_AFTER_RECONNECT for {} wildcard subscriber after reconnect.", _type); + } } @VisibleForTesting void setSubscribedAt(long subscribedAt) { - _subscribedAt = subscribedAt; + _subscribedAt.set(subscribedAt); } long getActiveInitialWaitTimeMillis(long end) { - if (SubscriberFetchState.FETCHED.equals(_fetchState)) + SubscriberFetchState cur = _fetchState.get(); + // if either 1) already fetched data, or 2) IRV is enabled and no data is received after reconnect, which means + // the server has no new data to send, then no active wait time. 
+ if (FETCHED.equals(cur) || (_isIrvEnabled && PENDING_AFTER_RECONNECT.equals(cur))) { return 0; } - return end - _subscribedAt; + long past = end - _subscribedAt.get(); + // the wait time metric is collected every minute, so it's rate limited already + _log.info("{} wildcard subscriber Waiting for initial data, past ms: {}", _type, past); + return past; } } diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java index abaa61159a..57c27f34aa 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java @@ -46,7 +46,9 @@ import org.testng.annotations.Test; import static com.linkedin.d2.xds.XdsClient.ResourceType.*; +import static com.linkedin.d2.xds.XdsClientImpl.SubscriberFetchState.*; import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; @@ -334,9 +336,9 @@ public void testHandleD2NodeUpdateWithEmptyResponse() { fixture.watchAllResourceAndWatcherTypes(); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_WITH_EMPTY_NODE_RESPONSE); fixture.verifyAckSent(1); - verify(fixture._d2UriSubscriber, times(0)).onData(any(), any(), anyBoolean()); - verify(fixture._clusterSubscriber, times(0)).onData(any(), any(), anyBoolean()); - verify(fixture._uriMapWildcardSubscriber, times(0)).onData(any(), any(), any(), anyBoolean()); + verify(fixture._d2UriSubscriber, times(0)).onData(any(), any()); + verify(fixture._clusterSubscriber, times(0)).onData(any(), any()); + verify(fixture._uriMapWildcardSubscriber, times(0)).onData(any(), any(), any()); } @DataProvider(name = "badNodeUpdateTestCases") @@ -438,7 +440,7 @@ public void testHandleD2ClusterOrServiceNameEmptyResponse() { fixture.watchAllResourceAndWatcherTypes(); fixture._xdsClientImpl.handleResponse(RESPONSE_WITH_EMPTY_NAMES); fixture.verifyAckSent(1); - 
verify(fixture._nameWildcardSubscriber, times(0)).onData(any(), any(), any(), anyBoolean()); + verify(fixture._nameWildcardSubscriber, times(0)).onData(any(), any(), any()); } @Test @@ -530,11 +532,17 @@ public void testHandleD2URIMapResponseWithData(boolean toWatchIndividual, boolea public void testHandleD2URIMapUpdateWithEmptyResponse() { XdsClientImplFixture fixture = new XdsClientImplFixture(); fixture.watchAllResourceAndWatcherTypes(); + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI1_DATA_WITH_REMOVAL_NON_LAST_NONCE); + assertEquals(D2_URI_MAP.emptyData(), fixture._clusterSubscriber.getData()); + assertEquals(D2_URI_MAP.emptyData(), fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME)); + verify(fixture._uriMapWildcardSubscriber, never()).onAllResourcesProcessed(); + // Sanity check that the code handles empty responses fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_WITH_EMPTY_URI_MAP_RESPONSE); - fixture.verifyAckSent(1); - verify(fixture._clusterSubscriber, times(0)).onData(any(), any(), anyBoolean()); - verify(fixture._uriMapWildcardSubscriber, times(0)).onData(any(), any(), any(), anyBoolean()); + fixture.verifyAckSent(2); + // onData is called only once. Empty response does not trigger onData calls. 
+ verify(fixture._clusterSubscriber).onData(any(), any()); + verify(fixture._uriMapWildcardSubscriber).onData(any(), any(), any()); } @Test(dataProvider = "providerWatcherFlags") @@ -722,7 +730,7 @@ public void testHandleD2URICollectionResponseWithRemoval() { @Test public void testResourceSubscriberAddWatcher() { - ResourceSubscriber subscriber = new ResourceSubscriber(NODE, "foo", null); + ResourceSubscriber subscriber = new ResourceSubscriber(NODE, "foo", null, false); XdsClient.ResourceWatcher watcher = Mockito.mock(XdsClient.ResourceWatcher.class); subscriber.addWatcher(watcher); verify(watcher, times(0)).onChanged(any()); @@ -734,7 +742,7 @@ public void testResourceSubscriberAddWatcher() { } verify(watcher, times(10)).onChanged(update); - WildcardResourceSubscriber wildcardSubscriber = new WildcardResourceSubscriber(D2_CLUSTER_OR_SERVICE_NAME); + WildcardResourceSubscriber wildcardSubscriber = new WildcardResourceSubscriber(D2_CLUSTER_OR_SERVICE_NAME, false); XdsClient.WildcardResourceWatcher _wildcardWatcher = Mockito.mock(XdsClient.WildcardResourceWatcher.class); wildcardSubscriber.addWatcher(_wildcardWatcher); verify(_wildcardWatcher, times(0)).onChanged(any(), any()); @@ -944,7 +952,7 @@ public void testTrackLatencyForD2URI(boolean isIrvEnabled) { fixture.watchAllResourceAndWatcherTypes(); // first fetch, subscribe time is after modified time, so the latency is calculated with subscribe time - fixture._xdsClientImpl.handleResponse(RESPONSE_D2URI1); + fixture._xdsClientImpl.handleResponse(RESPONSE_D2URI1_WITH_NON_LAST_NONCE); verifyPositiveLagLessThanOneSec(3, metrics); fixture._uriMapWildcardSubscriber.onAllResourcesProcessed(); // end of first fetch @@ -986,53 +994,98 @@ public void testTrackLatencyForD2URI(boolean isIrvEnabled) { private enum ResponseAction { UPDATE, REMOVAL } - - @DataProvider(name = "provideResponseAction") - public Object[][] provideResponseAction() { + @DataProvider(name = "provideResponseActionAndIrvEnabled") + public Object[][] 
provideResponseActionAndIrvEnabled() + { // Params: // responseAction --- whether the response is a data update or a removal - return new Object[][]{{ResponseAction.UPDATE}, {ResponseAction.REMOVAL}}; + // irvEnabled --- whether IRV is enabled + return new Object[][] + { + {ResponseAction.UPDATE, false}, + {ResponseAction.REMOVAL, false}, + {ResponseAction.UPDATE, true}, + {ResponseAction.REMOVAL, true} + }; } - - @Test(dataProvider = "provideResponseAction") - private void testGetActiveInitialWaitTimeMillis(ResponseAction responseAction) { - XdsClientImplFixture fixture = new XdsClientImplFixture(); + @Test(dataProvider = "provideResponseActionAndIrvEnabled") + private void testGetActiveInitialWaitTime(ResponseAction responseAction, Boolean irvEnabled) + { + XdsClientImplFixture fixture = new XdsClientImplFixture(true, irvEnabled); + XdsClientImpl xdsClient = fixture._xdsClientImpl; fixture.watchAllResourceAndWatcherTypes(); fixture.setAllSubscribersSubscribedAt(System.currentTimeMillis() - TEN_MINS); // active initial wait time contributed from 8 subscribers for: - // individual node, uri, uri map, and wildcard node, uri map, d2 cluster/service name, callees, and callees wildcard subscribers - long activeWaitTime = fixture._xdsClientImpl.getActiveInitialWaitTimeMillis(); - Assert.assertTrue(activeWaitTime >= Time.minutes(80) && activeWaitTime < Time.minutes(90)); + // individual node, uri, uri map, callees, and wildcard node, uri map, d2 cluster/service name and callees subscribers + fixture.verifySubscribersState(INIT_PENDING); + verifyWaitTimeFromSubscribers(xdsClient.getActiveInitialWaitTimeMillis(), 8); if (responseAction == ResponseAction.UPDATE) { - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); - fixture._xdsClientImpl.handleResponse(RESPONSE_D2URI1_WITH_NON_LAST_NONCE); - fixture._xdsClientImpl.handleResponse(RESPONSE_WITH_SERVICE_NAMES); - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_CALLEES_DATA1); + 
xdsClient.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); + xdsClient.handleResponse(RESPONSE_D2URI1); + xdsClient.handleResponse(RESPONSE_WITH_SERVICE_NAMES); + xdsClient.handleResponse(DISCOVERY_RESPONSE_CALLEES_DATA1); } else if (responseAction == ResponseAction.REMOVAL) { - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA_WITH_REMOVAL); - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI1_DATA_WITH_REMOVAL_NON_LAST_NONCE); - fixture._xdsClientImpl.handleResponse(RESPONSE_WITH_NAME_REMOVAL); - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_CALLEES_DATA_WITH_REMOVAL); + xdsClient.handleResponse(DISCOVERY_RESPONSE_NODE_DATA_WITH_REMOVAL); + xdsClient.handleResponse(DISCOVERY_RESPONSE_URI1_DATA_WITH_REMOVAL); + xdsClient.handleResponse(RESPONSE_WITH_NAME_REMOVAL); + xdsClient.handleResponse(DISCOVERY_RESPONSE_CALLEES_DATA_WITH_REMOVAL); } - // All individual resource subscribers have received first response, so they won't contribute to active initial + // All subscribers have received first response, so they won't contribute to active initial // wait time anymore. - // End the first fetch for node and name subscriber, so it won't contribute to active initial wait time anymore. - - activeWaitTime = fixture._xdsClientImpl.getActiveInitialWaitTimeMillis(); - // only wildcard uri map subscriber is still contributing to active initial wait time - Assert.assertTrue(activeWaitTime >= Time.minutes(10) && activeWaitTime < Time.minutes(20)); + fixture.verifySubscribersState(FETCHED); + assertEquals( 0L, xdsClient.getActiveInitialWaitTimeMillis()); + + // After reconnect all subscribers go to PENDING_AFTER_RECONNECT state. + fixture.reconnect(); + fixture.verifySubscribersState(PENDING_AFTER_RECONNECT); + if (irvEnabled) + { + // If IRV is enabled, not receiving any data means the existing data is up-to-date, so wait time is 0. 
+ assertEquals( 0L, xdsClient.getActiveInitialWaitTimeMillis()); + // subsequent data received will just be updates on the existing data, not part of the initial wait time + } + else { + // If IRV is not enabled, all subscribers are contributing to wait time + fixture.setAllSubscribersSubscribedAt(System.currentTimeMillis() - TEN_MINS); + verifyWaitTimeFromSubscribers(xdsClient.getActiveInitialWaitTimeMillis(), 8); + + // 2 subscribers receive data again. 6 subscribers are still waiting. + xdsClient.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); + assertEquals(FETCHED, fixture._nodeSubscriber.getFetchState()); + assertEquals(FETCHED, fixture._nodeWildcardSubscriber.getFetchState()); + verifyWaitTimeFromSubscribers(xdsClient.getActiveInitialWaitTimeMillis(), 6); + } + } - // when onReconnect is called on a subscriber, it will reset the flag for first fetch after reconnect, and will - // contribute to active initial wait time again. - fixture._d2UriSubscriber.onReconnect(); - fixture._nodeWildcardSubscriber.onReconnect(); + // Test the cases that no init data is received (wildcard subscribers haven't received all batches) + @Test(dataProvider = "provideIrvEnabled") + public void testGetActiveInitialWaitTimeWithPendingInitData(Boolean irvEnabled) + { + XdsClientImplFixture fixture = new XdsClientImplFixture(false, irvEnabled); + XdsClientImpl xdsClient = fixture._xdsClientImpl; fixture.watchAllResourceAndWatcherTypes(); fixture.setAllSubscribersSubscribedAt(System.currentTimeMillis() - TEN_MINS); + // init pending for all 8 subscribers as verified in the above test + + fixture.reconnect(); + // reconnection while init data is still pending, subscribers state and subscribedAt time won't change + fixture.verifySubscribersState(INIT_PENDING); + verifyWaitTimeFromSubscribers(xdsClient.getActiveInitialWaitTimeMillis(), 8); + + // 3 subscribers receive data. 5 subscribers are still waiting. 
+ xdsClient.handleResponse(RESPONSE_D2URI1); + assertEquals(FETCHED, fixture._clusterSubscriber.getFetchState()); + assertEquals(FETCHED, fixture._uriMapWildcardSubscriber.getFetchState()); + verifyWaitTimeFromSubscribers(xdsClient.getActiveInitialWaitTimeMillis(), 5); + } - // 3 subscribers are contributing to active initial wait time now - activeWaitTime = fixture._xdsClientImpl.getActiveInitialWaitTimeMillis(); - Assert.assertTrue(activeWaitTime >= Time.minutes(30) && activeWaitTime < Time.minutes(40)); + private void verifyWaitTimeFromSubscribers(long waitTime, int numSubscribers) + { + // each subscriber contributes 10 mins wait time + Assert.assertTrue(waitTime >= numSubscribers * TEN_MINS && waitTime < (numSubscribers + 1) * TEN_MINS, + "Expected wait time from " + numSubscribers + " subscribers, actual subscribers: " + waitTime / TEN_MINS); } @Test(dataProvider = "providerWatcherFlags") @@ -1068,8 +1121,8 @@ public void testHandleD2CalleesUpdateWithEmptyResponse() { fixture.watchAllResourceAndWatcherTypes(); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_WITH_EMPTY_CALLEES_RESPONSE); fixture.verifyAckSent(1); - verify(fixture._calleesSubscriber, times(0)).onData(any(), any(), anyBoolean()); - verify(fixture._calleesWildcardSubscriber, times(0)).onData(any(), any(), any(), anyBoolean()); + verify(fixture._calleesSubscriber, times(0)).onData(any(), any()); + verify(fixture._calleesWildcardSubscriber, times(0)).onData(any(), any(), any()); } @Test @@ -1229,14 +1282,15 @@ private static final class XdsClientImplFixture { MockitoAnnotations.initMocks(this); // Make sure subscribedAt time is before current time, in other words the response handle time, in the tests when(_clock.currentTimeMillis()).thenAnswer(invocation -> System.currentTimeMillis() - 20); - _nodeSubscriber = spy(new ResourceSubscriber(NODE, SERVICE_RESOURCE_NAME, _xdsClientJmx, _clock)); - _clusterSubscriber = spy(new ResourceSubscriber(D2_URI_MAP, CLUSTER_RESOURCE_NAME, _xdsClientJmx, 
_clock)); - _d2UriSubscriber = spy(new ResourceSubscriber(D2_URI, URI_URN1, _xdsClientJmx, _clock)); - _calleesSubscriber = spy(new ResourceSubscriber(D2_CALLEES, CALLEES_RESOURCE_NAME, _xdsClientJmx, _clock)); - _nodeWildcardSubscriber = spy(new WildcardResourceSubscriber(NODE, _clock)); - _uriMapWildcardSubscriber = spy(new WildcardResourceSubscriber(D2_URI_MAP, _clock)); - _nameWildcardSubscriber = spy(new WildcardResourceSubscriber(D2_CLUSTER_OR_SERVICE_NAME, _clock)); - _calleesWildcardSubscriber = spy(new WildcardResourceSubscriber(D2_CALLEES, _clock)); + + _nodeSubscriber = spy(new ResourceSubscriber(NODE, SERVICE_RESOURCE_NAME, _xdsClientJmx, useIRV, _clock)); + _clusterSubscriber = spy(new ResourceSubscriber(D2_URI_MAP, CLUSTER_RESOURCE_NAME, _xdsClientJmx, useIRV, _clock)); + _d2UriSubscriber = spy(new ResourceSubscriber(D2_URI, URI_URN1, _xdsClientJmx, useIRV, _clock)); + _calleesSubscriber = spy(new ResourceSubscriber(D2_CALLEES, CALLEES_RESOURCE_NAME, _xdsClientJmx, useIRV, _clock)); + _nodeWildcardSubscriber = spy(new WildcardResourceSubscriber(NODE, useIRV, _clock)); + _uriMapWildcardSubscriber = spy(new WildcardResourceSubscriber(D2_URI_MAP, useIRV, _clock)); + _nameWildcardSubscriber = spy(new WildcardResourceSubscriber(D2_CLUSTER_OR_SERVICE_NAME, useIRV, _clock)); + _calleesWildcardSubscriber = spy(new WildcardResourceSubscriber(D2_CALLEES, useIRV, _clock)); doNothing().when(_resourceWatcher).onChanged(any()); doNothing().when(_wildcardResourceWatcher).onChanged(any(), any()); @@ -1307,7 +1361,20 @@ void setAllSubscribersSubscribedAt(long timeMillis) { } } - void watchNodeResource() { + void reconnect() + { + _nodeSubscriber.onReconnect(); + _clusterSubscriber.onReconnect(); + _d2UriSubscriber.onReconnect(); + _calleesSubscriber.onReconnect(); + _nodeWildcardSubscriber.onReconnect(); + _uriMapWildcardSubscriber.onReconnect(); + _nameWildcardSubscriber.onReconnect(); + _calleesWildcardSubscriber.onReconnect(); + } + + void watchNodeResource() + { 
_nodeSubscriber.addWatcher(_resourceWatcher); } @@ -1350,6 +1417,18 @@ void verifyAckOrNack(boolean nackExpected, int count) { verifyAckSent(count); } } + + void verifySubscribersState(XdsClientImpl.SubscriberFetchState expected) + { + assertEquals(expected, _nodeSubscriber.getFetchState()); + assertEquals(expected, _clusterSubscriber.getFetchState()); + assertEquals(expected, _d2UriSubscriber.getFetchState()); + assertEquals(expected, _calleesSubscriber.getFetchState()); + assertEquals(expected, _nodeWildcardSubscriber.getFetchState()); + assertEquals(expected, _nameWildcardSubscriber.getFetchState()); + assertEquals(expected, _uriMapWildcardSubscriber.getFetchState()); + assertEquals(expected, _calleesWildcardSubscriber.getFetchState()); + } } public static XdsClient.D2CalleesUpdate createCalleesUpdate(XdsD2.D2CalleeServices calleeServices) { diff --git a/gradle.properties b/gradle.properties index 5172523e6f..4310673c19 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.81.0 +version=29.81.1 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true