Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.81.1] - 2025-11-04
- Fix xds client active wait time metric and add more logs.

## [29.81.0] - 2025-11-03
- Indis based d2 warmup for restli client

Expand Down Expand Up @@ -5930,7 +5933,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.81.0...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.81.1...master
[29.81.1]: https://github.com/linkedin/rest.li/compare/v29.81.0...v29.81.1
[29.81.0]: https://github.com/linkedin/rest.li/compare/v29.80.3...v29.81.0
[29.80.3]: https://github.com/linkedin/rest.li/compare/v29.80.2...v29.80.3
[29.80.2]: https://github.com/linkedin/rest.li/compare/v29.80.1...v29.80.2
Expand Down
140 changes: 102 additions & 38 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -68,6 +69,8 @@
import org.slf4j.LoggerFactory;

import static com.google.common.base.Preconditions.*;
import static com.linkedin.d2.xds.XdsClientImpl.SubscriberFetchState.*;


/**
* Implementation of {@link XdsClient} interface.
Expand Down Expand Up @@ -254,7 +257,7 @@ public void watchXdsResource(String resourceName, ResourceWatcher watcher)
ResourceSubscriber subscriber = resourceSubscriberMap.get(resourceName);
if (subscriber == null)
{
subscriber = new ResourceSubscriber(originalType, resourceName, _xdsClientJmx);
subscriber = new ResourceSubscriber(originalType, resourceName, _xdsClientJmx, _initialResourceVersionsEnabled);
resourceSubscriberMap.put(resourceName, subscriber);
ResourceType adjustedType;
String adjustedResourceName;
Expand Down Expand Up @@ -292,7 +295,7 @@ public void watchAllXdsResources(WildcardResourceWatcher watcher)
WildcardResourceSubscriber subscriber = getWildcardResourceSubscriber(originalType);
if (subscriber == null)
{
subscriber = new WildcardResourceSubscriber(originalType);
subscriber = new WildcardResourceSubscriber(originalType, _initialResourceVersionsEnabled);
getWildcardResourceSubscribers().put(originalType, subscriber);

ResourceType adjustedType = shouldSubscribeUriGlobCollection(originalType) ? ResourceType.D2_URI : originalType;
Expand Down Expand Up @@ -675,7 +678,7 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data)
|| uriSubscriber.getData() == null // The URI was corrupted and there was no previous version of this URI
)
{
uriSubscriber.onData(new D2URIUpdate(uri), _serverMetricsProvider, _initialResourceVersionsEnabled);
uriSubscriber.onData(new D2URIUpdate(uri), _serverMetricsProvider);
}
}

Expand Down Expand Up @@ -791,12 +794,12 @@ private void handleResourceUpdate(Map<String, ? extends ResourceUpdate> updates,
ResourceSubscriber subscriber = subscribers.get(entry.getKey());
if (subscriber != null)
{
subscriber.onData(entry.getValue(), _serverMetricsProvider, _initialResourceVersionsEnabled);
subscriber.onData(entry.getValue(), _serverMetricsProvider);
}

if (wildcardSubscriber != null)
{
wildcardSubscriber.onData(entry.getKey(), entry.getValue(), _serverMetricsProvider, _initialResourceVersionsEnabled);
wildcardSubscriber.onData(entry.getKey(), entry.getValue(), _serverMetricsProvider);
}
}
}
Expand Down Expand Up @@ -926,7 +929,7 @@ Map<ResourceType, WildcardResourceSubscriber> getWildcardResourceSubscribers()
return _wildcardSubscribers;
}

private enum SubscriberFetchState
enum SubscriberFetchState
{
INIT_PENDING, // the very first fetch since the subscriber is created
PENDING_AFTER_RECONNECT, // the first fetch after each reconnect
Expand All @@ -942,22 +945,24 @@ static class ResourceSubscriber
@Nullable
private ResourceUpdate _data;
private final Clock _clock;
private long _subscribedAt;
private SubscriberFetchState _fetchState = SubscriberFetchState.INIT_PENDING;
private AtomicLong _subscribedAt = new AtomicLong(0L);
private AtomicReference<SubscriberFetchState> _fetchState = new AtomicReference<>(SubscriberFetchState.INIT_PENDING);
private final boolean _isIrvEnabled;

ResourceSubscriber(ResourceType type, String resource, XdsClientJmx xdsClientJmx)
ResourceSubscriber(ResourceType type, String resource, XdsClientJmx xdsClientJmx, boolean isIrvEnabled)
{
this(type, resource, xdsClientJmx, SystemClock.instance());
this(type, resource, xdsClientJmx, isIrvEnabled, SystemClock.instance());
}

@VisibleForTesting
ResourceSubscriber(ResourceType type, String resource, XdsClientJmx xdsClientJmx, Clock clock)
ResourceSubscriber(ResourceType type, String resource, XdsClientJmx xdsClientJmx, boolean isIrvEnabled, Clock clock)
{
_type = type;
_resource = resource;
_xdsClientJmx = xdsClientJmx;
_clock = clock;
_subscribedAt = _clock.currentTimeMillis();
_isIrvEnabled = isIrvEnabled;
_subscribedAt.set(_clock.currentTimeMillis());
}

@VisibleForTesting
Expand All @@ -973,6 +978,12 @@ public void setData(@Nullable ResourceUpdate data)
_data = data;
}

/**
 * Returns the current fetch state of this subscriber, read from the
 * {@code AtomicReference} that holds it.
 *
 * Marked {@code @VisibleForTesting}: intended for tests to observe the state.
 *
 * @return the current {@link SubscriberFetchState} value
 */
@VisibleForTesting
SubscriberFetchState getFetchState()
{
return _fetchState.get();
}

void addWatcher(ResourceWatcher watcher)
{
_watchers.add(watcher);
Expand All @@ -984,21 +995,30 @@ void addWatcher(ResourceWatcher watcher)
}

@VisibleForTesting
void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider, boolean isIrvEnabled)
void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider)
{
SubscriberFetchState prev = _fetchState.getAndSet(FETCHED);
if (!FETCHED.equals(prev))
{
_log.info("Received initial data for {} {}. Set state to FETCHED.", _type, _resource);
}

if (Objects.equals(_data, data))
{
_log.debug("Received resource update data equal to the current data. Will not perform the update.");
if (!FETCHED.equals(prev) && data != null && data.isValid())
{
// Even though the data is the same, the subscriber is waiting for init data after either startup
// or a reconnection, so we need to track latency.
trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev);
}
_log.debug("Received resource update data equal to the current data. Will not perform any update.");
return;
}

SubscriberFetchState curFetchState = _fetchState;
_fetchState = SubscriberFetchState.FETCHED;

// null value guard to avoid overwriting the property with null
if (data != null && data.isValid())
{
trackServerLatency(data, _data, metricsProvider, _subscribedAt, isIrvEnabled, curFetchState); // data updated, track xds server latency
trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev);
_data = data;
}
else
Expand Down Expand Up @@ -1067,7 +1087,11 @@ void onReconnect()
@VisibleForTesting
void onRemoval()
{
_fetchState = SubscriberFetchState.FETCHED;
SubscriberFetchState prev = _fetchState.getAndSet(FETCHED);
if (!FETCHED.equals(prev))
{
_log.info("Received data removed for {} {}. Set state to FETCHED.", _type, _resource);
}

if (_data == null)
{
Expand All @@ -1082,23 +1106,34 @@ void onRemoval()

private void reset()
{
_subscribedAt = _clock.currentTimeMillis();
_fetchState = SubscriberFetchState.PENDING_AFTER_RECONNECT;
// only change state to PENDING_AFTER_RECONNECT if the previous state was FETCHED.
// if the previous state was still init pending, no change to the state and subscribed time
if (_fetchState.compareAndSet(FETCHED, SubscriberFetchState.PENDING_AFTER_RECONNECT))
{
_subscribedAt.set(_clock.currentTimeMillis());
_log.info("Reset fetch state to PENDING_AFTER_RECONNECT for {} {} after reconnect.", _type, _resource);
}
}

@VisibleForTesting
void setSubscribedAt(long subscribedAt)
{
_subscribedAt = subscribedAt;
_subscribedAt.set(subscribedAt);
}

long getActiveInitialWaitTimeMillis(long end)
{
if (SubscriberFetchState.FETCHED.equals(_fetchState))
SubscriberFetchState cur = _fetchState.get();
// if either 1) already fetched data, or 2) IRV is enabled and no data is received after reconnect, which means
// the server has no new data to send, then no active wait time.
if (FETCHED.equals(cur) || (_isIrvEnabled && PENDING_AFTER_RECONNECT.equals(cur)))
{
return 0;
}
return end - _subscribedAt;
long past = end - _subscribedAt.get();
// the wait time metric is collected every minute, so it's rate limited already
_log.info("Waiting for initial data of {} {}, past ms: {}.", _type, _resource, past);
return past;
}
}

Expand All @@ -1108,19 +1143,21 @@ static class WildcardResourceSubscriber
private final Set<WildcardResourceWatcher> _watchers = new HashSet<>();
private final Map<String, ResourceUpdate> _data = new HashMap<>();
private final Clock _clock;
private long _subscribedAt;
private SubscriberFetchState _fetchState = SubscriberFetchState.INIT_PENDING;
private AtomicLong _subscribedAt = new AtomicLong(0L);
private AtomicReference<SubscriberFetchState> _fetchState = new AtomicReference<>(SubscriberFetchState.INIT_PENDING);
private final boolean _isIrvEnabled;

WildcardResourceSubscriber(ResourceType type)
WildcardResourceSubscriber(ResourceType type, boolean isIrvEnabled)
{
this(type, SystemClock.instance());
this(type, isIrvEnabled, SystemClock.instance());
}

WildcardResourceSubscriber(ResourceType type, Clock clock)
WildcardResourceSubscriber(ResourceType type, boolean isIrvEnabled, Clock clock)
{
_type = type;
_clock = clock;
_subscribedAt = _clock.currentTimeMillis();
_isIrvEnabled = isIrvEnabled;
_subscribedAt.set(_clock.currentTimeMillis());
}

@VisibleForTesting
Expand All @@ -1135,6 +1172,12 @@ public void setData(String resourceName, ResourceUpdate data)
_data.put(resourceName, data);
}

/**
 * Returns the current fetch state of this wildcard subscriber, read from the
 * {@code AtomicReference} that holds it.
 *
 * Marked {@code @VisibleForTesting}: intended for tests to observe the state.
 *
 * @return the current {@link SubscriberFetchState} value
 */
@VisibleForTesting
SubscriberFetchState getFetchState()
{
return _fetchState.get();
}

void addWatcher(WildcardResourceWatcher watcher)
{
_watchers.add(watcher);
Expand All @@ -1147,17 +1190,23 @@ void addWatcher(WildcardResourceWatcher watcher)
}

@VisibleForTesting
void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider metricsProvider, boolean isIrvEnabled)
void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider metricsProvider)
{
if (Objects.equals(_data.get(resourceName), data))
{
if (!FETCHED.equals(_fetchState.get()) && data != null && data.isValid())
{
// Even though the data is the same, the subscriber is waiting for init data after either startup
// or a reconnection, so we need to track latency.
trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get());
}
_log.debug("Received resource update data equal to the current data. Will not perform the update.");
return;
}
// null value guard to avoid overwriting the property with null
if (data != null && data.isValid())
{
trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt, isIrvEnabled, _fetchState);
trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get());
_data.put(resourceName, data);
}
else
Expand Down Expand Up @@ -1226,7 +1275,11 @@ void onRemoval(String resourceName)
@VisibleForTesting
void onAllResourcesProcessed()
{
_fetchState = SubscriberFetchState.FETCHED;
SubscriberFetchState prev = _fetchState.getAndSet(FETCHED);
if (!FETCHED.equals(prev))
{
_log.info("Received initial data for all resources with wildcard subscriber of type {}", _type);
}

for (WildcardResourceWatcher watcher : _watchers)
{
Expand All @@ -1236,23 +1289,34 @@ void onAllResourcesProcessed()

private void reset()
{
_subscribedAt = _clock.currentTimeMillis();
_fetchState = SubscriberFetchState.PENDING_AFTER_RECONNECT;
// only change state to PENDING_AFTER_RECONNECT if the previous state was FETCHED.
// if the previous state was still init pending, no change to the state and subscribed time
if (_fetchState.compareAndSet(FETCHED, SubscriberFetchState.PENDING_AFTER_RECONNECT))
{
_subscribedAt.set(_clock.currentTimeMillis());
_log.info("Reset fetch state to PENDING_AFTER_RECONNECT for {} wildcard subscriber after reconnect.", _type);
}
}

@VisibleForTesting
void setSubscribedAt(long subscribedAt)
{
_subscribedAt = subscribedAt;
_subscribedAt.set(subscribedAt);
}

long getActiveInitialWaitTimeMillis(long end)
{
if (SubscriberFetchState.FETCHED.equals(_fetchState))
SubscriberFetchState cur = _fetchState.get();
// if either 1) already fetched data, or 2) IRV is enabled and no data is received after reconnect, which means
// the server has no new data to send, then no active wait time.
if (FETCHED.equals(cur) || (_isIrvEnabled && PENDING_AFTER_RECONNECT.equals(cur)))
{
return 0;
}
return end - _subscribedAt;
long past = end - _subscribedAt.get();
// the wait time metric is collected every minute, so it's rate limited already
_log.info("{} wildcard subscriber Waiting for initial data, past ms: {}", _type, past);
return past;
}
}

Expand Down
Loading