Skip to content

Commit

Permalink
Implement replica Offline-To-Dropped transition on server side (#1347)
Browse files Browse the repository at this point in the history
This PR implements the last step to remove old replica. Specifically, it
removes old replica from in-mem data structures (in StatsManager,
ReplicationManager and StorageManager). Also, during the transition, all
store files would be deleted in the end and seal/stop list in Helix
would be updated as well.
  • Loading branch information
jsjtzyy authored Feb 7, 2020
1 parent 6f4d57f commit bfb751b
Show file tree
Hide file tree
Showing 13 changed files with 250 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,10 @@ public interface PartitionStateChangeListener {
* @param partitionName of the partition
*/
void onPartitionBecomeOfflineFromInactive(String partitionName);

/**
* Action to take when partition becomes dropped from offline.
* @param partitionName of the partition.
*/
void onPartitionBecomeDroppedFromOffline(String partitionName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,12 @@ public void onBecomeOfflineFromInactive(Message message, NotificationContext con

@Transition(to = "DROPPED", from = "OFFLINE")
public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
logger.info("Partition {} in resource {} is becoming DROPPED from OFFLINE", message.getPartitionName(),
String partitionName = message.getPartitionName();
logger.info("Partition {} in resource {} is becoming DROPPED from OFFLINE", partitionName,
message.getResourceName());
if (clusterMapConfig.clustermapEnableStateModelListener) {
partitionStateChangeListener.onPartitionBecomeDroppedFromOffline(partitionName);
}
}

@Transition(to = "DROPPED", from = "ERROR")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
Expand Down Expand Up @@ -166,7 +165,7 @@ static int getSchemaVersion(InstanceConfig instanceConfig) {
*/
static List<String> getSealedReplicas(InstanceConfig instanceConfig) {
List<String> sealedReplicas = instanceConfig.getRecord().getListField(ClusterMapUtils.SEALED_STR);
return sealedReplicas == null ? Collections.emptyList() : sealedReplicas;
return sealedReplicas == null ? new ArrayList<>() : sealedReplicas;
}

/**
Expand All @@ -177,7 +176,7 @@ static List<String> getSealedReplicas(InstanceConfig instanceConfig) {
*/
static List<String> getStoppedReplicas(InstanceConfig instanceConfig) {
List<String> stoppedReplicas = instanceConfig.getRecord().getListField(ClusterMapUtils.STOPPED_REPLICAS_STR);
return stoppedReplicas == null ? Collections.emptyList() : stoppedReplicas;
return stoppedReplicas == null ? new ArrayList<>() : stoppedReplicas;
}

/**
Expand Down Expand Up @@ -218,7 +217,6 @@ public static Integer getHttp2PortStr(InstanceConfig instanceConfig) {
return http2PortStr == null ? null : Integer.valueOf(http2PortStr);
}


/**
* Get the xid associated with this instance. The xid is like a timestamp or a change number, so if it is absent,
* a value representing the earliest point in time is returned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,19 @@ private boolean addNewReplicaInfo(ReplicaId replicaId, InstanceConfig instanceCo
*/
private boolean removeOldReplicaInfo(ReplicaId replicaId, InstanceConfig instanceConfig) {
boolean removalResult = true;
boolean replicaFound = false;
boolean instanceConfigUpdated = false;
boolean replicaFound;
String partitionName = replicaId.getPartitionId().toPathString();
List<String> stoppedReplicas = instanceConfig.getRecord().getListField(ClusterMapUtils.STOPPED_REPLICAS_STR);
List<String> sealedReplicas = instanceConfig.getRecord().getListField(ClusterMapUtils.SEALED_STR);
stoppedReplicas = stoppedReplicas == null ? new ArrayList<>() : stoppedReplicas;
sealedReplicas = sealedReplicas == null ? new ArrayList<>() : sealedReplicas;
if (stoppedReplicas.remove(partitionName) || sealedReplicas.remove(partitionName)) {
logger.info("Removing partition {} from stopped and sealed list", partitionName);
instanceConfig.getRecord().setListField(ClusterMapUtils.STOPPED_REPLICAS_STR, stoppedReplicas);
instanceConfig.getRecord().setListField(ClusterMapUtils.SEALED_STR, sealedReplicas);
instanceConfigUpdated = true;
}
Map<String, Map<String, String>> mountPathToDiskInfos = instanceConfig.getRecord().getMapFields();
Map<String, String> diskInfo = mountPathToDiskInfos.get(replicaId.getMountPath());
if (diskInfo != null) {
Expand All @@ -354,12 +365,14 @@ private boolean removeOldReplicaInfo(ReplicaId replicaId, InstanceConfig instanc
mountPathToDiskInfos.put(replicaId.getMountPath(), diskInfo);
// update InstanceConfig
instanceConfig.getRecord().setMapFields(mountPathToDiskInfos);
logger.info("Updating config: {} in Helix by removing partition {}", instanceConfig, partitionName);
removalResult = helixAdmin.setInstanceConfig(clusterName, instanceName, instanceConfig);
instanceConfigUpdated = true;
}
}
}
if (!replicaFound) {
if (instanceConfigUpdated) {
logger.info("Updating config: {} in Helix by removing partition {}", instanceConfig, partitionName);
removalResult = helixAdmin.setInstanceConfig(clusterName, instanceName, instanceConfig);
} else {
logger.warn("Partition {} is not found on instance {}, skipping removing it from InstanceConfig in Helix.",
partitionName, instanceName);
}
Expand Down Expand Up @@ -535,12 +548,33 @@ public void onPartitionBecomeOfflineFromInactive(String partitionName) {
throw e;
}
}
// 3. take actions in storage manager (stop the store)
// 3. take actions in storage manager (stop the store and update instanceConfig)
PartitionStateChangeListener storageManagerListener =
partitionStateChangeListeners.get(StateModelListenerType.StorageManagerListener);
if (storageManagerListener != null) {
storageManagerListener.onPartitionBecomeOfflineFromInactive(partitionName);
}
// 4. todo update instanceConfig in helix
}

@Override
public void onPartitionBecomeDroppedFromOffline(String partitionName) {
// 1. remove old replica from StatsManager
PartitionStateChangeListener statsManagerListener =
partitionStateChangeListeners.get(StateModelListenerType.StatsManagerListener);
if (statsManagerListener != null) {
statsManagerListener.onPartitionBecomeDroppedFromOffline(partitionName);
}
// 2. remove old replica from ReplicationManager
PartitionStateChangeListener replicationManagerListener =
partitionStateChangeListeners.get(StateModelListenerType.ReplicationManagerListener);
if (replicationManagerListener != null) {
replicationManagerListener.onPartitionBecomeDroppedFromOffline(partitionName);
}
// 3. remove old replica from StorageManager and delete store directory
PartitionStateChangeListener storageManagerListener =
partitionStateChangeListeners.get(StateModelListenerType.StorageManagerListener);
if (storageManagerListener != null) {
storageManagerListener.onPartitionBecomeDroppedFromOffline(partitionName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,15 @@ public void onPartitionBecomeInactiveFromStandby(String partitionName) {
// no op
}

@Override
public void onPartitionBecomeOfflineFromInactive(String partitionName) {
// no op
}

@Override
public void onPartitionBecomeDroppedFromOffline(String partitionName) {
// no op
}
});
StateModel stateModel;
switch (config.clustermapStateModelDefinition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,22 +330,25 @@ public void testUpdateNodeInfoInCluster() throws Exception {
MockClusterMap clusterMap = new MockClusterMap(false, 1, 3, 3, false);
MockDataNodeId localNode = clusterMap.getDataNodes().get(0);
List<ReplicaId> localReplicas = clusterMap.getReplicaIds(localNode);
ReplicaId existingReplica = localReplicas.get(0);
// override some props for current test
props.setProperty("clustermap.update.datanode.info", Boolean.toString(true));
props.setProperty("clustermap.port", String.valueOf(localNode.getPort()));
ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props));
HelixParticipant participant = new HelixParticipant(clusterMapConfig, helixManagerFactory);
// create InstanceConfig for local node
InstanceConfig instanceConfig = generateInstanceConfig(clusterMap, localNode);
// create InstanceConfig for local node. Also, put existing replica into sealed list
List<String> sealedList = new ArrayList<>();
sealedList.add(existingReplica.getPartitionId().toPathString());
InstanceConfig instanceConfig = generateInstanceConfig(clusterMap, localNode, sealedList);
HelixAdmin helixAdmin = participant.getHelixAdmin();
helixAdmin.addCluster(clusterMapConfig.clusterMapClusterName);
helixAdmin.addInstance(clusterMapConfig.clusterMapClusterName, instanceConfig);
String instanceName = ClusterMapUtils.getInstanceName(localNode.getHostname(), localNode.getPort());
// generate exactly same config for comparison
InstanceConfig initialInstanceConfig = generateInstanceConfig(clusterMap, localNode);
InstanceConfig initialInstanceConfig = generateInstanceConfig(clusterMap, localNode, sealedList);
// 1. add existing replica's info to Helix should be no-op
assertTrue("Adding existing replica's info should succeed",
participant.updateDataNodeInfoInCluster(localReplicas.get(0), true));
participant.updateDataNodeInfoInCluster(existingReplica, true));
assertEquals("InstanceConfig should stay unchanged", initialInstanceConfig,
helixAdmin.getInstanceConfig(clusterMapConfig.clusterMapClusterName, instanceName));
// create two new partitions on the same disk of local node
Expand Down Expand Up @@ -376,11 +379,11 @@ public void testUpdateNodeInfoInCluster() throws Exception {
// 5. remove same replica again (id = 9, replicaFromPartition1) should be no-op
assertTrue("Removing non-found replica info from InstanceConfig should succeed.",
participant.updateDataNodeInfoInCluster(replicaFromPartition1, false));
// 6. remove another existing replica should succeed
// 6. remove an existing replica should succeed
assertTrue("Removing replica info from InstanceConfig should succeed.",
participant.updateDataNodeInfoInCluster(localReplicas.get(0), false));
participant.updateDataNodeInfoInCluster(existingReplica, false));
instanceConfig = helixAdmin.getInstanceConfig(clusterMapConfig.clusterMapClusterName, instanceName);
verifyReplicaInfoInInstanceConfig(instanceConfig, localReplicas.get(0), false);
verifyReplicaInfoInInstanceConfig(instanceConfig, existingReplica, false);
verifyReplicaInfoInInstanceConfig(instanceConfig, replicaFromPartition2, true);
// reset props
props.setProperty("clustermap.update.datanode.info", Boolean.toString(false));
Expand All @@ -391,9 +394,11 @@ public void testUpdateNodeInfoInCluster() throws Exception {
* Generate {@link InstanceConfig} for given data node.
* @param clusterMap {@link MockClusterMap} to use
* @param dataNode the data node associated with InstanceConfig.
* @param sealedReplicas the sealed replicas that should be placed into sealed list. This can be null.
* @return {@link InstanceConfig} of given data node.
*/
private InstanceConfig generateInstanceConfig(MockClusterMap clusterMap, MockDataNodeId dataNode) {
private InstanceConfig generateInstanceConfig(MockClusterMap clusterMap, MockDataNodeId dataNode,
List<String> sealedReplicas) {
String instanceName = ClusterMapUtils.getInstanceName(dataNode.getHostname(), dataNode.getPort());
InstanceConfig instanceConfig = new InstanceConfig(instanceName);
instanceConfig.setHostName(dataNode.getHostname());
Expand Down Expand Up @@ -427,6 +432,8 @@ private InstanceConfig generateInstanceConfig(MockClusterMap clusterMap, MockDat
mountPathToDiskInfos.put(mountPath, diskInfo);
}
instanceConfig.getRecord().setMapFields(mountPathToDiskInfos);
instanceConfig.getRecord()
.setListField(ClusterMapUtils.SEALED_STR, sealedReplicas == null ? new ArrayList<>() : sealedReplicas);
return instanceConfig;
}

Expand All @@ -445,12 +452,16 @@ private void verifyReplicaInfoInInstanceConfig(InstanceConfig instanceConfig, Re
for (String replicaInfo : diskInfo.get(REPLICAS_STR).split(REPLICAS_DELIM_STR)) {
replicasOnDisk.add(replicaInfo.split(REPLICAS_STR_SEPARATOR)[0]);
}
List<String> sealedList = getSealedReplicas(instanceConfig);
List<String> stoppedList = getStoppedReplicas(instanceConfig);
String partitionName = replicaId.getPartitionId().toPathString();
if (shouldExist) {
assertTrue("New replica is not found in InstanceConfig",
replicasOnDisk.contains(replicaId.getPartitionId().toPathString()));
assertTrue("New replica is not found in InstanceConfig", replicasOnDisk.contains(partitionName));
} else {
assertFalse("Old replica should not exist in InstanceConfig",
replicasOnDisk.contains(replicaId.getPartitionId().toPathString()));
assertFalse("Old replica should not exist in InstanceConfig", replicasOnDisk.contains(partitionName));
// make sure the replica is not present in sealed/stopped list
assertFalse("Old replica should not exist in sealed/stopped list",
sealedList.contains(partitionName) || stoppedList.contains(partitionName));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,12 @@ public void onPartitionBecomeOfflineFromInactive(String partitionName) {
partitionName);
}

@Override
public void onPartitionBecomeDroppedFromOffline(String partitionName) {
logger.info("Partition state change notification from Offline to Dropped received for partition {}",
partitionName);
}

/**
* If only config specified list of partitions are being replicated from cloud, then check that the partition
* belongs to the specified list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void start() throws ReplicationException {
*/
public boolean addReplica(ReplicaId replicaId) {
if (partitionToPartitionInfo.containsKey(replicaId.getPartitionId())) {
logger.error("{} already exists in replication manager, rejecting adding replica request.",
logger.error("Partition {} already exists in replication manager, rejecting adding replica request.",
replicaId.getPartitionId());
return false;
}
Expand All @@ -137,11 +137,11 @@ public boolean addReplica(ReplicaId replicaId) {
remoteReplicaInfos = createRemoteReplicaInfos(peerReplicas, replicaId);
updatePartitionInfoMaps(remoteReplicaInfos, replicaId);
}
logger.info("Assigning thread for {}", replicaId.getPartitionId());
logger.info("Assigning thread for partition {}", replicaId.getPartitionId());
addRemoteReplicaInfoToReplicaThread(remoteReplicaInfos, true);
// No need to update persistor to explicitly persist tokens for new replica because background persistor will
// periodically persist all tokens including new added replica's
logger.info("{} is successfully added into replication manager", replicaId.getPartitionId());
logger.info("Partition {} is successfully added into replication manager", replicaId.getPartitionId());
return true;
}

Expand All @@ -152,7 +152,7 @@ public boolean addReplica(ReplicaId replicaId) {
*/
public boolean removeReplica(ReplicaId replicaId) {
if (!partitionToPartitionInfo.containsKey(replicaId.getPartitionId())) {
logger.error("{} doesn't exist in replication manager, skipping removing replica request.",
logger.error("Partition {} doesn't exist in replication manager, skipping removing replica request.",
replicaId.getPartitionId());
return false;
}
Expand All @@ -165,7 +165,7 @@ public boolean removeReplica(ReplicaId replicaId) {
return v;
});
partitionToPartitionInfo.remove(replicaId.getPartitionId());
logger.info("{} is successfully removed from replication manager", replicaId.getPartitionId());
logger.info("Partition {} is successfully removed from replication manager", replicaId.getPartitionId());
return true;
}

Expand Down Expand Up @@ -304,5 +304,14 @@ public void onPartitionBecomeOfflineFromInactive(String partitionName) {
store.setCurrentState(ReplicaState.OFFLINE);
replicaSyncUpManager.initiateDisconnection(localReplica);
}

@Override
public void onPartitionBecomeDroppedFromOffline(String partitionName) {
ReplicaId replica = storeManager.getReplica(partitionName);
// if code arrives here, we don't need to check if replica exists, it has been checked in StatsManager
// here we attempt to remove replica from replication manager. If replica doesn't exist, log info but don't fail
// the transition
removeReplica(replica);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static MockReplicationManager getReplicationManager(VerifiableProperties
dataNodeId, storeKeyConverterFactory, null);
}

MockReplicationManager(ReplicationConfig replicationConfig, ClusterMapConfig clusterMapConfig,
public MockReplicationManager(ReplicationConfig replicationConfig, ClusterMapConfig clusterMapConfig,
StoreConfig storeConfig, StorageManager storageManager, ClusterMap clusterMap, DataNodeId dataNodeId,
StoreKeyConverterFactory storeKeyConverterFactory, ClusterParticipant clusterParticipant)
throws ReplicationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ StatsSnapshot fetchSnapshot(PartitionId partitionId, List<PartitionId> unreachab
boolean addReplica(ReplicaId id) {
boolean success = partitionToReplicaMap.putIfAbsent(id.getPartitionId(), id) == null;
if (success) {
logger.info(id.getPartitionId() + " is added into StatsManager");
logger.info("Partition " + id.getPartitionId() + " is added into StatsManager");
} else {
logger.error("Failed to add " + id.getPartitionId() + " because it is already in StatsManager");
logger.error("Failed to add partition " + id.getPartitionId() + " because it is already in StatsManager");
}
return success;
}
Expand All @@ -210,9 +210,9 @@ boolean addReplica(ReplicaId id) {
boolean removeReplica(ReplicaId id) {
boolean success = partitionToReplicaMap.remove(id.getPartitionId()) != null;
if (success) {
logger.info(id.getPartitionId() + " is removed from StatsManager");
logger.info("Partition " + id.getPartitionId() + " is removed from StatsManager");
} else {
logger.error("Failed to remove " + id.getPartitionId() + " because it doesn't exist in StatsManager");
logger.error("Failed to remove partition " + id.getPartitionId() + " because it doesn't exist in StatsManager");
}
return success;
}
Expand Down Expand Up @@ -422,5 +422,17 @@ public void onPartitionBecomeOfflineFromInactive(String partitionName) {
logger.info("Partition state change notification from Inactive to Offline received for partition {}",
partitionName);
}

@Override
public void onPartitionBecomeDroppedFromOffline(String partitionName) {
// check if partition exists
ReplicaId replica = storageManager.getReplica(partitionName);
if (replica == null) {
throw new StateTransitionException("Replica " + partitionName + " is not found on current node",
ReplicaNotFound);
}
// remove replica from in-mem data structure. If replica doesn't exist, log info but don't fail the transition
removeReplica(replica);
}
}
}
Loading

0 comments on commit bfb751b

Please sign in to comment.