Fix potential empty routing table snapshot during HelixClusterManager initialization (#1341)

After the routing table provider is created, an initial instance config change
event is added to its queue. However, there is no guarantee about when that
event will be dequeued. In an edge case, the initial event may still be in the
queue even after the clustermap has finished initializing, which leaves the
routing table snapshot empty. This PR explicitly checks the snapshot and waits
until it is populated; otherwise the router may have no available replicas to
send requests to.
jsjtzyy authored and cgtz committed Dec 20, 2019
1 parent f7d5646 commit 871b30d
Showing 3 changed files with 75 additions and 32 deletions.
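
Before the diff itself, here is a minimal, self-contained sketch of the pattern this commit introduces: after registering the routing table change listener, the initializing thread checks whether the snapshot it read synchronously is empty and, if so, blocks on a latch until the first change notification arrives. The `SnapshotAwaiter` and `Snapshot` names below are hypothetical stand-ins, not Ambry or Helix types; the 320-second timeout mirrors the value used in the diff.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SnapshotAwaiter {
  /** Hypothetical stand-in for the snapshot type published by the routing table provider. */
  interface Snapshot {
    List<String> getInstanceConfigs();
  }

  private final AtomicReference<Snapshot> snapshotRef = new AtomicReference<>();
  private final CountDownLatch routingTableInitLatch = new CountDownLatch(1);

  /** Invoked by the change listener every time the provider publishes a new snapshot. */
  void onRoutingTableChange(Snapshot snapshot) {
    snapshotRef.set(snapshot);
    // The first notification releases any thread waiting in ensureSnapshotPopulated().
    routingTableInitLatch.countDown();
  }

  /**
   * Called by the initialization thread after the listener is registered. If the snapshot read
   * synchronously from the provider is still empty, block until the first change notification
   * arrives; 320 seconds is slightly more than the provider's 5-minute periodic refresh.
   */
  void ensureSnapshotPopulated(Snapshot initialSnapshot) throws InterruptedException {
    snapshotRef.compareAndSet(null, initialSnapshot);
    if (snapshotRef.get().getInstanceConfigs().isEmpty()) {
      if (!routingTableInitLatch.await(320, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Initial routing table change didn't arrive in time");
      }
    }
  }
}
```
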
@@ -32,6 +32,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -139,7 +140,6 @@ public HelixClusterManager(ClusterMapConfig clusterMapConfig, String instanceNam
for (Map.Entry<String, DcZkInfo> entry : dataCenterToZkAddress.entrySet()) {
String dcName = entry.getKey();
String zkConnectStr = entry.getValue().getZkConnectStr();
ClusterChangeHandler clusterChangeHandler = new ClusterChangeHandler(dcName);
// Initialize from every remote datacenter in a separate thread to speed things up.
Utils.newThread(() -> {
try {
@@ -153,6 +153,8 @@ public HelixClusterManager(ClusterMapConfig clusterMapConfig, String instanceNam
manager.connect();
logger.info("Established connection to Helix manager at {}", zkConnectStr);
}

ClusterChangeHandler clusterChangeHandler = new ClusterChangeHandler(dcName);
// Create RoutingTableProvider of each DC to keep track of partition(replicas) state. Here, we use current
// state based RoutingTableProvider to remove dependency on Helix's pipeline and reduce notification latency.
logger.info("Creating routing table provider associated with Helix manager at {}", zkConnectStr);
@@ -161,8 +163,6 @@ public HelixClusterManager(ClusterMapConfig clusterMapConfig, String instanceNam
DcInfo dcInfo = new DcInfo(dcName, entry.getValue(), manager, clusterChangeHandler);
dcToDcZkInfo.put(dcName, dcInfo);
dcIdToDcName.put(dcInfo.dcZkInfo.getDcId(), dcName);
dcToRoutingTableSnapshotRef.put(dcName,
new AtomicReference<>(routingTableProvider.getRoutingTableSnapshot()));
routingTableProvider.addRoutingTableChangeListener(clusterChangeHandler, null);
logger.info("Registered routing table change listeners in {}", dcName);

@@ -179,6 +179,17 @@ public HelixClusterManager(ClusterMapConfig clusterMapConfig, String instanceNam
// Now register listeners to get notified on live instance change in every datacenter.
manager.addLiveInstanceChangeListener(clusterChangeHandler);
logger.info("Registered live instance change listeners for Helix manager at {}", zkConnectStr);

dcToRoutingTableSnapshotRef.computeIfAbsent(dcName, k -> new AtomicReference<>())
.getAndSet(routingTableProvider.getRoutingTableSnapshot());
// The initial routing table change should populate the instanceConfigs. If they are empty, the initial
// change hasn't arrived yet and this thread should wait on the init latch until the routing table snapshot is non-empty.
if (dcToRoutingTableSnapshotRef.get(dcName).get().getInstanceConfigs().size() == 0) {
// Periodic refresh in routing table provider is enabled by default. In worst case, routerUpdater should
// refresh snapshot and trigger routing table change within 5 minutes. So we make clusterChangeHandler wait
// here for initial notification until times out.
clusterChangeHandler.waitForInitNotification();
}
if (!clusterMapConfig.clustermapListenCrossColo && manager != localManager) {
manager.disconnect();
logger.info("Stopped listening to cross colo ZK server {}", zkConnectStr);
@@ -546,6 +557,7 @@ private class ClusterChangeHandler
private final AtomicBoolean instanceConfigInitialized = new AtomicBoolean(false);
private final AtomicBoolean liveStateInitialized = new AtomicBoolean(false);
private final AtomicBoolean idealStateInitialized = new AtomicBoolean(false);
private final CountDownLatch routingTableInitLatch = new CountDownLatch(1);

/**
* Initialize a ClusterChangeHandler in the given datacenter.
@@ -638,11 +650,28 @@ public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationC
*/
@Override
public void onRoutingTableChange(RoutingTableSnapshot routingTableSnapshot, Object context) {
logger.info("Routing table change triggered from {}", dcName);
dcToRoutingTableSnapshotRef.get(dcName).getAndSet(routingTableSnapshot);
if (routingTableInitLatch.getCount() == 1) {
logger.info("Received initial notification for routing table change from {}", dcName);
dcToRoutingTableSnapshotRef.computeIfAbsent(dcName, k -> new AtomicReference<>())
.getAndSet(routingTableSnapshot);
routingTableInitLatch.countDown();
} else {
logger.info("Routing table change triggered from {}", dcName);
dcToRoutingTableSnapshotRef.get(dcName).getAndSet(routingTableSnapshot);
}
helixClusterManagerMetrics.routingTableChangeTriggerCount.inc();
}

/**
* Wait for initial notification from routing table provider
*/
void waitForInitNotification() throws InterruptedException {
// wait slightly more than 5 mins to ensure routerUpdater refreshes the snapshot.
if (!routingTableInitLatch.await(320, TimeUnit.SECONDS)) {
throw new IllegalStateException("Initial routing table change from " + dcName + " didn't come within 5 mins");
}
}

/**
* Populate the initial data from the admin connection. Create nodes, disks, partitions and replicas for the entire
* cluster. An {@link InstanceConfig} will only be looked at if the xid in it is <= currentXid.
@@ -40,8 +40,6 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.helix.HelixManager;
@@ -63,7 +61,6 @@
import static com.github.ambry.clustermap.TestUtils.*;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
import static org.mockito.Mockito.*;


/**
@@ -545,18 +542,8 @@ public void routingTableProviderChangeTest() throws Exception {
ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props));
// Mock metricRegistry here to introduce a latch based counter for testing purpose
metricRegistry = new MetricRegistry();
MetricRegistry mockMetricRegistry = Mockito.spy(metricRegistry);
Counter mockCounter = Mockito.mock(Counter.class);
AtomicReference<CountDownLatch> routingTableChangeLatch = new AtomicReference<>();
routingTableChangeLatch.set(new CountDownLatch(2));
doAnswer(invocation -> {
routingTableChangeLatch.get().countDown();
return null;
}).when(mockCounter).inc();
doReturn(mockCounter).when(mockMetricRegistry)
.counter(MetricRegistry.name(HelixClusterManager.class, "routingTableChangeTriggerCount"));
HelixClusterManager helixClusterManager = new HelixClusterManager(clusterMapConfig, selfInstanceName,
new MockHelixManagerFactory(helixCluster, null, null), mockMetricRegistry);
new MockHelixManagerFactory(helixCluster, null, null), metricRegistry);
Map<String, AtomicReference<RoutingTableSnapshot>> snapshotsByDc = helixClusterManager.getRoutingTableSnapshots();
RoutingTableSnapshot localDcSnapshot = snapshotsByDc.get(localDc).get();

@@ -567,27 +554,54 @@
// verify leader replica of each partition is correct
verifyLeaderReplicasInDc(helixClusterManager, localDc);

// test live instance triggered routing table change
// we purposely bring down one instance and wait for expected number of live instance unless times out.
int initialLiveCnt = localDcSnapshot.getLiveInstances().size();
MockHelixAdmin mockHelixAdmin = helixCluster.getHelixAdminFromDc(localDc);
String instance = instanceConfigsInCluster.stream()
.filter(insConfig -> !insConfig.getInstanceName().equals(selfInstanceName))
.findFirst()
.get()
.getInstanceName();
mockHelixAdmin.bringInstanceDown(instance);
mockHelixAdmin.triggerRoutingTableNotification();
int sleepCnt = 0;
while (helixClusterManager.getRoutingTableSnapshots().get(localDc).get().getLiveInstances().size()
!= initialLiveCnt - 1) {
assertTrue("Routing table change (triggered by bringing down node) didn't come within 1 sec", sleepCnt < 5);
Thread.sleep(200);
sleepCnt++;
}
// then bring up the same instance, the number of live instances should equal to initial count
mockHelixAdmin.bringInstanceUp(instance);
mockHelixAdmin.triggerRoutingTableNotification();
sleepCnt = 0;
while (helixClusterManager.getRoutingTableSnapshots().get(localDc).get().getLiveInstances().size()
!= initialLiveCnt) {
assertTrue("Routing table change (triggered by bringing up node) didn't come within 1 sec", sleepCnt < 5);
Thread.sleep(200);
sleepCnt++;
}

// randomly choose a partition and change the leader replica of it in cluster
List<? extends PartitionId> defaultPartitionIds = helixClusterManager.getAllPartitionIds(DEFAULT_PARTITION_CLASS);
PartitionId partitionToChange = defaultPartitionIds.get((new Random()).nextInt(defaultPartitionIds.size()));
MockHelixAdmin mockHelixAdmin = helixCluster.getHelixAdminFromDc(localDc);
String currentLeaderInstance = mockHelixAdmin.getPartitionToLeaderReplica().get(partitionToChange.toPathString());
int currentLeaderPort = Integer.valueOf(currentLeaderInstance.split("_")[1]);
String newLeaderInstance = mockHelixAdmin.getInstancesForPartition(partitionToChange.toPathString())
.stream()
.filter(k -> !k.equals(currentLeaderInstance))
.findFirst()
.get();
// Best effort to ensure previous latch has counted down to zero, otherwise the delayed routing table change may
// falsely count down new latch and verification is performed based on old view.
// Keep in mind that initial value of CountDownLatch is not always reasonable because the routing table change may
// occur before we add RoutingTableChange listener. So, right here we wait for 3 secs and then proceed with following
// leadership change tests.
routingTableChangeLatch.get().await(3, TimeUnit.SECONDS);
routingTableChangeLatch.set(new CountDownLatch(1));
mockHelixAdmin.changeLeaderReplicaForPartition(partitionToChange.toPathString(), newLeaderInstance);
mockHelixAdmin.triggerRoutingTableNotification();
assertTrue("Routing table change didn't come within 1 second",
routingTableChangeLatch.get().await(1, TimeUnit.SECONDS));
sleepCnt = 0;
while (partitionToChange.getReplicaIdsByState(ReplicaState.LEADER, localDc).get(0).getDataNodeId().getPort()
== currentLeaderPort) {
assertTrue("Routing table change (triggered by leadership change) didn't come within 1 sec", sleepCnt < 5);
Thread.sleep(200);
sleepCnt++;
}
verifyLeaderReplicasInDc(helixClusterManager, localDc);

helixClusterManager.close();
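
The test changes above replace the mocked metric counter and latch with a simple poll-until-condition loop (up to 5 attempts with 200 ms sleeps). A generic sketch of that polling idiom follows; `PollUtils` and `awaitCondition` are hypothetical helpers introduced here for illustration, not part of the Ambry codebase.

```java
import java.util.function.BooleanSupplier;

final class PollUtils {
  /**
   * Poll the condition until it becomes true or the attempt budget runs out.
   * 5 attempts x 200 ms mirrors the roughly 1-second deadline used in the test.
   */
  static boolean awaitCondition(BooleanSupplier condition, int maxAttempts, long sleepMs)
      throws InterruptedException {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      if (condition.getAsBoolean()) {
        return true;
      }
      Thread.sleep(sleepMs);
    }
    return condition.getAsBoolean();
  }
}
```

A test could then assert, for example, `assertTrue(PollUtils.awaitCondition(() -> snapshotsByDc.get(localDc).get().getLiveInstances().size() == initialLiveCnt, 5, 200))`.
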
@@ -183,7 +183,7 @@ CLUSTER_NAME_PREFIX, dcStr, DEFAULT_MAX_PARTITIONS_PER_RESOURCE, new HelixAdminF
}
// bootstrap a cluster
HelixBootstrapUpgradeUtil.bootstrapOrUpgrade(hardwareLayoutPath, partitionLayoutPath, zkLayoutPath,
CLUSTER_NAME_PREFIX, dcStr, DEFAULT_MAX_PARTITIONS_PER_RESOURCE, false, false, new HelixAdminFactory(), true,
CLUSTER_NAME_PREFIX, dcStr, DEFAULT_MAX_PARTITIONS_PER_RESOURCE, false, false, new HelixAdminFactory(), false,
ClusterMapConfig.OLD_STATE_MODEL_DEF);
// add new state model def
HelixBootstrapUpgradeUtil.addStateModelDef(hardwareLayoutPath, partitionLayoutPath, zkLayoutPath,
@@ -225,7 +225,7 @@ public void testIncompleteZKHostInfo() throws Exception {
try {
HelixBootstrapUpgradeUtil.bootstrapOrUpgrade(hardwareLayoutPath, partitionLayoutPath, zkLayoutPath,
CLUSTER_NAME_PREFIX, dcStr, DEFAULT_MAX_PARTITIONS_PER_RESOURCE, false, false, new HelixAdminFactory(),
true, ClusterMapConfig.DEFAULT_STATE_MODEL_DEF);
false, ClusterMapConfig.DEFAULT_STATE_MODEL_DEF);
fail("Should have thrown IllegalArgumentException as a zk host is missing for one of the dcs");
} catch (IllegalArgumentException e) {
// OK
@@ -487,7 +487,7 @@ private void writeBootstrapOrUpgrade(long expectedResourceCount, boolean forceRe
// This updates and verifies that the information in Helix is consistent with the one in the static cluster map.
HelixBootstrapUpgradeUtil.bootstrapOrUpgrade(hardwareLayoutPath, partitionLayoutPath, zkLayoutPath,
CLUSTER_NAME_PREFIX, dcStr, DEFAULT_MAX_PARTITIONS_PER_RESOURCE, false, forceRemove, new HelixAdminFactory(),
true, ClusterMapConfig.DEFAULT_STATE_MODEL_DEF);
false, ClusterMapConfig.DEFAULT_STATE_MODEL_DEF);
verifyResourceCount(testHardwareLayout.getHardwareLayout(), expectedResourceCount);
}

