diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixBootstrapUpgradeUtil.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixBootstrapUpgradeUtil.java index 8516d20b46..d31a0fc693 100644 --- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixBootstrapUpgradeUtil.java +++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixBootstrapUpgradeUtil.java @@ -181,8 +181,7 @@ public enum HelixAdminOperation { EnablePartition, ResetPartition, ListSealedPartition, - MigrateToPropertyStore, - MigrateToFullAuto + MigrateToPropertyStore } /** @@ -412,34 +411,86 @@ static void migrateToPropertyStore(String hardwareLayoutPath, String partitionLa /** * Migrate resources from semi-auto to full-auto. - * - * @param partitionLayoutPath the path to the partition layout file. - * @param zkLayoutPath the path to the zookeeper layout file. - * @param clusterNamePrefix the prefix that when combined with the cluster name in the static - * cluster map files will give the cluster name in Helix to bootstrap or - * upgrade. - * @param dcs the comma-separated list of data centers that needs to be migrated - * @param resources the comma-separated list of resources that needs to be migrated - * @param dryRun if true, perform a dry run; do not update anything in Helix. - * @param wagedConfigFilePath disk capacity of each partition. This is used as a weight in Waged - * rebalancer. - * @param hardwareLayoutPath the path to the hardware layout file. - * @param maxInstancesInOneResourceForFullAuto max number of instances to be assigned under one resource when the resources - * are in full auto compatible mode - * @throws Exception - */ - static void migrateToFullAuto(String partitionLayoutPath, String zkLayoutPath, String clusterNamePrefix, String dcs, - String resources, boolean dryRun, String wagedConfigFilePath, String hardwareLayoutPath, - int maxInstancesInOneResourceForFullAuto) throws Exception { - HelixBootstrapUpgradeUtil helixBootstrapUpgradeUtil = - new HelixBootstrapUpgradeUtil(hardwareLayoutPath, partitionLayoutPath, zkLayoutPath, clusterNamePrefix, dcs, - DEFAULT_MAX_PARTITIONS_PER_RESOURCE, dryRun, false, null, null, null, null, null, - HelixAdminOperation.MigrateToFullAuto, PROPERTY_STORE, false, maxInstancesInOneResourceForFullAuto); + * @param clusterName Cluster name in Helix. + * @param dcs The comma-separated list of data centers that need to be migrated to + * Full Auto. + * @param zkLayoutPath The path to the zookeeper layout file. + * @param resources Comma-separated list of resources that need to be migrated + * @param helixAdminFactory The {@link HelixAdminFactory} to use to instantiate {@link HelixAdmin} + * @param wagedConfigFilePath Path to waged configuration properties file. + * @param maxInstancesInOneResourceForFullAuto Max number of instances to be assigned under one resource when the + * resources are in full auto compatible mode + * @param setPreferenceList If {@code true}, sets semi-auto placement as preference list for + * full-auto + * @param dryRun If true, perform a dry run; do not update anything in Helix.
+ */ + static void migrateToFullAuto(String clusterName, String dcs, String zkLayoutPath, String resources, + HelixAdminFactory helixAdminFactory, String wagedConfigFilePath, int maxInstancesInOneResourceForFullAuto, + boolean setPreferenceList, boolean dryRun) throws Exception { + Map<String, ClusterMapUtils.DcZkInfo> dataCenterToZkAddress = parseAndUpdateDcInfoFromArg(dcs, zkLayoutPath); + for (Map.Entry<String, ClusterMapUtils.DcZkInfo> entry : dataCenterToZkAddress.entrySet()) { + String dcName = entry.getKey(); + List<String> zkConnectStrs = entry.getValue().getZkConnectStrs(); + if (zkConnectStrs.size() != 1) { + throw new IllegalArgumentException( + entry.getKey() + " has invalid number of ZK endpoints: " + zkConnectStrs.size()); + } + HelixAdmin admin = helixAdminFactory.getHelixAdmin(zkConnectStrs.get(0)); + info("Migrate to Full-Auto. Cluster {}, dc {}, resources {}, dry run {}", clusterName, dcName, resources, dryRun); + migrateToFullAuto(dcName, clusterName, admin, zkConnectStrs.get(0), resources, + new ObjectMapper().readValue(Utils.readStringFromFile(wagedConfigFilePath), WagedHelixConfig.class), + setPreferenceList, maxInstancesInOneResourceForFullAuto, dryRun); + } + } - WagedHelixConfig wagedHelixConfig = - new ObjectMapper().readValue(Utils.readStringFromFile(wagedConfigFilePath), WagedHelixConfig.class);
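Both entry points above share the same fan-out: parse the zk layout, require exactly one ZooKeeper endpoint per data center, and drive one HelixAdmin per DC. A minimal standalone sketch of that pattern, assuming a hypothetical dc-to-endpoint map and the stock ZKHelixAdmin (Ambry's HelixAdminFactory wraps the same construction):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;

// Sketch: one HelixAdmin per data center; the dc -> endpoint map is illustrative.
public class PerDcAdminSketch {
  public static void main(String[] args) {
    Map<String, String> dcToZk = new HashMap<>();
    dcToZk.put("dc1", "abc.example.com:2199");
    dcToZk.put("dc2", "def.example.com:2300");
    for (Map.Entry<String, String> entry : dcToZk.entrySet()) {
      // Each DC is expected to expose exactly one ZK endpoint.
      HelixAdmin admin = new ZKHelixAdmin(entry.getValue());
      try {
        System.out.println(entry.getKey() + " clusters: " + admin.getClusters());
      } finally {
        admin.close(); // release the ZK connection for this DC
      }
    }
  }
}
```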
+ /** + * Roll back resources in a cluster from Full-Auto to Semi-Auto + * @param clusterName Cluster name in Helix. + * @param dcs The comma-separated list of data centers that need to be rolled back from Full-Auto + * to Semi-Auto. + * @param zkLayoutPath The path to the zookeeper layout file. + * @param commaSeparatedResources Comma-separated list of resources that need to be rolled back. + * @param helixAdminFactory The {@link HelixAdminFactory} to use to instantiate {@link HelixAdmin} + */ + static void rollbackToSemiAuto(String clusterName, String dcs, String zkLayoutPath, String commaSeparatedResources, + HelixAdminFactory helixAdminFactory) throws Exception { + Map<String, ClusterMapUtils.DcZkInfo> dataCenterToZkAddress = parseAndUpdateDcInfoFromArg(dcs, zkLayoutPath); + for (Map.Entry<String, ClusterMapUtils.DcZkInfo> entry : dataCenterToZkAddress.entrySet()) { + String dcName = entry.getKey(); + List<String> zkConnectStrs = entry.getValue().getZkConnectStrs(); + if (zkConnectStrs.size() != 1) { + throw new IllegalArgumentException( + entry.getKey() + " has invalid number of ZK endpoints: " + zkConnectStrs.size()); + } + HelixAdmin admin = helixAdminFactory.getHelixAdmin(zkConnectStrs.get(0)); + info("Rollback to Semi-Auto. Cluster {}, dc {}, resources {}", clusterName, dcName, commaSeparatedResources); + + // Get resources to migrate from user input. + Set<String> helixResources = + admin.getResourcesInCluster(clusterName).stream().filter(s -> s.matches("\\d+")).collect(Collectors.toSet()); + Set<String> resources; + if (commaSeparatedResources.equalsIgnoreCase(ALL)) { + resources = helixResources; + } else { + resources = + Arrays.stream(commaSeparatedResources.replaceAll("\\p{Space}", "").split(",")).collect(Collectors.toSet()); + resources.removeIf(resource -> { + if (!helixResources.contains(resource)) { + logger.info("Resource {} is not present in data center {}", resource, dcName); + return true; + } + return false; + }); + } - helixBootstrapUpgradeUtil.migrateToFullAuto(resources, wagedHelixConfig); + // Update ideal state to semi-auto for all the resources + for (String resourceName : resources) { + IdealState idealState = admin.getResourceIdealState(clusterName, resourceName); + idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO); + idealState.setReplicas(ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.name()); + admin.updateIdealState(clusterName, resourceName, idealState); + } + } } /** @@ -466,6 +517,53 @@ static void dropCluster(String zkLayoutPath, String clusterName, String dcs, Hel } }
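The rollback itself is a plain IdealState read-modify-write per resource, exactly the three calls shown above. A self-contained sketch against the Helix admin API; the ZK address, cluster name, and resource name are placeholders:

```java
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceConfig;

// Sketch: flip one resource back to SEMI_AUTO; "AmbryCluster" and "101" are placeholders.
public class RollbackSketch {
  public static void main(String[] args) {
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
    IdealState idealState = admin.getResourceIdealState("AmbryCluster", "101");
    // SEMI_AUTO: Helix assigns states only among the instances in the preference lists.
    idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO);
    // ANY_LIVEINSTANCE: replica count follows however many listed instances are alive.
    idealState.setReplicas(ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.name());
    admin.updateIdealState("AmbryCluster", "101", idealState);
    admin.close();
  }
}
```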
+ /** + * Removes given list of partitions from preference lists in resource configs + * @param zkLayoutPath the path to the zookeeper layout file. + * @param clusterName the name of the cluster in Helix. + * @param dcs the comma-separated list of data centers in which preference lists need to be + * updated. + * @param resourceName the resource name for which preference list needs to be updated. + * @param commaSeparatedPartitions comma separated list of partitions that need to be removed from preference list. + * If "all" is provided, it removes all existing partitions. + */ + static void updatePreferenceLists(String zkLayoutPath, String clusterName, String dcs, String resourceName, + String commaSeparatedPartitions) throws Exception { + + Map<String, ClusterMapUtils.DcZkInfo> dataCenterToZkAddress = parseAndUpdateDcInfoFromArg(dcs, zkLayoutPath); + info("Removing partitions {} in resource {} from preference lists in cluster {}", commaSeparatedPartitions, + resourceName, clusterName); + + for (Map.Entry<String, ClusterMapUtils.DcZkInfo> entry : dataCenterToZkAddress.entrySet()) { + List<String> zkConnectStrs = entry.getValue().getZkConnectStrs(); + if (zkConnectStrs.size() != 1) { + throw new IllegalArgumentException( + entry.getKey() + " has invalid number of ZK endpoints: " + zkConnectStrs.size()); + } + // 1. Get current preference lists from resource config + ConfigAccessor configAccessor = new ConfigAccessor(zkConnectStrs.get(0)); + ResourceConfig resourceConfig = configAccessor.getResourceConfig(clusterName, resourceName); + Map<String, List<String>> resourceConfigPreferenceLists = resourceConfig.getPreferenceLists(); + // 2. Remove input partitions from preference lists. Copy the key set so that removing entries from the map + // does not modify the collection being iterated. + Set<String> partitionSet; + if (commaSeparatedPartitions.equalsIgnoreCase(ALL)) { + partitionSet = new HashSet<>(resourceConfigPreferenceLists.keySet()); + } else { + partitionSet = + Arrays.stream(commaSeparatedPartitions.replaceAll("\\p{Space}", "").split(",")).collect(Collectors.toSet()); + } + for (String partition : partitionSet) { + resourceConfigPreferenceLists.remove(partition); + info("Removing partition {} under resource {} in resource config", partition, resourceName); + } + // 3. Write to helix + resourceConfig.setPreferenceLists(resourceConfigPreferenceLists); + configAccessor.setResourceConfig(clusterName, resourceName, resourceConfig); + info("Removed partitions {} in resource {} from preference lists in resource config for cluster {} in dc {}", + commaSeparatedPartitions, resourceName, clusterName, entry.getKey()); + } + } + /** * Generate cluster admin configs based on admin type and upload them to Helix PropertyStore * @param clusterMapToHelixMapper {@link HelixBootstrapUpgradeUtil} to use. @@ -769,7 +867,8 @@ private void uploadClusterAdminInfos(Map<String, Map<String, Map<String, String>>> adminInfosByDc, String clusterAdminType, String adminConfigZNodePath) { for (String dcName : dataCenterToZkAddress.keySet()) { info("Uploading {} infos for datacenter {}.", clusterAdminType, dcName); - HelixPropertyStore<ZNRecord> helixPropertyStore = createHelixPropertyStore(dcName); + HelixPropertyStore<ZNRecord> helixPropertyStore = + createHelixPropertyStore(this.clusterName, dataCenterToZkAddress.get(dcName).getZkConnectStrs().get(0)); try { ZNRecord znRecord = new ZNRecord(clusterAdminType); znRecord.setMapFields(adminInfosByDc.get(dcName)); @@ -790,7 +889,8 @@ private void uploadClusterAdminInfos(Map<String, Map<String, Map<String, String>>> adminInfosByDc, private void deleteClusterAdminInfos(String clusterAdminType, String adminConfigZNodePath) { for (Map.Entry<String, ClusterMapUtils.DcZkInfo> entry : dataCenterToZkAddress.entrySet()) { info("Deleting {} infos for datacenter {}.", clusterAdminType, entry.getKey()); - HelixPropertyStore<ZNRecord> helixPropertyStore = createHelixPropertyStore(entry.getKey()); + HelixPropertyStore<ZNRecord> helixPropertyStore = + createHelixPropertyStore(this.clusterName, entry.getValue().getZkConnectStrs().get(0)); if (!helixPropertyStore.remove(adminConfigZNodePath, AccessOption.PERSISTENT)) { logger.error("Failed to remove {} infos from datacenter {}", clusterAdminType, entry.getKey()); } }
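The updatePreferenceLists helper above is likewise a read-modify-write, but on the ResourceConfig. A minimal sketch of the same three steps (read lists, drop a partition, write back); the ZK address and names are placeholders, and it assumes preference lists were previously stored on the resource config:

```java
import java.util.List;
import java.util.Map;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.model.ResourceConfig;

// Sketch of the preference-list read-modify-write behind updatePreferenceLists.
public class PreferenceListSketch {
  public static void main(String[] args) {
    ConfigAccessor configAccessor = new ConfigAccessor("localhost:2181");
    ResourceConfig resourceConfig = configAccessor.getResourceConfig("AmbryCluster", "101");
    Map<String, List<String>> preferenceLists = resourceConfig.getPreferenceLists();
    // Dropping a partition here lets the waged rebalancer take over its placement.
    preferenceLists.remove("Partition_42");
    resourceConfig.setPreferenceLists(preferenceLists);
    configAccessor.setResourceConfig("AmbryCluster", "101", resourceConfig);
  }
}
```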
@@ -838,144 +938,141 @@ private void migrateToPropertyStore() throws InterruptedException { /** * Convert resources from semi-auto to full-auto. - * @param commaSeparatedResources the comma-separated list of resources that needs to be migrated - * @param wagedHelixConfig - * - */ - public void migrateToFullAuto(String commaSeparatedResources, WagedHelixConfig wagedHelixConfig) - throws InterruptedException { - CountDownLatch migrationComplete = new CountDownLatch(adminForDc.size()); - // different DCs can be migrated in parallel - adminForDc.forEach((dcName, helixAdmin) -> Utils.newThread(() -> { - try { - - // 0. Check if all resources in the cluster are full-auto compatible - boolean resourcesAreFullAutoCompatible = maybeVerifyResourcesAreFullAutoCompatible(dcName, clusterName); - if (!resourcesAreFullAutoCompatible) { - throw new IllegalStateException("Resources are not full auto compatible"); - } - verifyPartitionPlacementIsFullAutoCompatibleInStatic(dcName); - // Get property store - ClusterMapConfig config = getClusterMapConfig(clusterName, dcName, null); - String zkConnectStr = dataCenterToZkAddress.get(dcName).getZkConnectStrs().get(0); - try (PropertyStoreToDataNodeConfigAdapter propertyStoreAdapter = new PropertyStoreToDataNodeConfigAdapter( - zkConnectStr, config)) { - - // Get resources to migrate from user input. - Set<String> helixResources = helixAdmin.getResourcesInCluster(clusterName) - .stream() - .filter(s -> s.matches("\\d+")) + * @param dc data center name + * @param clusterName cluster name + * @param admin {@link HelixAdmin} to do admin operations on the helix cluster + * @param zkConnectStr zookeeper connect string for the cluster + * @param commaSeparatedResources the comma-separated list of resources that need to be migrated + * @param wagedHelixConfig waged configuration for Full-Auto mode + * @param setPreferenceList if {@code true}, sets semi-auto placement as preference list for full-auto + * @param maxInstancesInOneResourceForFullAuto max number of instances to be assigned under one resource when the + * resources are in full auto compatible mode + * @param dryRun if true, perform a dry run; do not change ideal state to Full-Auto in Helix. + */ + private static void migrateToFullAuto(String dc, String clusterName, HelixAdmin admin, String zkConnectStr, + String commaSeparatedResources, WagedHelixConfig wagedHelixConfig, boolean setPreferenceList, + int maxInstancesInOneResourceForFullAuto, boolean dryRun) { + try { + // 0. Check if all resources in the cluster are full-auto compatible + boolean resourcesAreFullAutoCompatible = + maybeVerifyResourcesAreFullAutoCompatible(dc, clusterName, admin, maxInstancesInOneResourceForFullAuto); + if (!resourcesAreFullAutoCompatible) { + throw new IllegalStateException("Resources are not full auto compatible"); + } + ClusterMapConfig config = getClusterMapConfig(clusterName, dc, null); + try (PropertyStoreToDataNodeConfigAdapter propertyStoreAdapter = new PropertyStoreToDataNodeConfigAdapter( + zkConnectStr, config)) { + // Get resources to migrate from user input. + Set<String> helixResources = admin.getResourcesInCluster(clusterName) + .stream() + .filter(s -> s.matches("\\d+")) + .collect(Collectors.toSet()); + Set<String> resources; + if (commaSeparatedResources.equalsIgnoreCase(ALL)) { + resources = helixResources; + } else { + resources = Arrays.stream(commaSeparatedResources.replaceAll("\\p{Space}", "").split(",")) .collect(Collectors.toSet()); - Set<String> resources; - if (commaSeparatedResources.equalsIgnoreCase(ALL)) { - resources = helixResources; - } else { - resources = Arrays.stream(commaSeparatedResources.replaceAll("\\p{Space}", "").split(",")) - .collect(Collectors.toSet()); - resources.removeIf(resource -> { - if (!helixResources.contains(resource)) { - logger.info("Resource {} is not present in data center {}", resource, dcName); - return true; - } - return false; - }); - } + resources.removeIf(resource -> { + if (!helixResources.contains(resource)) { + logger.info("Resource {} is not present in data center {}", resource, dc); + return true; + } + return false; + }); + } - Map<String, IdealState> resourceToIdealState = new HashMap<>(); - for (String resource : resources) { - resourceToIdealState.put(resource, helixAdmin.getResourceIdealState(clusterName, resource)); - } + Map<String, IdealState> resourceToIdealState = new HashMap<>(); + for (String resource : resources) { + resourceToIdealState.put(resource, admin.getResourceIdealState(clusterName, resource)); + } - ConfigAccessor configAccessor = - new ConfigAccessor(dataCenterToZkAddress.get(dcName).getZkConnectStrs().get(0)); + ConfigAccessor configAccessor = new ConfigAccessor(zkConnectStr); - // 1. Update cluster config - setClusterConfig(configAccessor, wagedHelixConfig.getClusterConfigFields()); + // 1. Update cluster config + setClusterConfig(clusterName, configAccessor, wagedHelixConfig.getClusterConfigFields(), dryRun); - // 2.
Update instance config for all instances present in each resource - for (String resource : resources) { - // a. Get list of instances sharing partitions in this resource - IdealState idealState = resourceToIdealState.get(resource); - Set<String> partitions = idealState.getPartitionSet(); - Set<String> instances = new HashSet<>(); - for (String partition : partitions) { - instances.addAll(idealState.getInstanceSet(partition)); - } - // b. Update instance config for each instance - for (String instance : instances) { - setInstanceConfig(dcName, instance, resource, configAccessor, propertyStoreAdapter); - } + // 2. Update instance config for all instances present in each resource + for (String resource : resources) { + // a. Get list of instances sharing partitions in this resource + IdealState idealState = resourceToIdealState.get(resource); + Set<String> partitions = idealState.getPartitionSet(); + Set<String> instances = new HashSet<>(); + for (String partition : partitions) { + instances.addAll(idealState.getInstanceSet(partition)); } - - // 3. Set resource configs - for (String resource : resources) { - setResourceConfig(dcName, resource, configAccessor, propertyStoreAdapter, helixAdmin, - wagedHelixConfig.clusterConfigFields); + // b. Update instance config for each instance + for (String instance : instances) { + setInstanceConfig(clusterName, instance, resource, configAccessor, propertyStoreAdapter, admin, dryRun); } + } - if (!dryRun) { - info("[{}] Migrating resources {} in cluster {} from semi-auto to full-auto", dcName.toUpperCase(), - resources, clusterName); - - // 3. Enter maintenance mode - helixAdmin.manuallyEnableMaintenanceMode(clusterName, true, "Migrating to Full auto", - Collections.emptyMap()); - info("[{}] Enabled maintenance mode for cluster {}", dcName.toUpperCase(), clusterName); - - // 4. Update ideal state and enable waged rebalancer - for (String resource : resources) { - IdealState idealState = resourceToIdealState.get(resource); - idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO); - // Set resource tag - idealState.setInstanceGroupTag(getResourceTag(resource)); - // Set number of replicas - idealState.setReplicas(String.valueOf(wagedHelixConfig.getIdealStateConfigFields().getNumReplicas())); - // Set minimum active replicas needed. Helix will bring up a new replica if the replica count goes below - // this value even if we are in delayed rebalancer window. - idealState.setMinActiveReplicas(wagedHelixConfig.getIdealStateConfigFields().getMinActiveReplicas()); - helixAdmin.updateIdealState(clusterName, resource, idealState); - info( - "Updated ideal state for resource {}. Rebalance mode: {}, Instance group tag: {}, Number of replicas: {}, Minimum active replicas: {}", - resource, idealState.getRebalanceMode().toString(), idealState.getReplicas(), - idealState.getMinActiveReplicas()); - } - helixAdmin.enableWagedRebalance(clusterName, new ArrayList<>(resources)); - info("[{}] Enabled waged rebalancer for resources {} in cluster {}", dcName.toUpperCase(), resources, - clusterName); - - // 7. Update the list of resources migrating to full_auto in helix property store. This is used by servers - // in case we want to fall back to semi_auto later. - updateAdminConfigForFullAutoMigration(dcName, resources); - // 8.
Exit maintenance mode - helixAdmin.manuallyEnableMaintenanceMode(clusterName, false, "Complete migrating to Full auto", - Collections.emptyMap()); - info("[{}] Disabled maintenance mode for cluster {}", dcName.toUpperCase(), clusterName); + if (!dryRun) { + // 4. Enter maintenance mode + admin.manuallyEnableMaintenanceMode(clusterName, true, "Migrating to Full auto", Collections.emptyMap()); + info("[{}] Enabled maintenance mode for cluster {}", dc.toUpperCase(), clusterName); + } - info("[{}] Successfully migrated resources {} in cluster {} from semi-auto to full-auto", - dcName.toUpperCase(), resources, clusterName); + // 5. Update ideal state and enable waged rebalancer + for (String resource : resources) { + IdealState idealState = resourceToIdealState.get(resource); + idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO); + // Set resource tag + idealState.setInstanceGroupTag(getResourceTag(resource)); + // Set number of replicas + idealState.setReplicas(String.valueOf(wagedHelixConfig.getIdealStateConfigFields().getNumReplicas())); + // Set minimum active replicas needed. Helix will bring up a new replica if the replica count goes below + // this value even if we are in delayed rebalancer window. + idealState.setMinActiveReplicas(wagedHelixConfig.getIdealStateConfigFields().getMinActiveReplicas()); + if (!dryRun) { + admin.updateIdealState(clusterName, resource, idealState); + info("Updated ideal state for resource {}. REBALANCE_MODE: {}, INSTANCE_GROUP_TAG: {}, REPLICAS: {}, " + "MIN_ACTIVE_REPLICAS: {}", resource, idealState.getRebalanceMode().toString(), + idealState.getInstanceGroupTag(), idealState.getReplicas(), idealState.getMinActiveReplicas()); } else { - info("[{}] DryRun. Updated cluster, instance and instance configs for resources in cluster. " - + "Ideal state is not updated to full-auto", dcName.toUpperCase(), resources, clusterName); + info("Dry run. This will update ideal state for resource {}. REBALANCE_MODE: {}, INSTANCE_GROUP_TAG: {}, " + "REPLICAS: {}, MIN_ACTIVE_REPLICAS: {}", resource, idealState.getRebalanceMode().toString(), + idealState.getInstanceGroupTag(), idealState.getReplicas(), idealState.getMinActiveReplicas()); } } - } catch (Throwable t) { - logger.error("Error while migrating resources to full-auto in {}", dcName, t); - } finally { - migrationComplete.countDown(); - } - }, false).start()); - migrationComplete.await(); + if (!dryRun) { + admin.enableWagedRebalance(clusterName, new ArrayList<>(resources)); + info("[{}] Enabled waged rebalancer for resources {} in cluster {}", dc.toUpperCase(), resources, + clusterName); + // 6. Update the list of resources migrating to full_auto in helix property store. This is used by servers + // in case we want to fall back to semi_auto later. + updateAdminConfigForFullAutoMigration(clusterName, dc, resources, zkConnectStr);
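Steps 4-7 of the migration wrap the ideal-state flip in Helix maintenance mode so the controller does not rebalance mid-change. A condensed sketch of that sequence with placeholder names and replica counts:

```java
import java.util.Arrays;
import java.util.Collections;
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;

// Sketch of the FULL_AUTO flip under maintenance mode; names and counts are placeholders.
public class FullAutoFlipSketch {
  public static void main(String[] args) {
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
    String cluster = "AmbryCluster";
    // Maintenance mode pauses controller-driven rebalancing while configs change.
    admin.manuallyEnableMaintenanceMode(cluster, true, "Migrating to Full auto", Collections.emptyMap());
    IdealState is = admin.getResourceIdealState(cluster, "101");
    is.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
    is.setInstanceGroupTag("TAG_101"); // only tagged instances may host this resource
    is.setReplicas("3");               // fixed replica count under full-auto
    is.setMinActiveReplicas(2);        // rebuild below this even inside the delay window
    admin.updateIdealState(cluster, "101", is);
    // Hand the resource to the waged rebalancer, then resume normal operation.
    admin.enableWagedRebalance(cluster, Arrays.asList("101"));
    admin.manuallyEnableMaintenanceMode(cluster, false, "Complete migrating to Full auto", Collections.emptyMap());
    admin.close();
  }
}
```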
+ // 7. Exit maintenance mode + admin.manuallyEnableMaintenanceMode(clusterName, false, "Complete migrating to Full auto", + Collections.emptyMap()); + info("[{}] Disabled maintenance mode for cluster {}", dc.toUpperCase(), clusterName); + info("[{}] Successfully migrated resources {} in cluster {} from semi-auto to full-auto", dc.toUpperCase(), + resources, clusterName); + } + } + } catch (Throwable t) { + logger.error("Error while migrating resources to full-auto in {}", dc, t); + } } /** * Add admin config in Helix to keep track of list of resources migrating to Full-Auto - * @param dcName data center name - * @param resources migrating to Full Auto - */ - private void updateAdminConfigForFullAutoMigration(String dcName, Set<String> resources) { - HelixPropertyStore<ZNRecord> helixPropertyStore = createHelixPropertyStore(dcName); + * + * @param clusterName the cluster name in Helix + * @param dcName data center name + * @param resources migrating to Full Auto + * @param zkConnectStr zookeeper connect string of the data center + */ + private static void updateAdminConfigForFullAutoMigration(String clusterName, String dcName, Set<String> resources, + String zkConnectStr) { + HelixPropertyStore<ZNRecord> helixPropertyStore = createHelixPropertyStore(clusterName, zkConnectStr); try { // Get property store ZNode path ZNRecord zNRecord = helixPropertyStore.get(FULL_AUTO_MIGRATION_ZNODE_PATH, null, AccessOption.PERSISTENT); @@ -1010,16 +1107,19 @@ private void updateAdminConfigForFullAutoMigration(String dcName, Set<String> re * @param resource resource name * @return tag name */ - private String getResourceTag(String resource) { + private static String getResourceTag(String resource) { return "TAG_" + resource; }
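updateAdminConfigForFullAutoMigration persists the migrating resource ids under a property-store ZNode so servers can detect a later fallback. A rough sketch of such a write; the store root, ZNode path, and list-field key here are illustrative rather than Ambry's actual constants, and the ZNRecord import path varies across Helix versions:

```java
import java.util.Arrays;
import org.apache.helix.AccessOption;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;

// Sketch: record migrating resources under a property-store ZNode (paths are made up).
public class AdminConfigSketch {
  public static void main(String[] args) {
    ZkHelixPropertyStore<ZNRecord> store = new ZkHelixPropertyStore<>(
        new ZkBaseDataAccessor<ZNRecord>("localhost:2181"), "/AmbryCluster/PROPERTYSTORE", null);
    String path = "/adminConfigs/fullAutoMigration";
    ZNRecord record = store.get(path, null, AccessOption.PERSISTENT);
    if (record == null) {
      record = new ZNRecord("fullAutoMigration"); // first migration: create the record
    }
    record.setListField("RESOURCES", Arrays.asList("101", "102"));
    store.set(path, record, AccessOption.PERSISTENT); // create or overwrite the node
    store.stop();
  }
}
```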
/** * Sets up cluster to use waged rebalancer - * @param configAccessor the {@link ConfigAccessor} to access configuration in Helix. + * @param clusterName the cluster name + * @param configAccessor the {@link ConfigAccessor} to access configuration in Helix. * @param clusterConfigFields disk capacity of each partition. + * @param dryRun if true, perform a dry run; do not update the cluster config in Helix. */ - private void setClusterConfig(ConfigAccessor configAccessor, ClusterConfigFields clusterConfigFields) { + private static void setClusterConfig(String clusterName, ConfigAccessor configAccessor, + ClusterConfigFields clusterConfigFields, boolean dryRun) { ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); // 1. Set topology awareness. For example, if fault_zone_type is rack, helix avoids placing replicas on the hosts @@ -1028,10 +1128,10 @@ private void setClusterConfig(ConfigAccessor configAccessor, ClusterConfigFields clusterConfig.setTopology(TOPOLOGY); clusterConfig.setFaultZoneType(FAULT_ZONE_TYPE); - // 2. Set max number of concurrent state transitions. This avoids too many replica movements in the cluster. + // 2. Set max number of concurrent state transitions at resource level. This avoids too many replica movements in the cluster. StateTransitionThrottleConfig stateTransitionThrottleConfig = new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, - StateTransitionThrottleConfig.ThrottleScope.INSTANCE, clusterConfigFields.maxPartitionInTransition); + StateTransitionThrottleConfig.ThrottleScope.RESOURCE, clusterConfigFields.maxPartitionInTransition); clusterConfig.setStateTransitionThrottleConfigs(Collections.singletonList(stateTransitionThrottleConfig)); // 3. Set delay re-balancing to true to avoid replica movements if host is temporarily down due to deployment, @@ -1039,7 +1139,7 @@ private void setClusterConfig(ConfigAccessor configAccessor, ClusterConfigFields clusterConfig.setDelayRebalaceEnabled(true); clusterConfig.setRebalanceDelayTime(clusterConfigFields.delayRebalanceTimeInMs); - // 4. Set persistence of best possible assignment. This will allow us to see the partition assignment in IDEAL state. + // 4. Set persistence of the best possible assignment. This will allow us to see the partition assignment in IDEAL state. clusterConfig.setPersistBestPossibleAssignment(true); // 5. Set default weight for each partition. Helix will do partition assignment based on partition weight and host @@ -1062,28 +1162,42 @@ private void setClusterConfig(ConfigAccessor configAccessor, ClusterConfigFields clusterConfig.setGlobalRebalancePreference(globalRebalancePreferenceKeyIntegerMap); // Update cluster config in Helix/ZK - configAccessor.setClusterConfig(clusterName, clusterConfig); - info( - "Updated cluster config fields for Full-Auto waged rebalancer. Topology: {}, Fault_zone_type: {}, Max_partition_in_transition: {}, " - + "Delay_rebalance_time_in_secs: {}, Partition_disk_weight_in_gb: {}, Evenness: {}, Less_movement: {} ", - TOPOLOGY, FAULT_ZONE_TYPE, clusterConfigFields.maxPartitionInTransition, - clusterConfigFields.delayRebalanceTimeInMs, clusterConfigFields.partitionDiskWeightInGB, - clusterConfigFields.evenness, clusterConfigFields.lessMovement); + if (!dryRun) { + configAccessor.setClusterConfig(clusterName, clusterConfig); + info("Updated cluster config fields for Full-Auto waged rebalancer. TOPOLOGY: {}, FAULT_ZONE_TYPE: {}, " + "MAX_PARTITION_IN_TRANSITION: {}, DELAY_REBALANCE_TIME (ms): {}, DEFAULT_PARTITION_WEIGHT_MAP_DISK: {}, " + "EVENNESS: {}, LESS_MOVEMENT: {} ", TOPOLOGY, FAULT_ZONE_TYPE, clusterConfigFields.maxPartitionInTransition, + clusterConfigFields.delayRebalanceTimeInMs, clusterConfigFields.partitionDiskWeightInGB, + clusterConfigFields.evenness, clusterConfigFields.lessMovement); + } else { + info("Dry run. This will update cluster config fields for Full-Auto waged rebalancer with the following values: " + "TOPOLOGY: {}, FAULT_ZONE_TYPE: {}, MAX_PARTITION_IN_TRANSITION: {}, " + "DELAY_REBALANCE_TIME (ms): {}, DEFAULT_PARTITION_WEIGHT_MAP_DISK: {}, EVENNESS: {}, LESS_MOVEMENT: {} ", + TOPOLOGY, FAULT_ZONE_TYPE, clusterConfigFields.maxPartitionInTransition, + clusterConfigFields.delayRebalanceTimeInMs, clusterConfigFields.partitionDiskWeightInGB, + clusterConfigFields.evenness, clusterConfigFields.lessMovement); + } }
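setClusterConfig concentrates all cluster-wide waged settings in one ClusterConfig write. A compact sketch of those settings with illustrative values (the topology string, throttle limit, delay, and evenness/movement weights are all placeholders):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.model.ClusterConfig;

// Sketch of the cluster-level waged settings; all literal values are illustrative.
public class ClusterConfigSketch {
  public static void main(String[] args) {
    ConfigAccessor accessor = new ConfigAccessor("localhost:2181");
    ClusterConfig cfg = accessor.getClusterConfig("AmbryCluster");
    cfg.setTopology("/DATACENTER/RACK/HOST"); // placeholder topology path
    cfg.setFaultZoneType("RACK");             // spread replicas across racks
    cfg.setTopologyAwareEnabled(true);
    // Cap concurrent LOAD_BALANCE transitions per resource.
    cfg.setStateTransitionThrottleConfigs(Collections.singletonList(
        new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
            StateTransitionThrottleConfig.ThrottleScope.RESOURCE, 50)));
    // Tolerate short outages (e.g. deployments) before moving replicas.
    cfg.setDelayRebalaceEnabled(true);        // note: the method name is misspelled in the Helix API itself
    cfg.setRebalanceDelayTime(30 * 60 * 1000L);
    cfg.setPersistBestPossibleAssignment(true); // expose the computed assignment in the ideal state
    // Weight evenness against movement for the waged rebalancer.
    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> pref = new HashMap<>();
    pref.put(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 5);
    pref.put(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 5);
    cfg.setGlobalRebalancePreference(pref);
    accessor.setClusterConfig("AmbryCluster", cfg);
  }
}
```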
/** * Sets up instance to use waged rebalancer - * @param dcName data center name - * @param instanceName instance name - * @param resource resource name - * @param configAccessor the {@link ConfigAccessor} to access configuration in Helix. - * @param propertyStoreToDataNodeConfigAdapter {@link PropertyStoreToDataNodeConfigAdapter} to access property store data. - */ - private void setInstanceConfig(String dcName, String instanceName, String resource, ConfigAccessor configAccessor, - PropertyStoreToDataNodeConfigAdapter propertyStoreToDataNodeConfigAdapter) { + * + * @param clusterName the cluster name in Helix + * @param instanceName instance name + * @param resource resource name + * @param configAccessor the {@link ConfigAccessor} to access configuration in Helix. + * @param propertyStoreToDataNodeConfigAdapter {@link PropertyStoreToDataNodeConfigAdapter} to access property store + * data. + * @param helixAdmin {@link HelixAdmin} + * @param dryRun if true, perform a dry run; do not update the instance config in Helix. + */ + private static void setInstanceConfig(String clusterName, String instanceName, String resource, + ConfigAccessor configAccessor, PropertyStoreToDataNodeConfigAdapter propertyStoreToDataNodeConfigAdapter, + HelixAdmin helixAdmin, boolean dryRun) { InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName); DataNodeConfig nodeConfigFromHelix = - getDataNodeConfigFromHelix(dcName, instanceName, propertyStoreToDataNodeConfigAdapter, null); + getDataNodeConfigFromHelix(clusterName, instanceName, propertyStoreToDataNodeConfigAdapter, null, helixAdmin, + PROPERTY_STORE); // 1. Add tag // Remove previous tags @@ -1113,23 +1227,36 @@ private void setInstanceConfig(String dcName, String instanceName, String resour instanceConfig.setInstanceCapacityMap(capacityMap); // Update instance config in Helix/ZK - configAccessor.updateInstanceConfig(clusterName, instanceName, instanceConfig); - info("Updated instance config fields for Instance {}. Tags: {}, Domain: {}, Capacity map {}", instanceName, - instanceConfig.getTags(), instanceConfig.getDomainAsString(), instanceConfig.getInstanceCapacityMap()); + if (!dryRun) { + configAccessor.updateInstanceConfig(clusterName, instanceName, instanceConfig); + info("Updated instance config fields for Instance {}. TAGS: {}, DOMAIN: {}, INSTANCE_CAPACITY_MAP: {}", + instanceName, instanceConfig.getTags(), instanceConfig.getDomainAsString(), + instanceConfig.getInstanceCapacityMap()); + } else { + info( + "Dry run. This will update instance config fields for Instance {}. TAGS: {}, DOMAIN: {}, INSTANCE_CAPACITY_MAP: {}", + instanceName, instanceConfig.getTags(), instanceConfig.getDomainAsString(), + instanceConfig.getInstanceCapacityMap()); + } }
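Per instance, the migration sets a resource tag, a topology domain, and a capacity map whose key must match the partition weight key. A sketch with placeholder values:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.model.InstanceConfig;

// Sketch of the per-instance changes: resource tag, fault-zone domain, disk capacity.
// Instance name, tag, domain, and capacity values are placeholders.
public class InstanceConfigSketch {
  public static void main(String[] args) {
    ConfigAccessor accessor = new ConfigAccessor("localhost:2181");
    InstanceConfig ic = accessor.getInstanceConfig("AmbryCluster", "host1_15088");
    ic.addTag("TAG_101"); // ties the instance to resource 101 under full-auto
    // DOMAIN places the instance in the topology declared on the ClusterConfig.
    ic.setDomain("DATACENTER=dc1,RACK=rack1,HOST=host1");
    Map<String, Integer> capacity = new HashMap<>();
    capacity.put("DISK", 7000); // e.g. usable disk in GB; key must match the partition weight key
    ic.setInstanceCapacityMap(capacity);
    accessor.updateInstanceConfig("AmbryCluster", "host1_15088", ic);
  }
}
```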
/** * Set resource configs for waged rebalancer - * @param dcName data center - * @param resourceName resource - * @param configAccessor the {@link ConfigAccessor} to access configuration in Helix. - * @param propertyStoreToDataNodeConfigAdapter {@link PropertyStoreToDataNodeConfigAdapter} to access property store data. - * @param helixAdmin {@link HelixAdmin} - * @param clusterConfigFields inputted cluster configuration - */ - private void setResourceConfig(String dcName, String resourceName, ConfigAccessor configAccessor, + * + * @param clusterName the cluster name in Helix + * @param resourceName resource + * @param configAccessor the {@link ConfigAccessor} to access configuration in Helix. + * @param propertyStoreToDataNodeConfigAdapter {@link PropertyStoreToDataNodeConfigAdapter} to access property store + * data. + * @param helixAdmin {@link HelixAdmin} + * @param clusterConfigFields inputted cluster configuration + * @param setPreferenceList if {@code true}, sets semi-auto placement as preference list for + * full-auto + * @param dryRun if true, perform a dry run; do not update the resource config in Helix. + */ + private static void setResourceConfig(String clusterName, String resourceName, ConfigAccessor configAccessor, PropertyStoreToDataNodeConfigAdapter propertyStoreToDataNodeConfigAdapter, HelixAdmin helixAdmin, - ClusterConfigFields clusterConfigFields) throws IOException { + ClusterConfigFields clusterConfigFields, boolean setPreferenceList, boolean dryRun) throws IOException { ResourceConfig resourceConfig = configAccessor.getResourceConfig(clusterName, resourceName); if (resourceConfig == null) { @@ -1152,7 +1279,8 @@ private void setResourceConfig(String dcName, String resourceName, ConfigAccesso // if they are different from default values for (String instanceName : instances) { DataNodeConfig dataNodeConfig = - getDataNodeConfigFromHelix(dcName, instanceName, propertyStoreToDataNodeConfigAdapter, null); + getDataNodeConfigFromHelix(clusterName, instanceName, propertyStoreToDataNodeConfigAdapter, null, helixAdmin, + PROPERTY_STORE); // Get Disk configs on the node Map<String, DataNodeConfig.DiskConfig> diskConfigs = dataNodeConfig.getDiskConfigs(); for (DataNodeConfig.DiskConfig diskConfig : diskConfigs.values()) { @@ -1171,15 +1299,30 @@ private void setResourceConfig(String dcName, String resourceName, ConfigAccesso } partitionCapacityMap.put(DEFAULT_PARTITION_KEY, Collections.singletonMap(DISK_KEY, clusterConfigFields.partitionDiskWeightInGB + PARTITION_BUFFER_CAPACITY_FOR_INDEX_FILES_IN_GB)); + resourceConfig.setPartitionCapacityMap(partitionCapacityMap); - // 3. Update resource configs in helix + // 3. Add a simple config with value of default partition capacity. Servers use this value to know the + // partition capacity during boot up final long GB = 1024 * 1024 * 1024; - resourceConfig.setPartitionCapacityMap(partitionCapacityMap); resourceConfig.putSimpleConfig(DEFAULT_REPLICA_CAPACITY_STR, String.valueOf(GB * clusterConfigFields.partitionDiskWeightInGB)); - configAccessor.setResourceConfig(clusterName, resourceName, resourceConfig); - info("Updated resource config/partition weights for resource {}. Partition weights: {}", resourceName, - resourceConfig.getPartitionCapacityMap()); + + // 4. Set semi-auto placement as preference list so that no replicas are moved. + if (setPreferenceList) { + Map<String, List<String>> semiAutoPreferenceLists = idealState.getPreferenceLists(); + resourceConfig.setPreferenceLists(semiAutoPreferenceLists); + } + + // 5. Update resource configs in helix + if (!dryRun) { + configAccessor.setResourceConfig(clusterName, resourceName, resourceConfig); + info("Updated resource config/partition weights for resource {}. PARTITION_CAPACITY_MAP: {}, PREFERENCE_LIST: {}", + resourceName, resourceConfig.getPartitionCapacityMap(), resourceConfig.getPreferenceLists()); + } else { + info( + "Dry run. This will update resource config/partition weights for resource {}.
PARTITION_CAPACITY_MAP: {}, PREFERENCE_LIST: {}", + resourceName, resourceConfig.getPartitionCapacityMap(), resourceConfig.getPreferenceLists()); + } } /** @@ -1324,7 +1467,9 @@ private void updateClusterMapInHelix(boolean startValidatingClusterManager) thro if (adminForDc.containsKey(dc.getName())) { newThread(() -> { info("\n=======Starting datacenter: {}=========\n", dc.getName()); - boolean resourcesAreFullAutoCompatible = maybeVerifyResourcesAreFullAutoCompatible(dc.getName(), clusterName); + boolean resourcesAreFullAutoCompatible = + maybeVerifyResourcesAreFullAutoCompatible(dc.getName(), clusterName, adminForDc.get(dc.getName()), + this.maxInstancesInOneResourceForFullAuto); if (resourcesAreFullAutoCompatible) { verifyPartitionPlacementIsFullAutoCompatibleInStatic(dc.getName()); } @@ -1705,7 +1850,8 @@ private void addUpdateInstances(String dcName, Map<String, Set<String>> partitio int totalInstances = instancesInBoth.size() + instancesInHelix.size() + instancesInStatic.size(); for (String instanceName : instancesInBoth) { DataNodeConfig nodeConfigFromHelix = - getDataNodeConfigFromHelix(dcName, instanceName, propertyStoreAdapter, instanceConfigConverter); + getDataNodeConfigFromHelix(clusterName, instanceName, propertyStoreAdapter, instanceConfigConverter, + adminForDc.get(dcName), dataNodeConfigSourceType); DataNodeConfig nodeConfigFromStatic = createDataNodeConfigFromStatic(dcName, instanceName, nodeConfigFromHelix, partitionsToInstancesInDc, instanceConfigConverter); @@ -1803,13 +1949,14 @@ private DataNodeConfig createDataNodeConfigFromStatic(String dcName, String inst partitionsToInstancesInDc, instanceToDiskReplicasMap, referenceInstanceConfig)); } - private DataNodeConfig getDataNodeConfigFromHelix(String dcName, String instanceName, - PropertyStoreToDataNodeConfigAdapter adapter, InstanceConfigToDataNodeConfigAdapter.Converter converter) { + private static DataNodeConfig getDataNodeConfigFromHelix(String clusterName, String instanceName, + PropertyStoreToDataNodeConfigAdapter adapter, InstanceConfigToDataNodeConfigAdapter.Converter converter, + HelixAdmin helixAdmin, DataNodeConfigSourceType dataNodeConfigSourceType) { DataNodeConfig dataNodeConfig; if (dataNodeConfigSourceType == PROPERTY_STORE) { dataNodeConfig = adapter.get(instanceName); } else { - dataNodeConfig = converter.convert(adminForDc.get(dcName).getInstanceConfig(clusterName, instanceName)); + dataNodeConfig = converter.convert(helixAdmin.getInstanceConfig(clusterName, instanceName)); } return dataNodeConfig; } @@ -1869,7 +2016,8 @@ private void addUpdateResources(String dcName, Map<String, Set<String>> partitio List<String> resourcesInCluster = dcAdmin.getResourcesInCluster(clusterName); List<String> instancesWithDisabledPartition = new ArrayList<>(); HelixPropertyStore<ZNRecord> helixPropertyStore = - helixAdminOperation == HelixAdminOperation.DisablePartition ? createHelixPropertyStore(dcName) : null; + helixAdminOperation == HelixAdminOperation.DisablePartition ? createHelixPropertyStore(clusterName, + dataCenterToZkAddress.get(dcName).getZkConnectStrs().get(0)) : null; TreeMap<Integer, Set<String>> resourceIdToInstances = new TreeMap<>(); Map<String, Set<Integer>> instanceNameToResources = new HashMap<>(); Map<Integer, IdealState> resourceIdToIdealState = new HashMap<>();
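The partition weights that setResourceConfig publishes are what the waged rebalancer balances against instance capacity. A minimal sketch; "DEFAULT" follows Helix's fallback-key convention, while the resource/partition names and weights are made up:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.model.ResourceConfig;

// Sketch: publish per-partition weights matched against instance capacity by key ("DISK").
public class PartitionCapacitySketch {
  public static void main(String[] args) throws Exception {
    ConfigAccessor accessor = new ConfigAccessor("localhost:2181");
    ResourceConfig resourceConfig = accessor.getResourceConfig("AmbryCluster", "101");
    Map<String, Map<String, Integer>> partitionCapacity = new HashMap<>();
    // "DEFAULT" is the fallback weight for partitions without an explicit entry.
    partitionCapacity.put("DEFAULT", Collections.singletonMap("DISK", 400));      // GB per replica
    partitionCapacity.put("Partition_42", Collections.singletonMap("DISK", 600)); // heavier partition
    resourceConfig.setPartitionCapacityMap(partitionCapacity); // serializes to JSON, may throw IOException
    accessor.setResourceConfig("AmbryCluster", "101", resourceConfig);
  }
}
```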
@@ -2055,16 +2203,17 @@ private void addUpdateResources(String dcName, Map<String, Set<String>> partitio /** * Create a {@link HelixPropertyStore} for given datacenter. - * @param dcName the name of datacenter + * + * @param clusterName the name of the cluster in Helix + * @param zkConnectString zookeeper connect string to use + * @return {@link HelixPropertyStore} associated with given dc. */ - private HelixPropertyStore<ZNRecord> createHelixPropertyStore(String dcName) { + private static HelixPropertyStore<ZNRecord> createHelixPropertyStore(String clusterName, String zkConnectString) { Properties storeProps = new Properties(); storeProps.setProperty("helix.property.store.root.path", "/" + clusterName + "/" + PROPERTYSTORE_STR); HelixPropertyStoreConfig propertyStoreConfig = new HelixPropertyStoreConfig(new VerifiableProperties(storeProps)); // The number of zk endpoints has been validated in the ctor of HelixBootstrapUpgradeUtil, no need to check it again - String zkConnectStr = dataCenterToZkAddress.get(dcName).getZkConnectStrs().get(0); - return CommonUtils.createHelixPropertyStore(zkConnectStr, propertyStoreConfig, null); + return CommonUtils.createHelixPropertyStore(zkConnectString, propertyStoreConfig, null); } /** @@ -2469,7 +2618,8 @@ private void getSealedPartitionsInDc(Datacenter dc, Map<String, Set<String>> dcT String instanceName = getInstanceName(dataNode); ensureOrThrow(allInstancesInHelix.contains(instanceName), "Instance not present in Helix " + instanceName); DataNodeConfig dataNodeConfig = - getDataNodeConfigFromHelix(dcName, instanceName, propertyStoreAdapter, instanceConfigConverter); + getDataNodeConfigFromHelix(clusterName, instanceName, propertyStoreAdapter, instanceConfigConverter, + adminForDc.get(dcName), dataNodeConfigSourceType); Set<String> sealedReplicas = dataNodeConfig.getSealedReplicas(); if (sealedReplicas != null) { for (String sealedReplica : sealedReplicas) { @@ -2509,7 +2659,8 @@ private void verifyEquivalencyWithStaticClusterMap(HardwareLayout hardwareLayout try { verifyResourcesAndPartitionEquivalencyInDc(dc, clusterName, partitionLayout); verifyDataNodeAndDiskEquivalencyInDc(dc, clusterName, partitionLayout); - maybeVerifyResourcesAreFullAutoCompatible(dc.getName(), clusterName); + maybeVerifyResourcesAreFullAutoCompatible(dc.getName(), clusterName, admin, + this.maxInstancesInOneResourceForFullAuto); } catch (Throwable t) { logger.error("[{}] error message: {}", dc.getName().toUpperCase(), t.getMessage()); errorCount.getAndIncrement(); } @@ -2550,8 +2701,8 @@ private void verifyDataNodeAndDiskEquivalencyInDc(Datacenter dc, String clusterN String instanceName = getInstanceName(dataNode); ensureOrThrow(allInstancesInHelix.remove(instanceName), "Instance not present in Helix " + instanceName); DataNodeConfig dataNodeConfig = - getDataNodeConfigFromHelix(dcName, instanceName, propertyStoreAdapter, instanceConfigConverter); - + getDataNodeConfigFromHelix(clusterName, instanceName, propertyStoreAdapter, instanceConfigConverter, + adminForDc.get(dcName), dataNodeConfigSourceType); Map<String, DataNodeConfig.DiskConfig> diskInfos = new HashMap<>(dataNodeConfig.getDiskConfigs()); for (Disk disk : dataNode.getDisks()) { DataNodeConfig.DiskConfig diskInfoInHelix = diskInfos.remove(disk.getMountPath());
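createHelixPropertyStore (above) roots the store at /<clusterName>/PROPERTYSTORE. A small sketch of the same construction using Ambry's CommonUtils; the property key is taken from the diff, while package/import paths may differ by version:

```java
import java.util.Properties;
import com.github.ambry.commons.CommonUtils;
import com.github.ambry.config.HelixPropertyStoreConfig;
import com.github.ambry.config.VerifiableProperties;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;

// Sketch: build a property store rooted under the cluster, as the helper above does.
public class PropertyStoreSketch {
  public static HelixPropertyStore<ZNRecord> open(String clusterName, String zkConnectString) {
    Properties storeProps = new Properties();
    // Same root convention as the diff: /<clusterName>/PROPERTYSTORE
    storeProps.setProperty("helix.property.store.root.path", "/" + clusterName + "/PROPERTYSTORE");
    HelixPropertyStoreConfig config = new HelixPropertyStoreConfig(new VerifiableProperties(storeProps));
    return CommonUtils.createHelixPropertyStore(zkConnectString, config, null);
  }
}
```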
@@ -2626,14 +2777,18 @@ private void verifyDataNodeAndDiskEquivalencyInDc(Datacenter dc, String clusterN } /** - * If the resources are reconstructed to be FULL_AUTO compatible, then make sure that each instance only has partitions - * from one resource. - * @param dcName the datacenter whose information is to be verified. - * @param clusterName the cluster to be verified. - */ - private boolean maybeVerifyResourcesAreFullAutoCompatible(String dcName, String clusterName) { - HelixAdmin admin = adminForDc.get(dcName); - List<String> resourceNames = admin.getResourcesInCluster(clusterName); + * If the resources are reconstructed to be FULL_AUTO compatible, then make sure that each instance only has + * partitions from one resource. + * + * @param dcName the datacenter whose information is to be verified. + * @param clusterName the cluster to be verified. + * @param helixAdmin helix admin + * @param maxInstancesInOneResourceForFullAuto max number of instances to be assigned under one resource when the + * resources are in full auto compatible mode + */ + private static boolean maybeVerifyResourcesAreFullAutoCompatible(String dcName, String clusterName, + HelixAdmin helixAdmin, int maxInstancesInOneResourceForFullAuto) { + List<String> resourceNames = helixAdmin.getResourcesInCluster(clusterName); if (resourceNames == null || resourceNames.isEmpty()) { info( "[{}] There is no resource found for this cluster {}, max instance in one resource is {}, resource should {} be full auto compatible", @@ -2641,9 +2796,8 @@ private boolean maybeVerifyResourcesAreFullAutoCompatible(String dcName, String maxInstancesInOneResourceForFullAuto > 0 ? "" : "NOT"); return maxInstancesInOneResourceForFullAuto > 0; } - boolean allResourceFullAutoCompatible = resourceNames.stream() - .filter(rn -> rn.matches("\\d+")) - .mapToInt(Integer::parseInt) + boolean allResourceFullAutoCompatible = + resourceNames.stream().filter(rn -> rn.matches("\\d+")).mapToInt(Integer::parseInt) .allMatch(i -> i >= FULL_AUTO_COMPATIBLE_RESOURCE_NAME_START_NUMBER); boolean allResourceNotFullAutoCompatible = resourceNames.stream() .filter(rn -> rn.matches("\\d+")) @@ -2666,14 +2820,14 @@ private boolean maybeVerifyResourcesAreFullAutoCompatible(String dcName, String } SortedMap<Integer, Set<String>> resourceNameToInstances = new TreeMap<>(); Map<String, Set<Integer>> instanceNameToResources = new HashMap<>(); - for (String resourceName : admin.getResourcesInCluster(clusterName)) { + for (String resourceName : helixAdmin.getResourcesInCluster(clusterName)) { if (!resourceName.matches("\\d+")) { info("[{}] Ignoring resource {} as it is not part of the cluster map", dcName.toUpperCase(), resourceName); continue; } Integer resourceId = Integer.parseInt(resourceName); resourceNameToInstances.put(resourceId, new HashSet<>()); - IdealState resourceIS = admin.getResourceIdealState(clusterName, resourceName); + IdealState resourceIS = helixAdmin.getResourceIdealState(clusterName, resourceName); Set<String> resourcePartitions = resourceIS.getPartitionSet(); for (String resourcePartition : resourcePartitions) { Set<String> partitionInstanceSet = resourceIS.getInstanceSet(resourcePartition); @@ -2805,7 +2959,7 @@ private static Map<String, List<ReplicaId>> getMountPathToReplicas(Static * @param condition the boolean condition to check. * @param errStr the error message to associate with the assertion error. */ - private void ensureOrThrow(boolean condition, String errStr) { + static void ensureOrThrow(boolean condition, String errStr) { if (!condition) { throw new AssertionError(errStr); }
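maybeVerifyResourcesAreFullAutoCompatible leans on a naming convention: numeric resource names at or above FULL_AUTO_COMPATIBLE_RESOURCE_NAME_START_NUMBER form the full-auto compatible generation, and a cluster must be entirely on one side or the other. A self-contained sketch of the check; the threshold constant here is an assumed value, not Ambry's:

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the name-based compatibility check. 10000 is an assumed threshold.
public class FullAutoNameCheckSketch {
  static final int FULL_AUTO_START = 10000;

  public static void main(String[] args) {
    List<String> resourceNames = Arrays.asList("101", "10001", "TASK_QUEUE");
    // Non-numeric names (e.g. task framework resources) are not part of the cluster map.
    boolean allCompatible = resourceNames.stream()
        .filter(rn -> rn.matches("\\d+"))
        .mapToInt(Integer::parseInt)
        .allMatch(i -> i >= FULL_AUTO_START);
    boolean noneCompatible = resourceNames.stream()
        .filter(rn -> rn.matches("\\d+"))
        .mapToInt(Integer::parseInt)
        .allMatch(i -> i < FULL_AUTO_START);
    // A mix of generations (neither all nor none) indicates an inconsistent cluster.
    System.out.println("all=" + allCompatible + " none=" + noneCompatible);
  }
}
```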
diff --git a/ambry-tools/src/main/java/com/github/ambry/clustermap/HelixBootstrapUpgradeTool.java b/ambry-tools/src/main/java/com/github/ambry/clustermap/HelixBootstrapUpgradeTool.java index 612401a4f9..df4666d40b 100644 --- a/ambry-tools/src/main/java/com/github/ambry/clustermap/HelixBootstrapUpgradeTool.java +++ b/ambry-tools/src/main/java/com/github/ambry/clustermap/HelixBootstrapUpgradeTool.java @@ -98,6 +98,16 @@ public static void main(String[] args) throws Exception { "Drops the given Ambry cluster from Helix. Use this option with care. If present, must be accompanied with and " + "only with the clusterName argument"); + OptionSpec migrateToFullAutoOpt = + parser.accepts("migrateToFullAuto", "Migrates the given Ambry cluster from Semi-Auto to Full-Auto"); + + OptionSpec rollbackToSemiAutoOpt = + parser.accepts("rollbackToSemiAuto", "Rolls back the given Ambry cluster from Full-Auto to Semi-Auto"); + + OptionSpec updatePreferenceListsOpt = parser.accepts("updatePreferenceLists", + "Removes partitions from preference list in resource configs. This option must be used when " + + "cluster is in Full Auto and must be accompanied with resource name and list of partitions"); + OptionSpec forceRemove = parser.accepts("forceRemove", "Specifies that any instances or partitions absent in the json files be removed from Helix. Use this with care"); @@ -106,25 +116,25 @@ public static void main(String[] args) throws Exception { ArgumentAcceptingOptionSpec<String> hardwareLayoutPathOpt = parser.accepts("hardwareLayoutPath", "The path to the hardware layout json file") - .requiredUnless(dropClusterOpt) + .requiredUnless(dropClusterOpt, updatePreferenceListsOpt, migrateToFullAutoOpt, rollbackToSemiAutoOpt) .withRequiredArg() .describedAs("hardware_layout_path") .ofType(String.class); ArgumentAcceptingOptionSpec<String> partitionLayoutPathOpt = parser.accepts("partitionLayoutPath", "The path to the partition layout json file") - .requiredUnless(dropClusterOpt) + .requiredUnless(dropClusterOpt, updatePreferenceListsOpt, migrateToFullAutoOpt, rollbackToSemiAutoOpt) .withRequiredArg() .describedAs("partition_layout_path") .ofType(String.class); ArgumentAcceptingOptionSpec<String> zkLayoutPathOpt = parser.accepts("zkLayoutPath", - "The path to the json file containing zookeeper connect info. This should be of the following form: \n{\n" - + " \"zkInfo\" : [\n" + " {\n" + " \"datacenter\":\"dc1\",\n" - + " \"zkConnectStr\":\"abc.example.com:2199\",\n" + " },\n" + " {\n" - + " \"datacenter\":\"dc2\",\n" + " \"zkConnectStr\":\"def.example.com:2300\",\n" + " },\n" - + " {\n" + " \"datacenter\":\"dc3\",\n" + " \"zkConnectStr\":\"ghi.example.com:2400\",\n" - + " }\n" + " ]\n" + "}") + "The path to the json file containing zookeeper connect info. This should be of the following form: \n{\n" + + " \"zkInfo\" : [\n" + " {\n" + " \"datacenter\":\"dc1\",\n" + + " \"zkConnectStr\":\"abc.example.com:2199\",\n" + " },\n" + " {\n" + + " \"datacenter\":\"dc2\",\n" + " \"zkConnectStr\":\"def.example.com:2300\",\n" + " },\n" + + " {\n" + " \"datacenter\":\"dc3\",\n" + " \"zkConnectStr\":\"ghi.example.com:2400\",\n" + + " }\n" + " ]\n" + "}") .requiredUnless(dropClusterOpt) .withRequiredArg() .describedAs("zk_connect_info_path") .ofType(String.class); @@ -136,29 +146,28 @@ public static void main(String[] args) throws Exception { .describedAs("cluster_name_prefix") .ofType(String.class); - ArgumentAcceptingOptionSpec<String> clusterNameOpt = - parser.accepts("clusterName", "The cluster in Helix to drop. This should accompany the dropCluster option") - .requiredIf(dropClusterOpt) - .withRequiredArg() - .describedAs("cluster_name") - .ofType(String.class); + ArgumentAcceptingOptionSpec<String> clusterNameOpt = parser.accepts("clusterName", "The cluster in Helix") + .requiredIf(dropClusterOpt, migrateToFullAutoOpt, updatePreferenceListsOpt, rollbackToSemiAutoOpt) + .withRequiredArg() + .describedAs("cluster_name") + .ofType(String.class); ArgumentAcceptingOptionSpec<String> dcsNameOpt = parser.accepts("dcs", - "The comma-separated datacenters (colos) to update.
Use '--dcs all' if updates to every datacenter is intended") + "The comma-separated datacenters (colos) to update. Use '--dcs all' if updates to every datacenter are intended") .withRequiredArg() .describedAs("datacenters") .required() .ofType(String.class); ArgumentAcceptingOptionSpec<String> maxPartitionsInOneResourceOpt = parser.accepts("maxPartitionsInOneResource", - "(Optional argument) The maximum number of partitions that should be grouped under a Helix resource. " - + "If the resources are reconstructed to be FULL_AUTO compatible, then this option would be ignored") + "(Optional argument) The maximum number of partitions that should be grouped under a Helix resource. " + + "If the resources are reconstructed to be FULL_AUTO compatible, then this option would be ignored") .withRequiredArg() .describedAs("max_partitions_in_one_resource") .ofType(String.class); ArgumentAcceptingOptionSpec<String> stateModelDefinitionOpt = parser.accepts("stateModelDef", - "(Optional argument) The state model definition that should be created in cluster if doesn't exist") + "(Optional argument) The state model definition that should be created in the cluster if it doesn't exist") .withRequiredArg() .describedAs("state_model_definition") .ofType(String.class); @@ -181,7 +190,6 @@ public static void main(String[] args) throws Exception { + " '--adminOperation ListSealedPartition' # List all sealed partitions in Helix cluster (aggregated across all datacenters)" + " '--adminOperation ValidateCluster' # Validates the information in static clustermap is consistent with the information in Helix" + " '--adminOperation MigrateToPropertyStore' # Migrate custom instance config properties to DataNodeConfigs in the property store" - + " '--adminOperation MigrateToFullAuto' # Migrate resources to Full Auto" + " '--adminOperation BootstrapCluster' # (Default operation if not specified) Bootstrap cluster based on static clustermap") .withRequiredArg() .describedAs("admin_operation") .ofType(String.class); @@ -234,24 +242,46 @@ public static void main(String[] args) throws Exception { ArgumentAcceptingOptionSpec<Integer> maxInstancesInOneResourceForFullAutoOpt = parser.accepts("maxInstancesInOneResourceForFullAuto", - "Maximum number of instance in a resource when the resources are constructed as FULL_AUTO compatible " - + "or the cluster is empty without any resources.\n" - + "This is only required when bootstrapping a cluster or update ideal state, or validating a cluster\n\n" - + "When the cluster is empty, if you provide 0 to this option, this tool would create resources in the old way, not full auto compatible way") + "Maximum number of instances in a resource when the resources are constructed as FULL_AUTO compatible " + + "or the cluster is empty without any resources.\n" + + "This is only required when bootstrapping a cluster, updating ideal state, or validating a cluster\n\n" + + "When the cluster is empty, if you provide 0 to this option, this tool would create resources in the old way, not the full auto compatible way") .withRequiredArg() .describedAs("max_instances_in_one_resource_for_full_auto") .ofType(Integer.class); - ArgumentAcceptingOptionSpec<String> resourcesNameOpt = parser.accepts("resources", - "The comma-separated resources to migrate to Full Auto. Use '--resources all' to migrate all resources") + ArgumentAcceptingOptionSpec<String> resourceNamesOpt = parser.accepts("resources", + "The comma-separated resources to migrate to Full-Auto or rollback to Semi-Auto.
Use " + + "'--resources all' to migrate or rollback all resources") + .requiredIf(migrateToFullAutoOpt, rollbackToSemiAutoOpt) .withRequiredArg() .describedAs("resources") .ofType(String.class); - ArgumentAcceptingOptionSpec wagedConfigFilePathOpt = parser.accepts("wagedConfigFilePathOpt", - "The path to the waged config file path") + ArgumentAcceptingOptionSpec wagedConfigFilePathOpt = + parser.accepts("wagedConfigFilePathOpt", "The path to the waged config file path") + .withRequiredArg() + .describedAs("waged_config_file_path") + .ofType(String.class); + + OptionSpecBuilder setPreferenceList = parser.accepts("setPreferenceList", + "(Optional argument) Set current replica placement as " + + "preference list in resource config. This is used during Semi-Auto to Full-Auto migration to control " + + "replica movement. When we set this, helix doesn't move replicas for partitions in the preference list. " + + "We can later remove partitions from this list by using --updatePreferenceListsOpt option"); + + ArgumentAcceptingOptionSpec resourceNameOpt = parser.accepts("resourceName", + "resource name for updating preference list. This should accompany the --updatePreferenceLists option") + .requiredIf(updatePreferenceListsOpt) + .withRequiredArg() + .describedAs("resource_name") + .ofType(String.class); + + ArgumentAcceptingOptionSpec partitionsOpt = parser.accepts("partitions", + "The comma-separated partitions to be removed from preference list. This should accompany the updatePreferenceLists option") + .requiredIf(updatePreferenceListsOpt) .withRequiredArg() - .describedAs("waged_config_file_path") + .describedAs("partitions") .ofType(String.class); OptionSet options = parser.parse(args); @@ -269,6 +299,8 @@ public static void main(String[] args) throws Exception { String portStr = options.valueOf(portOpt); Integer maxInstancesInOneResourceForFullAuto = options.valueOf(maxInstancesInOneResourceForFullAutoOpt); String wagedConfigFilePath = options.valueOf(wagedConfigFilePathOpt); + String resourceName = options.valueOf(resourceNameOpt); + String partitions = options.valueOf(partitionsOpt); int maxPartitionsInOneResource = options.valueOf(maxPartitionsInOneResourceOpt) == null ? DEFAULT_MAX_PARTITIONS_PER_RESOURCE : Integer.parseInt(options.valueOf(maxPartitionsInOneResourceOpt)); @@ -277,7 +309,7 @@ public static void main(String[] args) throws Exception { DataNodeConfigSourceType dataNodeConfigSourceType = options.valueOf(dataNodeConfigSourceOpt) == null ? DataNodeConfigSourceType.PROPERTY_STORE : DataNodeConfigSourceType.valueOf(options.valueOf(dataNodeConfigSourceOpt)); - String resources = options.valueOf(resourcesNameOpt) == null ? "all" : options.valueOf(resourcesNameOpt); + String resources = options.valueOf(resourceNamesOpt) == null ? 
"all" : options.valueOf(resourceNamesOpt); ArrayList listOpt = new ArrayList<>(); listOpt.add(hardwareLayoutPathOpt); listOpt.add(partitionLayoutPathOpt); @@ -288,6 +320,24 @@ public static void main(String[] args) throws Exception { List> expectedOpts = Arrays.asList(dropClusterOpt, clusterNameOpt, zkLayoutPathOpt, dcsNameOpt); ToolUtils.ensureExactOrExit(expectedOpts, options.specs(), parser); HelixBootstrapUpgradeUtil.dropCluster(zkLayoutPath, clusterName, dcs, new HelixAdminFactory()); + } else if (options.has(migrateToFullAutoOpt)) { + List expectedOpts = + Arrays.asList(clusterNameOpt, dcsNameOpt, zkLayoutPathOpt, resourceNamesOpt, wagedConfigFilePathOpt, + maxInstancesInOneResourceForFullAutoOpt); + ToolUtils.ensureOrExit(expectedOpts, options, parser); + HelixBootstrapUpgradeUtil.migrateToFullAuto(clusterName, dcs, zkLayoutPath, resources, new HelixAdminFactory(), + wagedConfigFilePath, maxInstancesInOneResourceForFullAuto, options.has(setPreferenceList), + options.has(dryRun)); + } else if (options.has(updatePreferenceListsOpt)) { + List expectedOpts = + Arrays.asList(updatePreferenceListsOpt, clusterNameOpt, zkLayoutPathOpt, partitionsOpt, resourceNameOpt, + dcsNameOpt); + ToolUtils.ensureOrExit(expectedOpts, options, parser); + HelixBootstrapUpgradeUtil.updatePreferenceLists(zkLayoutPath, clusterName, dcs, resourceName, partitions); + } else if (options.has(rollbackToSemiAutoOpt)) { + List expectedOpts = Arrays.asList(clusterNameOpt, dcsNameOpt, zkLayoutPathOpt, resourceNamesOpt); + ToolUtils.ensureOrExit(expectedOpts, options, parser); + HelixBootstrapUpgradeUtil.rollbackToSemiAuto(clusterName, dcs, zkLayoutPath, resources, new HelixAdminFactory()); } else if (adminConfigStr != null) { listOpt.add(adminConfigsOpt); ToolUtils.ensureOrExit(listOpt, options, parser); @@ -319,14 +369,6 @@ public static void main(String[] args) throws Exception { HelixBootstrapUpgradeUtil.migrateToPropertyStore(hardwareLayoutPath, partitionLayoutPath, zkLayoutPath, clusterNamePrefix, dcs); break; - case MigrateToFullAuto: - listOpt.add(wagedConfigFilePathOpt); - listOpt.add(maxInstancesInOneResourceForFullAutoOpt); - ToolUtils.ensureOrExit(listOpt, options, parser); - HelixBootstrapUpgradeUtil.migrateToFullAuto(partitionLayoutPath, zkLayoutPath, clusterNamePrefix, dcs, - resources, options.has(dryRun), wagedConfigFilePath, hardwareLayoutPath, - maxInstancesInOneResourceForFullAuto); - break; case ResetPartition: case EnablePartition: HelixBootstrapUpgradeUtil.controlPartitionState(hardwareLayoutPath, partitionLayoutPath, zkLayoutPath,