Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.79.0] - 2025-10-01
- Preliminary readiness management
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: what's preliminary about it? Would it make more sense to say initial implementation of readiness management system or similar?

Copy link
Contributor Author

@bohhyang bohhyang Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, please see the doc in the PR description, also explained in this class's doc. It's preliminary as a short-term solution since it's based on d2 announcements, which shouldn't be in long run with topology aware SD will change it to base on internal components' readiness.


## [29.78.0] - 2025-09-30
- Dark Cluster Partition Support

Expand Down Expand Up @@ -5906,7 +5909,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.78.0...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.79.0...master
[29.79.0]: https://github.com/linkedin/rest.li/compare/v29.78.0...v29.79.0
[29.78.0]: https://github.com/linkedin/rest.li/compare/v29.77.0...v29.78.0
[29.77.0]: https://github.com/linkedin/rest.li/compare/v29.76.0...v29.77.0
[29.76.0]: https://github.com/linkedin/rest.li/compare/v29.75.3...v29.76.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.linkedin.d2.balancer.servers;

public class NoOpReadinessStatusManager implements ReadinessStatusManager
{
@Override
public void registerAnnouncerStatus(AnnouncerStatus status)
{
// no-op
}

@Override
public void onAnnouncerStatusUpdated()
{
// no-op
}

@Override
public void addWatcher(ReadinessStatusWatcher watcher)
{
// no-op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.linkedin.d2.balancer.servers;

import java.util.Objects;
import javax.annotation.Nonnull;


/**
* This interface manages the readiness status of the server.
* Currently, it's based on D2 announcements' statuses as a preliminary design, can be extended in future for internal
* components' readiness status.
* See the <a href="preliminary design doc">https://shorturl.at/hmjz8</a> for details of determining the readiness status based on
* D2 announcements. Treating D2 announcements as the app servers’ intent to serve traffic, the readiness status is
* determined by the combination of D2 announcements' sent status.
*/
public interface ReadinessStatusManager
{
void registerAnnouncerStatus(AnnouncerStatus status);

void onAnnouncerStatusUpdated();

void addWatcher(ReadinessStatusWatcher watcher);

interface ReadinessStatusWatcher
{
void onReadinessStatusChanged(ReadinessStatus newStatus);
}

enum ReadinessStatus
{
/**
* There is a time gap between when a serving intent changes (either static config loaded at startup, or dynamic
* markup/markdown API is called at runtime) and when the intent is fulfilled ---- (de-)announcement sent successfully.
* During this gap, the intent is not fulfilled, and the readiness status is INCONSISTENT. Or, the (de-)announcements
* failed to be sent, the readiness status is also INCONSISTENT.
* This status can be deprecated in future when the readiness status is based on internal components' readiness
* rather than D2 announcements' statuses.
*/
INCONSISTENT,
/**
* When all D2 clusters are intended to be de-announced and the de-announcements are sent successfully, or never
* announced before.
* Note: if readinessStatusManager.requireStaticD2ServersAnnounced is set to true, and some D2 clusters have
* isToBeAnnouncedFromStaticConfig set to true, then any of them is de-announced will become NOT_SERVING.
*/
NOT_SERVING,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: should we mention that NOT_SERVING is/should be the default ReadinessStatus if there's no other information? It looks like in our internal implementation we use it as the default, not sure if we want to document that here

(just considering I could also imagine an argument for INCONSISTENT as the default)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

INCONSISTENT is when some announcement/de-announcement happened and still ongoing or failed. At start up, no announcement is made yet, so not serving is default. I will comment it in the impl class in container. The interface here should be general.

/**
* When the D2 clusters that are intended to be announced have sent announcements successfully.
* (And if any D2 cluster is intended to be de-announced, the corresponding de-announcement is also sent successfully)
* Note: if readinessStatusManager.requireStaticD2ServersAnnounced is set to true, all D2 clusters that have
* isToBeAnnouncedFromStaticConfig set to true must be announced for the readiness status to be SERVING.
*/
SERVING
}

class AnnouncerStatus
{
// whether this announcer is to be announced and defined in static config
private final boolean isToBeAnnouncedFromStaticConfig;

private volatile AnnouncementStatus announcementStatus;

public AnnouncerStatus(boolean isToBeAnnouncedFromStaticConfig, AnnouncementStatus announcementStatus)
{
this.isToBeAnnouncedFromStaticConfig = isToBeAnnouncedFromStaticConfig;
this.announcementStatus = announcementStatus;
}

public boolean isToBeAnnouncedFromStaticConfig()
{
return isToBeAnnouncedFromStaticConfig;
}

public boolean isAnnounced()
{
return AnnouncementStatus.ANNOUNCED.equals(announcementStatus);
}

public boolean isDeAnnounced()
{
return AnnouncementStatus.DE_ANNOUNCED.equals(announcementStatus);
}

public boolean isAnnouncing()
{
return AnnouncementStatus.ANNOUNCING.equals(announcementStatus);
}

public boolean isDeAnnouncing()
{
return AnnouncementStatus.DE_ANNOUNCING.equals(announcementStatus);
}

public AnnouncementStatus getAnnouncementStatus()
{
return announcementStatus;
}

public void setAnnouncementStatus(AnnouncementStatus announcementStatus)
{
this.announcementStatus = announcementStatus;
}

public boolean isEqual(AnnouncerStatus other)
{
return other != null && this.isToBeAnnouncedFromStaticConfig == other.isToBeAnnouncedFromStaticConfig
&& Objects.equals(this.announcementStatus, other.announcementStatus);
}

public String toString()
{
return "AnnouncerStatus{isToBeAnnouncedFromStaticConfig=" + isToBeAnnouncedFromStaticConfig
+ ", announcementStatus=" + announcementStatus + "}";
}

public enum AnnouncementStatus
{
DE_ANNOUNCED, // de-announcement sent successfully
DE_ANNOUNCING, // a de-announcement is to be or is being sent
ANNOUNCING, // an announcement is to be or is being sent
ANNOUNCED // announcement sent successfully
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ScheduledExecutorService;

Expand All @@ -43,6 +44,8 @@
import com.linkedin.d2.discovery.event.LogOnlyServiceDiscoveryEventEmitter;
import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter;
import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter.StatusUpdateActionType;
import com.linkedin.d2.balancer.servers.ReadinessStatusManager.AnnouncerStatus;
import com.linkedin.d2.balancer.servers.ReadinessStatusManager.AnnouncerStatus.AnnouncementStatus;
import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore;
import com.linkedin.util.ArgumentUtil;

Expand All @@ -58,6 +61,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.d2.balancer.servers.ReadinessStatusManager.AnnouncerStatus.AnnouncementStatus.*;


/**
* ZooKeeperAnnouncer combines a ZooKeeperServer with a configured "desired state", and
* allows the server to be brought up/down in that state. The desired state can also
Expand All @@ -76,6 +82,7 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper, Announ
private static final Logger _log = LoggerFactory.getLogger(ZooKeeperAnnouncer.class);
private volatile String _cluster;
private volatile URI _uri;
private volatile String _announcementTargetId;
/**
* Ephemeral znode path and its data announced for the regular cluster and uri. It will be used as the tracingId in Service discovery status related tracking events.
* It's updated ONLY at mark-ups: (including regular mark-up and changing uri data by marking down then marking up again)
Expand Down Expand Up @@ -184,6 +191,9 @@ public enum ActionOnWeightBreach {
RECTIFY
}

private final AnnouncerStatus _status;
private final ReadinessStatusManager _readinessManager;

/**
* @deprecated Use the constructor {@link #ZooKeeperAnnouncer(LoadBalancerServer)} instead.
*/
Expand Down Expand Up @@ -241,6 +251,7 @@ public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService, eventEmitter, null, ActionOnWeightBreach.IGNORE);
}

@Deprecated
public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter)
{
Expand All @@ -250,6 +261,18 @@ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService,
ServiceDiscoveryEventEmitter eventEmitter, BigDecimal maxWeight, ActionOnWeightBreach actionOnWeightBreach)
{
this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService,
eventEmitter, maxWeight, actionOnWeightBreach, new AnnouncerStatus(
false, DE_ANNOUNCED),
new NoOpReadinessStatusManager()
);
}

public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService,
ServiceDiscoveryEventEmitter eventEmitter, BigDecimal maxWeight, ActionOnWeightBreach actionOnWeightBreach,
AnnouncerStatus status, ReadinessStatusManager readinessManager)
{
_server = server;
// initialIsUp is used for delay mark up. If it's false, there won't be markup when the announcer is started.
Expand All @@ -273,6 +296,9 @@ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
{
((ZooKeeperServer) server).setServiceDiscoveryEventHelper(this);
}

_status = status;
_readinessManager = readinessManager;
}

/**
Expand Down Expand Up @@ -355,6 +381,7 @@ public synchronized void markUp(final Callback<None> callback)
{
_pendingMarkUp.add(callback);
_isUp = true;
updateStatus(ANNOUNCING);
runNowOrEnqueue(() -> doMarkUp(callback));
}

Expand All @@ -368,8 +395,9 @@ public void onError(Throwable e)
emitSDStatusActiveUpdateIntentAndWriteEvents(_cluster, true, false, _markUpStartAtRef.get());
if (e instanceof KeeperException.ConnectionLossException || e instanceof KeeperException.SessionExpiredException)
{
_log.warn("failed to mark up uri = {}, cluster = {}, partitionData = {}, uriSpecificProperties = {} due to {}.",
_uri, _cluster, _partitionDataMap, _uriSpecificProperties, e.getClass().getSimpleName());
_log.warn("failed to mark up uri = {}, cluster = {}, partitionData = {}, uriSpecificProperties = {},"
+ " announcementTarget = {} due to {}.",
_uri, _cluster, _partitionDataMap, _uriSpecificProperties, _announcementTargetId, e.getClass().getSimpleName());
// Setting to null because if that connection dies, when don't want to continue making operations before
// the connection is up again.
// When the connection will be up again, the ZKAnnouncer will be restarted and it will read the _isUp
Expand All @@ -393,11 +421,13 @@ public void onError(Throwable e)
@Override
public void onSuccess(None result)
{
updateStatus(ANNOUNCED);
_isMarkUpIntentSent.set(true);
emitSDStatusActiveUpdateIntentAndWriteEvents(_cluster, true, true, _markUpStartAtRef.get());
_markUpFailed = false;
_log.info("markUp for uri = {}, cluster = {}, partitionData = {}, uriSpecificProperties = {} succeeded.",
_uri, _cluster, _partitionDataMap, _uriSpecificProperties);
_log.info("markUp for uri = {}, cluster = {}, partitionData = {}, uriSpecificProperties = {},"
+ " announcementTarget = {} succeeded.",
_uri, _cluster, _partitionDataMap, _uriSpecificProperties, _announcementTargetId);
// Note that the pending callbacks we see at this point are
// from the requests that are filed before us because zookeeper
// guarantees the ordering of callback being invoked.
Expand Down Expand Up @@ -554,6 +584,7 @@ public synchronized void markDown(final Callback<None> callback)
{
_pendingMarkDown.add(callback);
_isUp = false;
updateStatus(DE_ANNOUNCING);
runNowOrEnqueue(() -> doMarkDown(callback));
}

Expand Down Expand Up @@ -582,6 +613,7 @@ public void onError(Throwable e)
@Override
public void onSuccess(None result)
{
updateStatus(DE_ANNOUNCED);
_isMarkUpIntentSent.set(false);
emitSDStatusActiveUpdateIntentAndWriteEvents(_cluster, false, true, _markDownStartAtRef.get());
_log.info("markDown for uri = {} succeeded.", _uri);
Expand Down Expand Up @@ -742,6 +774,12 @@ public void setUri(String uri)
_uri = URI.create(uri);
}

public void setAnnouncementTargetId(String announcementTargetId)
{
ArgumentUtil.notNull(announcementTargetId, "announcementTargetId");
_announcementTargetId = announcementTargetId;
}

public void setUriSpecificProperties(Map<String, Object> uriSpecificProperties)
{
_uriSpecificProperties = Collections.unmodifiableMap(uriSpecificProperties);
Expand Down Expand Up @@ -864,6 +902,13 @@ public void setEventEmitter(ServiceDiscoveryEventEmitter emitter) {
_eventEmitter = emitter;
}

public AnnouncementStatus getAnnouncementStatus()
{
return _status.getAnnouncementStatus();
}

// ##### End Properties Section #####

@Override
public void emitSDStatusActiveUpdateIntentAndWriteEvents(String cluster, boolean isMarkUp, boolean succeeded, long startAt) {
// In this class, SD event should be sent only when the announcing mode is to old service registry or dual write,
Expand Down Expand Up @@ -986,4 +1031,23 @@ private IllegalArgumentException getMaxWeightBreachException(BigDecimal weight,
+ " than the max weight allowed: %s. Please correct the weight. It will be force-capped to the max weight "
+ "in the future.", weight, partition, _maxWeight));
}

private void updateStatus(AnnouncementStatus newStatus)
{
AnnouncementStatus oldStatus = _status.getAnnouncementStatus();
if (Objects.equals(oldStatus, newStatus))
{
return;
}

_status.setAnnouncementStatus(newStatus);
_log.info("Announcement status changed from {} to {} for {}.", oldStatus, newStatus, getIdentifier());
_readinessManager.onAnnouncerStatusUpdated();
}

// unique identifier for this announcer instance with: kafka/zk addr + cluster + uri, useful for logging
private String getIdentifier()
{
return String.format("{Announcing target: %s, cluster: %s, uri: %s}", _announcementTargetId, _cluster, _uri);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,9 @@ public int getWeightDecimalPlacesBreachedCount()
public int getServerAnnounceMode() {
return _announcer.getServerAnnounceMode().ordinal();
}

@Override
public int getAnnouncementStatus() {
return _announcer.getAnnouncementStatus().ordinal();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,10 @@ void setPartitionDataUsingJson(String partitionDataJson)
* @return the server announce mode corresponding to {@link LoadBalancerServer#getAnnounceMode()}
*/
int getServerAnnounceMode();

/**
* @return the announcement status corresponding to
* {@link com.linkedin.d2.balancer.servers.ReadinessStatusManager.AnnouncerStatus.AnnouncementStatus}
*/
int getAnnouncementStatus();
}
Loading