Include leaderless replicas for GET replication requests #2661

Merged
@@ -1473,6 +1473,9 @@ protected void applyDelete(MessageInfo messageInfo, RemoteReplicaInfo remoteRepl
 * remote replica is a leader of the partition in the remote data center. This list is used for leader-based cross colo
 * replication to exchange missing blobs only between leader replicas. For non-leader replica pairs (leader <->
 * standby, standby <-> leader, standby <-> standby), we will wait for the missing blobs to come from their leader interactions.
*
 * In addition, also include replicas whose partitions do not have any leader in the local data center.
*
* @param remoteReplicaInfos list of all remote replicas
* @param exchangeMetadataResponseList list of metadata responses received from the remote replicas
 * @param leaderReplicaInfosOutput output list of leader replicas. It will be populated in this method.
@@ -1494,8 +1497,13 @@ void getLeaderReplicaList(List<RemoteReplicaInfo> remoteReplicaInfos,
RemoteReplicaInfo remoteReplicaInfo = remoteReplicaInfos.get(i);
ReplicaId localReplica = remoteReplicaInfo.getLocalReplicaId();
ReplicaId remoteReplica = remoteReplicaInfo.getReplicaId();
// Check if local replica and remote replica are leaders for their partition.
// Check if 'local replica and remote replica are leaders' or if 'local replica's partition doesn't have any leader'.
if (leaderBasedReplicationAdmin.isLeaderPair(localReplica, remoteReplica)) {
logger.trace("Sending GET request for leader replica pair {}, {}", localReplica, remoteReplica);
leaderReplicaInfosOutput.add(remoteReplicaInfo);
exchangeMetadataResponseListForLeaderReplicaInfosOutput.add(exchangeMetadataResponseList.get(i));
} else if (leaderBasedReplicationAdmin.isLeaderLessPartition(localReplica.getPartitionId())) {
logger.trace("Sending GET request for leaderless replica {}", localReplica);
leaderReplicaInfosOutput.add(remoteReplicaInfo);
exchangeMetadataResponseListForLeaderReplicaInfosOutput.add(exchangeMetadataResponseList.get(i));
}
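Taken together, the two branches above reduce to a single selection predicate per remote replica pair. Below is a minimal sketch of that predicate, assuming the LeaderBasedReplicationAdmin type behind the leaderBasedReplicationAdmin handle used above; the helper name shouldReplicateViaGet is hypothetical and not part of this change.

    // Sketch only: mirrors the selection logic in getLeaderReplicaList above.
    private boolean shouldReplicateViaGet(RemoteReplicaInfo remoteReplicaInfo,
        LeaderBasedReplicationAdmin leaderBasedReplicationAdmin) {
      ReplicaId localReplica = remoteReplicaInfo.getLocalReplicaId();
      ReplicaId remoteReplica = remoteReplicaInfo.getReplicaId();
      // Leader <-> leader pairs exchange missing blobs directly across data centers.
      if (leaderBasedReplicationAdmin.isLeaderPair(localReplica, remoteReplica)) {
        return true;
      }
      // A partition with no leader in the local data center would otherwise never be
      // picked up for cross-colo GET requests, so include its replicas as well.
      return leaderBasedReplicationAdmin.isLeaderLessPartition(localReplica.getPartitionId());
    }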
@@ -14,6 +14,7 @@
package com.github.ambry.replication;

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.clustermap.AmbryPartition;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.ClusterMapChangeListener;
import com.github.ambry.clustermap.ClusterParticipant;
@@ -697,6 +698,14 @@ public boolean isLeaderPair(ReplicaId localReplica, ReplicaId remoteReplica) {
}
}

/**
* @param partition partition id
 * @return true if this partition doesn't have any leader in the local data center.
*/
public boolean isLeaderLessPartition(PartitionId partition) {
return partition.getReplicaIdsByState(ReplicaState.LEADER, dataNodeId.getDatacenterName()).isEmpty();
}

/**
* Retrieves the set of peer leader replicas (in remote data centers) for the given leader partition on this node
* @param leaderPartitionName leader partition name
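As an illustration of the new check, here is a minimal sketch of the scenario exercised by the test added below: every replica of a partition is demoted to STANDBY, so no LEADER remains in the local data center. The mockPartitionId and leaderBasedReplicationAdmin variables are assumed to come from a surrounding test fixture and are not defined in this diff.

    // Sketch: demote every replica of the partition so that no LEADER remains.
    for (ReplicaId replicaId : mockPartitionId.getReplicaIds()) {
      mockPartitionId.setReplicaState(replicaId, ReplicaState.STANDBY);
    }
    // With no LEADER left in the local data center, the partition is treated as leaderless.
    assertTrue("Partition should be reported as leaderless",
        leaderBasedReplicationAdmin.isLeaderLessPartition(mockPartitionId));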
@@ -228,8 +228,7 @@ public void replicaThreadLeaderBasedReplicationForPUTMessagesTest() throws Excep
Map<DataNodeId, MockHost> hosts = new HashMap<>();
hosts.put(remoteNodeInLocalDC, remoteHostInLocalDC);
hosts.put(remoteNodeInRemoteDC, remoteHostInRemoteDC);
MockNetworkClientFactory mockNetworkClientFactory = new MockNetworkClientFactory(hosts, clusterMap, batchSize,
new MockFindTokenHelper(new BlobIdFactory(clusterMap), replicationConfig));
MockNetworkClientFactory mockNetworkClientFactory = new MockNetworkClientFactory(hosts, clusterMap, batchSize, new MockFindTokenHelper(new BlobIdFactory(clusterMap), replicationConfig));
Pair<StorageManager, ReplicationManager> managers =
createStorageManagerAndReplicationManager(clusterMap, clusterMapConfig, mockHelixParticipant,
mockNetworkClientFactory);
@@ -324,8 +323,7 @@ public void replicaThreadLeaderBasedReplicationForPUTMessagesTest() throws Excep
// verify that the missing messages size equals the min{batch size, number of PUT messages} placed on remote hosts
int expectedIndex = batchSize - 1;
for (ReplicaThread.ExchangeMetadataResponse exchangeMetadataResponse : responseListForRemoteNodeInRemoteDC) {
assertEquals("mismatch in number of missing messages", batchSize,
exchangeMetadataResponse.missingStoreMessages.size());
assertEquals("mismatch in number of missing messages", batchSize, exchangeMetadataResponse.missingStoreMessages.size());
}

// Filter leader replicas to fetch missing keys
@@ -337,8 +335,7 @@ public void replicaThreadLeaderBasedReplicationForPUTMessagesTest() throws Excep
// verify that only leader replicas in remoteHost2 are chosen for fetching missing messages.
Set<ReplicaId> remoteReplicasToFetchInReplicaThread =
leaderReplicas.stream().map(RemoteReplicaInfo::getReplicaId).collect(Collectors.toSet());
assertThat("mismatch in leader remote replicas to fetch missing keys", leaderReplicasOnLocalAndRemoteNodes,
is(remoteReplicasToFetchInReplicaThread));
assertThat("mismatch in leader remote replicas to fetch missing keys", leaderReplicasOnLocalAndRemoteNodes, is(remoteReplicasToFetchInReplicaThread));

// verify that the remote token will be moved for leader replicas and will remain 0 for standby replicas as
// missing messages are not fetched yet.
@@ -351,8 +348,7 @@ public void replicaThreadLeaderBasedReplicationForPUTMessagesTest() throws Excep
remoteReplicaInfosForRemoteDC.get(i).getExchangeMetadataResponse().missingStoreMessages.size(),
responseListForRemoteNodeInRemoteDC.get(i).missingStoreMessages.size());
assertThat("remote token should not move forward for standby replicas until missing keys are fetched",
remoteReplicaInfosForRemoteDC.get(i).getToken(),
not(responseListForRemoteNodeInRemoteDC.get(i).remoteToken));
remoteReplicaInfosForRemoteDC.get(i).getToken(), not(responseListForRemoteNodeInRemoteDC.get(i).remoteToken));
}
}

@@ -385,8 +381,7 @@ public void replicaThreadLeaderBasedReplicationForPUTMessagesTest() throws Excep
// Remote token for all cross-colo replicas (leader and standby) should have moved forward now as the missing keys
// for standbys are received via intra-dc replication.
for (int i = 0; i < responseListForRemoteNodeInRemoteDC.size(); i++) {
assertEquals("mismatch in remote token set for cross colo replicas",
remoteReplicaInfosForRemoteDC.get(i).getToken(), (responseListForRemoteNodeInRemoteDC.get(i).remoteToken));
assertEquals("mismatch in remote token set for cross colo replicas", remoteReplicaInfosForRemoteDC.get(i).getToken(), (responseListForRemoteNodeInRemoteDC.get(i).remoteToken));
}

// verify replication metrics to track number of cross colo get requests and cross colo bytes fetch rate for standby
@@ -402,6 +397,108 @@ public void replicaThreadLeaderBasedReplicationForPUTMessagesTest() throws Excep
storageManager.shutdown();
}

/**
 * Test replication for leaderless partitions.
*/
@Test
public void missingLeadersReplicationTest() throws Exception {
int batchSize = 4;

Map<DataNodeId, MockHost> hosts = new HashMap<>();
hosts.put(remoteNodeInLocalDC, remoteHostInLocalDC);
hosts.put(remoteNodeInRemoteDC, remoteHostInRemoteDC);
MockNetworkClientFactory mockNetworkClientFactory = new MockNetworkClientFactory(hosts, clusterMap, batchSize,
new MockFindTokenHelper(new BlobIdFactory(clusterMap), replicationConfig));
Pair<StorageManager, ReplicationManager> managers =
createStorageManagerAndReplicationManager(clusterMap, clusterMapConfig, mockHelixParticipant,
mockNetworkClientFactory);
StorageManager storageManager = managers.getFirst();
MockReplicationManager replicationManager = (MockReplicationManager) managers.getSecond();

    // Set mock local stores on all remoteReplicaInfos which will be used during replication.
for (PartitionId partitionId : replicationManager.partitionToPartitionInfo.keySet()) {
localHost.addStore(partitionId, null);
Store localStore = localHost.getStore(partitionId);
localStore.start();
List<RemoteReplicaInfo> remoteReplicaInfos =
replicationManager.partitionToPartitionInfo.get(partitionId).getRemoteReplicaInfos();
remoteReplicaInfos.forEach(remoteReplicaInfo -> remoteReplicaInfo.setLocalStore(localStore));
}

// Add put messages to all partitions on remote hosts in local dc and remote dc
List<PartitionId> partitionIds = clusterMap.getWritablePartitionIds(null);
PartitionId leaderLessPartition = null;
for (PartitionId partitionId : partitionIds) {
addPutMessagesToReplicasOfPartition(partitionId, Arrays.asList(remoteHostInLocalDC, remoteHostInRemoteDC),
batchSize);
if (leaderLessPartition == null) {
// Have 1 leader-less partition
leaderLessPartition = partitionId;
for (ReplicaId replicaId : ((MockPartitionId) partitionId).getReplicaIds()) {
((MockPartitionId) partitionId).setReplicaState(replicaId, ReplicaState.STANDBY);
}
}
}

    // Issue STANDBY -> LEADER state transitions for local replicas that are leaders of their partitions.
List<? extends ReplicaId> replicaIds = clusterMap.getReplicaIds(replicationManager.dataNodeId);
for (ReplicaId replicaId : replicaIds) {
MockReplicaId mockReplicaId = (MockReplicaId) replicaId;
if (mockReplicaId.getReplicaState() == ReplicaState.LEADER) {
MockPartitionId mockPartitionId = (MockPartitionId) replicaId.getPartitionId();
mockHelixParticipant.onPartitionBecomeLeaderFromStandby(mockPartitionId.toPathString());
}
}

// Replicate with host in remote dc.
ReplicaThread crossColoReplicaThread = replicationManager.dataNodeIdToReplicaThread.get(remoteNodeInRemoteDC);
crossColoReplicaThread.replicate();

// 1. Verify metadata requests are sent to both leader and standby replicas.
List<RemoteReplicaInfo> remoteReplicaInfosForRemoteDC =
crossColoReplicaThread.getRemoteReplicaInfos().get(remoteNodeInRemoteDC);
List<ReplicaThread.ExchangeMetadataResponse> exchangeMetadataResponseList =
crossColoReplicaThread.getExchangeMetadataResponsesInEachCycle().get(remoteNodeInRemoteDC);
assertEquals("Response should contain a response for each replica", remoteReplicaInfosForRemoteDC.size(),
exchangeMetadataResponseList.size());

    // 2. Verify that the remote token is moved for leader replicas and the one leaderless replica.
Set<ReplicaId> leaderReplicas =
getRemoteLeaderReplicasWithLeaderPartitionsOnLocalNode(clusterMap, replicationManager.dataNodeId,
remoteNodeInRemoteDC);
for (int i = 0; i < remoteReplicaInfosForRemoteDC.size(); i++) {
if (leaderReplicas.contains(remoteReplicaInfosForRemoteDC.get(i).getReplicaId())) {
assertEquals("Remote token must have been updated for leader replicas",
remoteReplicaInfosForRemoteDC.get(i).getToken(), exchangeMetadataResponseList.get(i).remoteToken);
} else if (remoteReplicaInfosForRemoteDC.get(i).getReplicaId().getPartitionId().equals(leaderLessPartition)) {
assertEquals("Remote token must have been updated for leader-less replicas",
remoteReplicaInfosForRemoteDC.get(i).getToken(), exchangeMetadataResponseList.get(i).remoteToken);
} else {
assertThat("Remote token must not move forward for standby replicas until missing keys are fetched",
remoteReplicaInfosForRemoteDC.get(i).getToken(), not(exchangeMetadataResponseList.get(i).remoteToken));
assertEquals("Missing keys in metadata response must be stored for standby replicas",
remoteReplicaInfosForRemoteDC.get(i).getExchangeMetadataResponse().missingStoreMessages.size(),
exchangeMetadataResponseList.get(i).missingStoreMessages.size());
}
}

// Replicate with host in local dc so that missing messages in standby are fetched from local dc
ReplicaThread intraColoReplicaThread = replicationManager.dataNodeIdToReplicaThread.get(remoteNodeInLocalDC);
intraColoReplicaThread.replicate();

    // Replicate again with remote dc. This should process missing keys for standby replicas from the previous metadata exchange.
crossColoReplicaThread.replicate();

// 3. Verify that remote token for all cross-colo replicas (leader and standby) is moved forward now as the missing
// keys for standbys are received via intra-dc replication.
for (int i = 0; i < exchangeMetadataResponseList.size(); i++) {
assertEquals("mismatch in remote token set for cross colo replicas",
remoteReplicaInfosForRemoteDC.get(i).getToken(), (exchangeMetadataResponseList.get(i).remoteToken));
}

storageManager.shutdown();
}

/**
* Test leader based replication to verify remote token is caught up for standby replicas and updated token is used
* when their state transitions to leader.