diff --git a/CHANGELOG.md b/CHANGELOG.md
index b38c8ebdf8..29258cdbac 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,9 @@ and what APIs have changed, if applicable.
## [Unreleased]
+## [29.79.0] - 2025-10-01
+- Preliminary readiness management
+
## [29.78.0] - 2025-09-30
- Dark Cluster Partition Support
@@ -5906,7 +5909,8 @@ patch operations can re-use these classes for generating patch messages.
## [0.14.1]
-[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.78.0...master
+[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.79.0...master
+[29.79.0]: https://github.com/linkedin/rest.li/compare/v29.78.0...v29.79.0
[29.78.0]: https://github.com/linkedin/rest.li/compare/v29.77.0...v29.78.0
[29.77.0]: https://github.com/linkedin/rest.li/compare/v29.76.0...v29.77.0
[29.76.0]: https://github.com/linkedin/rest.li/compare/v29.75.3...v29.76.0
diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/NoOpReadinessStatusManager.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/NoOpReadinessStatusManager.java
new file mode 100644
index 0000000000..2d4a37dee7
--- /dev/null
+++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/NoOpReadinessStatusManager.java
@@ -0,0 +1,22 @@
+package com.linkedin.d2.balancer.servers;
+
+public class NoOpReadinessStatusManager implements ReadinessStatusManager
+{
+ @Override
+ public void registerAnnouncerStatus(AnnouncerStatus status)
+ {
+ // no-op
+ }
+
+ @Override
+ public void onAnnouncerStatusUpdated()
+ {
+ // no-op
+ }
+
+ @Override
+ public void addWatcher(ReadinessStatusWatcher watcher)
+ {
+ // no-op
+ }
+}
diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ReadinessStatusManager.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ReadinessStatusManager.java
new file mode 100644
index 0000000000..53046b0ab7
--- /dev/null
+++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ReadinessStatusManager.java
@@ -0,0 +1,123 @@
+package com.linkedin.d2.balancer.servers;
+
+import java.util.Objects;
+import javax.annotation.Nonnull;
+
+
+/**
+ * This interface manages the readiness status of the server.
+ * Currently, it's based on D2 announcements' statuses as a preliminary design, can be extended in future for internal
+ * components' readiness status.
+ * See the https://shorturl.at/hmjz8 for details of determining the readiness status based on
+ * D2 announcements. Treating D2 announcements as the app servers’ intent to serve traffic, the readiness status is
+ * determined by the combination of D2 announcements' sent status.
+ */
+public interface ReadinessStatusManager
+{
+ void registerAnnouncerStatus(AnnouncerStatus status);
+
+ void onAnnouncerStatusUpdated();
+
+ void addWatcher(ReadinessStatusWatcher watcher);
+
+ interface ReadinessStatusWatcher
+ {
+ void onReadinessStatusChanged(ReadinessStatus newStatus);
+ }
+
+ enum ReadinessStatus
+ {
+ /**
+ * There is a time gap between when a serving intent changes (either static config loaded at startup, or dynamic
+ * markup/markdown API is called at runtime) and when the intent is fulfilled ---- (de-)announcement sent successfully.
+ * During this gap, the intent is not fulfilled, and the readiness status is INCONSISTENT. Or, the (de-)announcements
+ * failed to be sent, the readiness status is also INCONSISTENT.
+ * This status can be deprecated in future when the readiness status is based on internal components' readiness
+ * rather than D2 announcements' statuses.
+ */
+ INCONSISTENT,
+ /**
+ * When all D2 clusters are intended to be de-announced and the de-announcements are sent successfully, or never
+ * announced before.
+ * Note: if readinessStatusManager.requireStaticD2ServersAnnounced is set to true, and some D2 clusters have
+ * isToBeAnnouncedFromStaticConfig set to true, then any of them is de-announced will become NOT_SERVING.
+ */
+ NOT_SERVING,
+ /**
+ * When the D2 clusters that are intended to be announced have sent announcements successfully.
+ * (And if any D2 cluster is intended to be de-announced, the corresponding de-announcement is also sent successfully)
+ * Note: if readinessStatusManager.requireStaticD2ServersAnnounced is set to true, all D2 clusters that have
+ * isToBeAnnouncedFromStaticConfig set to true must be announced for the readiness status to be SERVING.
+ */
+ SERVING
+ }
+
+ class AnnouncerStatus
+ {
+ // whether this announcer is to be announced and defined in static config
+ private final boolean isToBeAnnouncedFromStaticConfig;
+
+ private volatile AnnouncementStatus announcementStatus;
+
+ public AnnouncerStatus(boolean isToBeAnnouncedFromStaticConfig, AnnouncementStatus announcementStatus)
+ {
+ this.isToBeAnnouncedFromStaticConfig = isToBeAnnouncedFromStaticConfig;
+ this.announcementStatus = announcementStatus;
+ }
+
+ public boolean isToBeAnnouncedFromStaticConfig()
+ {
+ return isToBeAnnouncedFromStaticConfig;
+ }
+
+ public boolean isAnnounced()
+ {
+ return AnnouncementStatus.ANNOUNCED.equals(announcementStatus);
+ }
+
+ public boolean isDeAnnounced()
+ {
+ return AnnouncementStatus.DE_ANNOUNCED.equals(announcementStatus);
+ }
+
+ public boolean isAnnouncing()
+ {
+ return AnnouncementStatus.ANNOUNCING.equals(announcementStatus);
+ }
+
+ public boolean isDeAnnouncing()
+ {
+ return AnnouncementStatus.DE_ANNOUNCING.equals(announcementStatus);
+ }
+
+ public AnnouncementStatus getAnnouncementStatus()
+ {
+ return announcementStatus;
+ }
+
+ public void setAnnouncementStatus(AnnouncementStatus announcementStatus)
+ {
+ this.announcementStatus = announcementStatus;
+ }
+
+ public boolean isEqual(AnnouncerStatus other)
+ {
+ return other != null && this.isToBeAnnouncedFromStaticConfig == other.isToBeAnnouncedFromStaticConfig
+ && Objects.equals(this.announcementStatus, other.announcementStatus);
+ }
+
+ public String toString()
+ {
+ return "AnnouncerStatus{isToBeAnnouncedFromStaticConfig=" + isToBeAnnouncedFromStaticConfig
+ + ", announcementStatus=" + announcementStatus + "}";
+ }
+
+ public enum AnnouncementStatus
+ {
+ DE_ANNOUNCED, // de-announcement sent successfully
+ DE_ANNOUNCING, // a de-announcement is to be or is being sent
+ ANNOUNCING, // an announcement is to be or is being sent
+ ANNOUNCED // announcement sent successfully
+ }
+ }
+}
diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java
index 48d7059373..f954c5e2cf 100644
--- a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java
+++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java
@@ -29,6 +29,7 @@
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ScheduledExecutorService;
@@ -43,6 +44,8 @@
import com.linkedin.d2.discovery.event.LogOnlyServiceDiscoveryEventEmitter;
import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter;
import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter.StatusUpdateActionType;
+import com.linkedin.d2.balancer.servers.ReadinessStatusManager.AnnouncerStatus;
+import com.linkedin.d2.balancer.servers.ReadinessStatusManager.AnnouncerStatus.AnnouncementStatus;
import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore;
import com.linkedin.util.ArgumentUtil;
@@ -58,6 +61,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.linkedin.d2.balancer.servers.ReadinessStatusManager.AnnouncerStatus.AnnouncementStatus.*;
+
+
/**
* ZooKeeperAnnouncer combines a ZooKeeperServer with a configured "desired state", and
* allows the server to be brought up/down in that state. The desired state can also
@@ -76,6 +82,7 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper, Announ
private static final Logger _log = LoggerFactory.getLogger(ZooKeeperAnnouncer.class);
private volatile String _cluster;
private volatile URI _uri;
+ private volatile String _announcementTargetId;
/**
* Ephemeral znode path and its data announced for the regular cluster and uri. It will be used as the tracingId in Service discovery status related tracking events.
* It's updated ONLY at mark-ups: (including regular mark-up and changing uri data by marking down then marking up again)
@@ -184,6 +191,9 @@ public enum ActionOnWeightBreach {
RECTIFY
}
+ private final AnnouncerStatus _status;
+ private final ReadinessStatusManager _readinessManager;
+
/**
* @deprecated Use the constructor {@link #ZooKeeperAnnouncer(LoadBalancerServer)} instead.
*/
@@ -241,6 +251,7 @@ public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService, eventEmitter, null, ActionOnWeightBreach.IGNORE);
}
+ @Deprecated
public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter)
{
@@ -250,6 +261,18 @@ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService,
ServiceDiscoveryEventEmitter eventEmitter, BigDecimal maxWeight, ActionOnWeightBreach actionOnWeightBreach)
+ {
+ this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService,
+ eventEmitter, maxWeight, actionOnWeightBreach, new AnnouncerStatus(
+ false, DE_ANNOUNCED),
+ new NoOpReadinessStatusManager()
+ );
+ }
+
+ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
+ boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService,
+ ServiceDiscoveryEventEmitter eventEmitter, BigDecimal maxWeight, ActionOnWeightBreach actionOnWeightBreach,
+ AnnouncerStatus status, ReadinessStatusManager readinessManager)
{
_server = server;
// initialIsUp is used for delay mark up. If it's false, there won't be markup when the announcer is started.
@@ -273,6 +296,9 @@ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
{
((ZooKeeperServer) server).setServiceDiscoveryEventHelper(this);
}
+
+ _status = status;
+ _readinessManager = readinessManager;
}
/**
@@ -355,6 +381,7 @@ public synchronized void markUp(final Callback callback)
{
_pendingMarkUp.add(callback);
_isUp = true;
+ updateStatus(ANNOUNCING);
runNowOrEnqueue(() -> doMarkUp(callback));
}
@@ -368,8 +395,9 @@ public void onError(Throwable e)
emitSDStatusActiveUpdateIntentAndWriteEvents(_cluster, true, false, _markUpStartAtRef.get());
if (e instanceof KeeperException.ConnectionLossException || e instanceof KeeperException.SessionExpiredException)
{
- _log.warn("failed to mark up uri = {}, cluster = {}, partitionData = {}, uriSpecificProperties = {} due to {}.",
- _uri, _cluster, _partitionDataMap, _uriSpecificProperties, e.getClass().getSimpleName());
+ _log.warn("failed to mark up uri = {}, cluster = {}, partitionData = {}, uriSpecificProperties = {},"
+ + " announcementTarget = {} due to {}.",
+ _uri, _cluster, _partitionDataMap, _uriSpecificProperties, _announcementTargetId, e.getClass().getSimpleName());
// Setting to null because if that connection dies, when don't want to continue making operations before
// the connection is up again.
// When the connection will be up again, the ZKAnnouncer will be restarted and it will read the _isUp
@@ -393,11 +421,13 @@ public void onError(Throwable e)
@Override
public void onSuccess(None result)
{
+ updateStatus(ANNOUNCED);
_isMarkUpIntentSent.set(true);
emitSDStatusActiveUpdateIntentAndWriteEvents(_cluster, true, true, _markUpStartAtRef.get());
_markUpFailed = false;
- _log.info("markUp for uri = {}, cluster = {}, partitionData = {}, uriSpecificProperties = {} succeeded.",
- _uri, _cluster, _partitionDataMap, _uriSpecificProperties);
+ _log.info("markUp for uri = {}, cluster = {}, partitionData = {}, uriSpecificProperties = {},"
+ + " announcementTarget = {} succeeded.",
+ _uri, _cluster, _partitionDataMap, _uriSpecificProperties, _announcementTargetId);
// Note that the pending callbacks we see at this point are
// from the requests that are filed before us because zookeeper
// guarantees the ordering of callback being invoked.
@@ -554,6 +584,7 @@ public synchronized void markDown(final Callback callback)
{
_pendingMarkDown.add(callback);
_isUp = false;
+ updateStatus(DE_ANNOUNCING);
runNowOrEnqueue(() -> doMarkDown(callback));
}
@@ -582,6 +613,7 @@ public void onError(Throwable e)
@Override
public void onSuccess(None result)
{
+ updateStatus(DE_ANNOUNCED);
_isMarkUpIntentSent.set(false);
emitSDStatusActiveUpdateIntentAndWriteEvents(_cluster, false, true, _markDownStartAtRef.get());
_log.info("markDown for uri = {} succeeded.", _uri);
@@ -742,6 +774,12 @@ public void setUri(String uri)
_uri = URI.create(uri);
}
+ public void setAnnouncementTargetId(String announcementTargetId)
+ {
+ ArgumentUtil.notNull(announcementTargetId, "announcementTargetId");
+ _announcementTargetId = announcementTargetId;
+ }
+
public void setUriSpecificProperties(Map uriSpecificProperties)
{
_uriSpecificProperties = Collections.unmodifiableMap(uriSpecificProperties);
@@ -864,6 +902,13 @@ public void setEventEmitter(ServiceDiscoveryEventEmitter emitter) {
_eventEmitter = emitter;
}
+ public AnnouncementStatus getAnnouncementStatus()
+ {
+ return _status.getAnnouncementStatus();
+ }
+
+ // ##### End Properties Section #####
+
@Override
public void emitSDStatusActiveUpdateIntentAndWriteEvents(String cluster, boolean isMarkUp, boolean succeeded, long startAt) {
// In this class, SD event should be sent only when the announcing mode is to old service registry or dual write,
@@ -986,4 +1031,23 @@ private IllegalArgumentException getMaxWeightBreachException(BigDecimal weight,
+ " than the max weight allowed: %s. Please correct the weight. It will be force-capped to the max weight "
+ "in the future.", weight, partition, _maxWeight));
}
+
+ private void updateStatus(AnnouncementStatus newStatus)
+ {
+ AnnouncementStatus oldStatus = _status.getAnnouncementStatus();
+ if (Objects.equals(oldStatus, newStatus))
+ {
+ return;
+ }
+
+ _status.setAnnouncementStatus(newStatus);
+ _log.info("Announcement status changed from {} to {} for {}.", oldStatus, newStatus, getIdentifier());
+ _readinessManager.onAnnouncerStatusUpdated();
+ }
+
+ // unique identifier for this announcer instance with: kafka/zk addr + cluster + uri, useful for logging
+ private String getIdentifier()
+ {
+ return String.format("{Announcing target: %s, cluster: %s, uri: %s}", _announcementTargetId, _cluster, _uri);
+ }
}
diff --git a/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmx.java b/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmx.java
index 83efb1aa02..22df74a972 100644
--- a/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmx.java
+++ b/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmx.java
@@ -211,4 +211,9 @@ public int getWeightDecimalPlacesBreachedCount()
public int getServerAnnounceMode() {
return _announcer.getServerAnnounceMode().ordinal();
}
+
+ @Override
+ public int getAnnouncementStatus() {
+ return _announcer.getAnnouncementStatus().ordinal();
+ }
}
diff --git a/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmxMXBean.java b/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmxMXBean.java
index dade7f51d5..32756cf6fd 100644
--- a/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmxMXBean.java
+++ b/d2/src/main/java/com/linkedin/d2/jmx/ZooKeeperAnnouncerJmxMXBean.java
@@ -101,4 +101,10 @@ void setPartitionDataUsingJson(String partitionDataJson)
* @return the server announce mode corresponding to {@link LoadBalancerServer#getAnnounceMode()}
*/
int getServerAnnounceMode();
+
+ /**
+ * @return the announcement status corresponding to
+ * {@link com.linkedin.d2.balancer.servers.ReadinessStatusManager.AnnouncerStatus.AnnouncementStatus}
+ */
+ int getAnnouncementStatus();
}
diff --git a/d2/src/test/java/com/linkedin/d2/balancer/servers/TestZooKeeperAnnouncer.java b/d2/src/test/java/com/linkedin/d2/balancer/servers/TestZooKeeperAnnouncer.java
index 454143ad4c..34266b7333 100644
--- a/d2/src/test/java/com/linkedin/d2/balancer/servers/TestZooKeeperAnnouncer.java
+++ b/d2/src/test/java/com/linkedin/d2/balancer/servers/TestZooKeeperAnnouncer.java
@@ -2,20 +2,31 @@
import com.linkedin.common.callback.Callback;
import com.linkedin.common.util.None;
-import com.linkedin.d2.balancer.LoadBalancerServer;
import com.linkedin.d2.balancer.properties.PartitionData;
import com.linkedin.d2.balancer.properties.PropertyKeys;
+import com.linkedin.d2.balancer.servers.ReadinessStatusManager.AnnouncerStatus;
+import com.linkedin.d2.balancer.servers.ReadinessStatusManager.AnnouncerStatus.AnnouncementStatus;
+import com.linkedin.d2.balancer.servers.ZooKeeperAnnouncer.ActionOnWeightBreach;
import com.linkedin.d2.discovery.event.LogOnlyServiceDiscoveryEventEmitter;
+import com.linkedin.r2.util.NamedThreadFactory;
import java.math.BigDecimal;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.testng.annotations.BeforeMethod;
+import org.mockito.invocation.InvocationOnMock;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static com.linkedin.d2.balancer.servers.ReadinessStatusManager.AnnouncerStatus.AnnouncementStatus.*;
+import static java.util.concurrent.TimeUnit.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@@ -25,13 +36,6 @@
*/
public class TestZooKeeperAnnouncer
{
- private ZooKeeperAnnouncer _announcer;
-
- @Mock
- private ZooKeeperServer _server;
- @Mock
- private Callback _callback;
-
private static final Map MAX_WEIGHT_BREACH_PARTITION_DATA =
Collections.singletonMap(0, new PartitionData(1000));
private static final Map DECIMAL_PLACES_BREACH_PARTITION_DATA =
@@ -41,24 +45,21 @@ public class TestZooKeeperAnnouncer
private static final Map VALID_PARTITION_DATA =
Collections.singletonMap(0, new PartitionData(2.3));
- @BeforeMethod
- public void setUp()
- {
- MockitoAnnotations.initMocks(this);
-
- _announcer = new ZooKeeperAnnouncer((LoadBalancerServer) _server);
- }
+ private static final Exception DUMMY_EXCEPTION = new RuntimeException("dummy error");
@Test
public void testSetDoNotLoadBalance()
{
- _announcer.setDoNotLoadBalance(_callback, true);
+ ZooKeeperAnnouncerFixture fixture = new ZooKeeperAnnouncerFixture();
+ ZooKeeperAnnouncer announcer = fixture._announcer;
+ Callback callback = fixture.getCallback();
+ announcer.setDoNotLoadBalance(callback, true);
- verify(_server).addUriSpecificProperty(any(), any(), any(), any(), eq(PropertyKeys.DO_NOT_LOAD_BALANCE), eq(true), any());
+ verify(fixture._server).addUriSpecificProperty(any(), any(), any(), any(), eq(PropertyKeys.DO_NOT_LOAD_BALANCE), eq(true), any());
- _announcer.setDoNotLoadBalance(_callback, false);
+ announcer.setDoNotLoadBalance(callback, false);
- verify(_server).addUriSpecificProperty(any(), any(), any(), any(), eq(PropertyKeys.DO_NOT_LOAD_BALANCE), eq(false), any());
+ verify(fixture._server).addUriSpecificProperty(any(), any(), any(), any(), eq(PropertyKeys.DO_NOT_LOAD_BALANCE), eq(false), any());
}
@DataProvider(name = "validatePartitionDataDataProvider")
@@ -121,9 +122,8 @@ public void testValidatePartitionData(String maxWeight, ZooKeeperAnnouncer.Actio
Map input, Map expected, Exception expectedException,
int expectedMaxWeightBreachedCount, int expectedWeightDecimalPlacesBreachedCount)
{
- ZooKeeperAnnouncer announcer = new ZooKeeperAnnouncer(_server, true, false, null, 0,
- null, new LogOnlyServiceDiscoveryEventEmitter(),
- maxWeight == null ? null : new BigDecimal(maxWeight), action);
+ ZooKeeperAnnouncer announcer = new ZooKeeperAnnouncerFixture(
+ maxWeight == null ? null : new BigDecimal(maxWeight), action)._announcer;
if (expectedException != null)
{
@@ -145,4 +145,271 @@ null, new LogOnlyServiceDiscoveryEventEmitter(),
assertEquals(expectedMaxWeightBreachedCount, announcer.getMaxWeightBreachedCount());
assertEquals(expectedWeightDecimalPlacesBreachedCount, announcer.getWeightDecimalPlacesBreachedCount());
}
+
+ @DataProvider(name = "testUpdateAnnouncerStatusForMarkUpDataProvider")
+ public Object[][] testUpdateAnnouncerStatusForMarkUpDataProvider()
+ {
+ return new Object[][]
+ {
+ // Arguments:
+ // initStatus - initial announcer's announcement status
+ // isDarkWarmupEnabled - is dark warmup cluster enabled
+ // isWarmUpMarkUpSuccess - is warmup cluster markup successful
+ // isWarmUpMarkDownSuccess - is warmup cluster markdown successful
+ // isRealMarkUpSuccess - is real cluster markup successful
+ // expectedStatuses - expected announcer status sequence
+
+ // Cases:
+ // --- init status is de-announced ---
+ // dark warmup enabled, all markup and markdown successful
+ {DE_ANNOUNCED, true, true, true, true, Arrays.asList(ANNOUNCING, ANNOUNCED)},
+ // dark warmup enabled, warmup cluster markup successful but markdown failed, real cluster markup successful
+ {DE_ANNOUNCED, true, true, false, true, Arrays.asList(ANNOUNCING, ANNOUNCED)},
+ // dark warmup enabled, warmup cluster markup failed, real cluster markup successful
+ {DE_ANNOUNCED, true, false, true, true, Arrays.asList(ANNOUNCING, ANNOUNCED)},
+ // dark warmup enabled, warmup cluster markup failed, real cluster markup failed
+ {DE_ANNOUNCED, true, false, true, false, Collections.singletonList(ANNOUNCING)},
+ // dark warmup disabled, real cluster markup successful
+ {DE_ANNOUNCED, false, true, true, true, Arrays.asList(ANNOUNCING, ANNOUNCED)},
+ // dark warmup disabled, real cluster markup failed
+ {DE_ANNOUNCED, false, true, true, false, Collections.singletonList(ANNOUNCING)},
+
+ // --- init status is announcing ---
+ // dark warmup enabled, all markup and markdown successful
+ {ANNOUNCING, true, true, true, true, Collections.singletonList(ANNOUNCED)},
+ // dark warmup enabled, warmup cluster markup successful but markdown failed, real cluster markup successful
+ {ANNOUNCING, true, true, false, true, Collections.singletonList(ANNOUNCED)},
+ // dark warmup enabled, warmup cluster markup failed, real cluster markup successful
+ {ANNOUNCING, true, false, true, true, Collections.singletonList(ANNOUNCED)},
+ // dark warmup enabled, warmup cluster markup failed, real cluster markup failed
+ {ANNOUNCING, true, false, true, false, Collections.emptyList()},
+ // dark warmup disabled, real cluster markup successful
+ {ANNOUNCING, false, true, true, true, Collections.singletonList(ANNOUNCED)},
+ // dark warmup disabled, real cluster markup failed
+ {ANNOUNCING, false, true, true, false, Collections.emptyList()},
+
+ // --- other init statuses have the same behavior ---
+ {ANNOUNCED, false, true, true, true, Arrays.asList(ANNOUNCING, ANNOUNCED)},
+ {ANNOUNCED, false, true, true, false, Collections.singletonList(ANNOUNCING)},
+ {DE_ANNOUNCING, false, true, true, true, Arrays.asList(ANNOUNCING, ANNOUNCED)},
+ {DE_ANNOUNCING, false, true, true, false, Collections.singletonList(ANNOUNCING)}
+ };
+ }
+ @Test(dataProvider = "testUpdateAnnouncerStatusForMarkUpDataProvider")
+ public void testUpdateAnnouncerStatusForMarkUp(AnnouncementStatus initStatus, boolean isDarkWarmupEnabled,
+ boolean isWarmUpMarkUpSuccess, boolean isWarmUpMarkDownSuccess, boolean isRealMarkUpSuccess,
+ List expectedStatuses)
+ {
+ ZooKeeperAnnouncerFixture fixture = new ZooKeeperAnnouncerFixture(isDarkWarmupEnabled, isWarmUpMarkUpSuccess,
+ isWarmUpMarkDownSuccess, isRealMarkUpSuccess, initStatus);
+
+ fixture._announcer.markUp(fixture.getCallback());
+
+ fixture.waitForCallback(true);
+ fixture.verifyAnnouncementStatusUpdates(expectedStatuses);
+ fixture.shutdown();
+ }
+
+ @DataProvider(name = "testUpdateAnnouncerStatusForMarkDownDataProvider")
+ public Object[][] testUpdateAnnouncerStatusForMarkDownDataProvider()
+ {
+ return new Object[][]{
+ // Arguments:
+ // initStatus - initial announcer's announcement status
+ // isRealMarkDownSuccess - is real cluster markdown successful
+ // expectedStatuses - expected announcer status sequence
+
+ // Cases:
+ // --- init status is announced ---
+ // real cluster markdown successful
+ {ANNOUNCED, true, Arrays.asList(DE_ANNOUNCING, DE_ANNOUNCED)},
+ // real cluster markdown failed
+ {ANNOUNCED, false, Collections.singletonList(DE_ANNOUNCING)},
+
+ // --- init status is de-announcing ---
+ // real cluster markdown successful
+ {DE_ANNOUNCING, true, Collections.singletonList(DE_ANNOUNCED)},
+ // real cluster markdown failed
+ {DE_ANNOUNCING, false, Collections.emptyList()},
+
+ // --- other init statuses have the same behavior ---
+ {DE_ANNOUNCED, true, Arrays.asList(DE_ANNOUNCING, DE_ANNOUNCED)},
+ {DE_ANNOUNCED, false, Collections.singletonList(DE_ANNOUNCING)},
+ {ANNOUNCING, true, Arrays.asList(DE_ANNOUNCING, DE_ANNOUNCED)},
+ {ANNOUNCING, false, Collections.singletonList(DE_ANNOUNCING)}
+ };
+ }
+ @Test(dataProvider = "testUpdateAnnouncerStatusForMarkDownDataProvider")
+ public void testUpdateAnnouncerStatusForMarkDown(AnnouncementStatus initStatus, boolean isRealClusterMarkdownSuccess,
+ List expectedStatuses)
+ {
+ ZooKeeperAnnouncerFixture fixture = new ZooKeeperAnnouncerFixture(isRealClusterMarkdownSuccess, initStatus);
+
+ fixture._announcer.markDown(fixture.getCallback());
+
+ fixture.waitForCallback(false);
+ fixture.verifyAnnouncementStatusUpdates(expectedStatuses);
+ fixture.shutdown();
+ }
+
+ static class ZooKeeperAnnouncerFixture
+ {
+ @Mock
+ private ZooKeeperServer _server;
+ @Mock
+ private ReadinessStatusManager _readinessStatusManager;
+
+ private ZooKeeperAnnouncer _announcer;
+ private final ScheduledExecutorService _executorService;
+ private final AnnouncerStatus _status;
+ private final boolean _isDarkWarmupEnabled;
+ private final boolean _isDarkWarmupMarkupSuccess;
+ private ArgumentCaptor _announcementStatusCaptor =
+ ArgumentCaptor.forClass(AnnouncementStatus.class);
+ private CountDownLatch _callbackLatch = new CountDownLatch(1);
+
+ private static final String REAL_CLUSTER = "RealCluster";
+ private static final String DARK_CLUSTER = "DarkCluster";
+ private static final int WARMUP_DURATION = 1; // in seconds
+ private static final int CALLBACK_TIMEOUT_MS = 200;
+
+ public ZooKeeperAnnouncerFixture()
+ {
+ this(false, false, false, true, true,
+ null, ActionOnWeightBreach.IGNORE, DE_ANNOUNCED);
+ }
+
+ public ZooKeeperAnnouncerFixture(BigDecimal maxWeight, ActionOnWeightBreach actionOnWeightBreach)
+ {
+ this(false, false, false, true, true,
+ maxWeight, actionOnWeightBreach, DE_ANNOUNCED);
+ }
+
+ public ZooKeeperAnnouncerFixture(boolean isDarkWarmupEnabled, boolean isDarkWarmupMarkupSuccess, boolean isDarkWarmupMarkdownSuccess,
+ boolean isRealClusterMarkupSuccess, AnnouncementStatus initStatus)
+ {
+ this(isDarkWarmupEnabled, isDarkWarmupMarkupSuccess, isDarkWarmupMarkdownSuccess, isRealClusterMarkupSuccess,
+ true, null, ActionOnWeightBreach.IGNORE, initStatus);
+ }
+
+ public ZooKeeperAnnouncerFixture(boolean isRealClusterMarkdownSuccess, AnnouncementStatus initStatus)
+ {
+ this(false, false, false, true, isRealClusterMarkdownSuccess,
+ null, ActionOnWeightBreach.IGNORE, initStatus);
+ }
+
+ public ZooKeeperAnnouncerFixture(boolean isDarkWarmupEnabled, boolean isDarkWarmupMarkupSuccess, boolean isDarkWarmupMarkdownSuccess,
+ boolean isRealClusterMarkupSuccess, boolean isRealClusterMarkdownSuccess, BigDecimal maxWeight,
+ ActionOnWeightBreach actionOnWeightBreach, AnnouncementStatus initStatus)
+ {
+ MockitoAnnotations.initMocks(this);
+ doNothing().when(_readinessStatusManager).registerAnnouncerStatus(any());
+ doNothing().when(_readinessStatusManager).onAnnouncerStatusUpdated();
+
+ _executorService = isDarkWarmupEnabled ? Executors.newSingleThreadScheduledExecutor(
+ new NamedThreadFactory("ZooKeeperAnnouncerFixtureExecutor")) : null;
+ _status = spy(new AnnouncerStatus(true, initStatus));
+ doCallRealMethod().when(_status).setAnnouncementStatus(_announcementStatusCaptor.capture());
+ _isDarkWarmupEnabled = isDarkWarmupEnabled;
+ _isDarkWarmupMarkupSuccess = isDarkWarmupMarkupSuccess;
+
+ _announcer = new ZooKeeperAnnouncer(_server, true, isDarkWarmupEnabled, DARK_CLUSTER, WARMUP_DURATION,
+ _executorService, new LogOnlyServiceDiscoveryEventEmitter(), maxWeight, actionOnWeightBreach, _status,
+ _readinessStatusManager);
+ _announcer.setCluster(REAL_CLUSTER);
+ _announcer.setPartitionData(VALID_PARTITION_DATA);
+ _announcer.setUriSpecificProperties(Collections.emptyMap());
+
+ if (isDarkWarmupEnabled)
+ {
+ mockInvokeClusterCallback(DARK_CLUSTER, true, isDarkWarmupMarkupSuccess);
+ mockInvokeClusterCallback(DARK_CLUSTER, false, isDarkWarmupMarkdownSuccess);
+ }
+ mockInvokeClusterCallback(REAL_CLUSTER, true, isRealClusterMarkupSuccess);
+ mockInvokeClusterCallback(REAL_CLUSTER, false, isRealClusterMarkdownSuccess);
+ }
+
+ private void mockInvokeClusterCallback(String cluster, boolean isMarkUp, boolean isSuccess)
+ {
+ if (isMarkUp)
+ {
+ doAnswer(invoc -> invokeCallbackHelper(invoc, 4, isSuccess))
+ .when(_server).markUp(eq(cluster), any(), any(), any(), any());
+ }
+ else
+ {
+ doAnswer(invoc -> invokeCallbackHelper(invoc, 2, isSuccess))
+ .when(_server).markDown(eq(cluster), any(), any());
+ }
+ }
+
+ public void shutdown() {
+ if (_executorService != null) {
+ _executorService.shutdownNow();
+ }
+ }
+
+ public Callback getCallback()
+ {
+ return new Callback()
+ {
+ @Override
+ public void onError(Throwable e)
+ {
+ _callbackLatch.countDown();
+ }
+
+ @Override
+ public void onSuccess(None result)
+ {
+ _callbackLatch.countDown();
+ }
+ };
+ }
+
+ public void waitForCallback(boolean isMarkup)
+ {
+ try
+ {
+ // for markup, if dark warmup is enabled and succeeded, wait for the warmup duration + callback timeout,
+ // otherwise just wait for the callback timeout
+ if(!_callbackLatch.await(isMarkup && _isDarkWarmupEnabled && _isDarkWarmupMarkupSuccess
+ ? WARMUP_DURATION * 1000 + CALLBACK_TIMEOUT_MS : CALLBACK_TIMEOUT_MS, MILLISECONDS))
+ {
+ fail("Timed out waiting for callback");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ fail("Test interrupted while waiting for markup callback");
+ Thread.currentThread().interrupt();
+ }
+ finally {
+ _callbackLatch = new CountDownLatch(1); // reset for next usage
+ }
+ }
+
+ public void verifyAnnouncementStatusUpdates(List expectedStatuses)
+ {
+ assertEquals(expectedStatuses, _announcementStatusCaptor.getAllValues());
+ verify(_readinessStatusManager, times(_announcementStatusCaptor.getAllValues().size()))
+ .onAnnouncerStatusUpdated();
+ reset(_status); // reset the spy to clear the invocation history
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Object invokeCallbackHelper(InvocationOnMock invoc, int callbackIdx, boolean isSuccess)
+ {
+ Callback cb = (Callback) invoc.getArguments()[callbackIdx];
+ if (isSuccess)
+ {
+ cb.onSuccess(None.none());
+ }
+ else
+ {
+ cb.onError(DUMMY_EXCEPTION);
+ }
+ return null;
+ }
+ }
}
diff --git a/gradle.properties b/gradle.properties
index 251dd2577a..d2c3c4d35f 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,4 +1,4 @@
-version=29.78.0
+version=29.79.0
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true